Create Managed Airflow environment.

Check my progress

/ 25

Create two Cloud Storage buckets.

Check my progress

/ 25

Create a dataset.

Check my progress

/ 25

Uploading the DAG and dependencies to Cloud Storage

Check my progress

/ 25

This lab may incorporate AI tools to support your learning.

GSP283

Google Cloud 自學實驗室

總覽

假設您的資料集分散在世界各地,且資料存放在 Google Cloud Storage bucket 或 BigQuery 資料表中,該如何整理這些資料,才能有效整合並加以分析,進而取得業務洞察結果?

Managed Service for Apache Airflow 可協助您透過符合直覺的圖形介面,建構、排定及監控工作流程,在不同區域和儲存系統間移動及處理資料。這項服務提供包含各種運算子和整合功能的彈性架構,能讓您以可靠的方式在 BigQuery 和 Cloud Storage 等服務之間移轉資料。

在本實驗室,您將使用 Managed Service for Apache Airflow 建立並執行 Apache Airflow 工作流程,完成下列工作:

  • 從設定檔中讀取要複製的資料表清單
  • 根據設定檔中的清單,將位於美國的 BigQuery 資料集內的特定資料表匯出至 Cloud Storage
  • 將匯出的資料表從美國 bucket 複製到歐盟區域的 Cloud Storage bucket
  • 將設定檔中的清單所列的資料表匯入歐盟區域的目標 BigQuery 資料集

DAG「Graph」檢視畫面

學習內容

本實驗室的內容包括:

  • 建立 Managed Airflow 環境。
  • 建立 Cloud Storage bucket。
  • 建立 BigQuery 資料集。
  • 在 Managed Airflow 中建立並執行 Apache Airflow 工作流程,在 Cloud Storage 與 BigQuery 之間移動資料。

設定和需求

瞭解以下事項後,再點選「Start Lab」按鈕

請詳閱以下操作說明。實驗室活動會計時,且中途無法暫停。點選「Start Lab」後就會開始計時,顯示可使用 Google Cloud 資源的時間。

您將在真正的雲端環境完成實作實驗室活動,而不是模擬或示範環境。為此,我們會提供新的暫時憑證,供您在實驗室活動期間登入及存取 Google Cloud。

為了順利完成這個實驗室,請先確認:

  • 可以使用標準的網際網路瀏覽器 (Chrome 瀏覽器為佳)。
注意事項:請使用無痕模式 (建議選項) 或私密瀏覽視窗執行此實驗室,這可以防止個人帳戶和學員帳戶之間的衝突,避免個人帳戶產生額外費用。
  • 是時候完成實驗室活動了!別忘了,活動一旦開始將無法暫停。
注意事項:務必使用實驗室專用的學員帳戶。如果使用其他 Google Cloud 帳戶,可能會產生額外費用。

如何開始研究室及登入 Google Cloud 控制台

  1. 點選「Start Lab」按鈕。如果實驗室會產生費用,畫面上會出現選擇付款方式的對話方塊。左側的「Lab Details」窗格會顯示下列項目:

    • 「Open Google Cloud console」按鈕
    • 剩餘時間
    • 必須在這個研究室中使用的臨時憑證
    • 完成這個實驗室所需的其他資訊 (如有)
  2. 點選「Open Google Cloud console」;如果使用 Chrome 瀏覽器,也能按一下滑鼠右鍵,選取「在無痕視窗中開啟連結」

    接著,實驗室會啟動相關資源,並開啟另一個分頁,顯示「登入」頁面。

    提示:您可以在不同的視窗中並排開啟分頁。

    注意:如果頁面中顯示「選擇帳戶」對話方塊,請點選「使用其他帳戶」
  3. 如有必要,請將下方的 Username 貼到「登入」對話方塊。

    {{{user_0.username | "Username"}}}

    您也可以在「Lab Details」窗格找到 Username。

  4. 點選「下一步」

  5. 複製下方的 Password,並貼到「歡迎使用」對話方塊。

    {{{user_0.password | "Password"}}}

    您也可以在「Lab Details」窗格找到 Password。

  6. 點選「下一步」

    重要事項:請務必使用實驗室提供的憑證,而非自己的 Google Cloud 帳戶憑證。 注意:如果使用自己的 Google Cloud 帳戶來進行這個實驗室,可能會產生額外費用。
  7. 按過後續的所有頁面:

    • 接受條款及細則。
    • 由於這是臨時帳戶,請勿新增救援選項或雙重驗證機制。
    • 請勿申請免費試用。

Google Cloud 控制台稍後會在這個分頁開啟。

注意:如要使用 Google Cloud 產品和服務,請點選「導覽選單」,或在「搜尋」欄位輸入服務或產品名稱。「導覽選單」圖示和搜尋欄位

啟動 Cloud Shell

Cloud Shell 是搭載多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作。Cloud Shell 提供指令列存取權,方便您使用 Google Cloud 資源。

  1. 點按 Google Cloud 控制台頂端的「啟用 Cloud Shell」圖示 「啟動 Cloud Shell」圖示

  2. 系統顯示視窗時,請按照下列步驟操作:

    • 繼續操作 Cloud Shell 視窗。
    • 授權 Cloud Shell 使用您的憑證發出 Google Cloud API 呼叫。

連線建立完成即代表已通過驗證,而且專案已設為您的 Project_ID。輸出內容中有一行文字,宣告本工作階段的 Project_ID

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵自動完成功能。

  1. (選用) 您可以執行下列指令來列出使用中的帳戶:
gcloud auth list
  1. 點按「授權」

輸出內容:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (選用) 您可以使用下列指令來列出專案 ID:
gcloud config list project

輸出內容:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} 注意:如需 gcloud 的完整說明,請前往 Google Cloud 參閱 gcloud CLI 總覽指南

工作 1:建立 Managed Airflow 環境

  1. 前往 Google Cloud 控制台,在標題列的「搜尋」欄位輸入 Managed Airflow,然後點選「產品與頁面」專區中的「Managed Airflow」來建立 Managed Airflow 環境。

  2. 接著點選「建立環境」

  3. 在下拉式選單中選取「代管 Airflow 第 3 代」

  4. 為環境設定下列參數:

    • 「名稱」:airflow-advanced-lab

    • 「位置」:

    • 「映像檔版本」:composer-3-airflow-2.n.n-build.n (選取版本編號最高的映像檔)

    • 「服務帳戶」:Compute Engine 預設服務帳戶

    • 「彈性模式」:選取「標準彈性」,然後在「Airflow 資料庫可用區」部分選取「

    • 在「環境資源」下選取「小」

    • 在「網路伺服器網路存取控管」部分,選取「允許所有 IP 位址傳出的存取要求」

    其他設定皆保留預設值。

  5. 點選「建立」

如果在 Cloud 控制台的「環境」頁面的環境名稱左側顯示綠色勾號,表示環境已建立完畢。

注意:環境最多可能需要 20 分鐘才能完成設定程序。請前往下一節「建立 Cloud Storage bucket 和 BigQuery 目的地資料集」。

點選「Check my progress」,確認目標已達成。

建立 Managed Airflow 環境。

工作 2:建立 Cloud Storage bucket

在這項工作中,您將建立兩個 Cloud Storage 多區域 bucket,用於在不同區域之間複製匯出的資料表,例如從美國複製到歐盟區域。

建立位於美國的 bucket

  1. 依序前往「Cloud Storage」>「Bucket」,然後按一下「建立」
  2. 為 bucket 命名,這個名稱在全域範圍內皆不得重複,且須包含專案 ID (例如 -us)。
  3. 在「位置類型」部分,選取「美國 (多個美國地區)」
  4. 其他設定均保留預設值,然後按一下「建立」
  5. 勾選「強制禁止公開存取這個 bucket」方塊,如果顯示「系統會禁止公開存取」彈出式視窗,請點選「確認」

建立位於歐盟地區的 bucket

重複上述步驟,建立另一個位於 EU 區域的 bucket。這個 bucket 的名稱在全域範圍內皆不得重複,且後面須加上位置 (例如 -eu)。

點選「Check my progress」,確認目標已達成。

建立兩個 Cloud Storage bucket。

工作 3:建立 BigQuery 目的地資料集

  1. 透過 BigQuery 新網頁版 UI 建立歐盟區域的目的地 BigQuery 資料集。

  2. 依序前往「導覽選單」>「BigQuery」

接著,畫面上會顯示「歡迎使用 Cloud 控制台中的 BigQuery」訊息方塊,當中會列出快速入門指南的連結和 UI 更新內容。

  1. 點選「完成」

  2. 接著按一下 Qwiklabs 專案 ID 旁的三點圖示,然後選取「建立資料集」

醒目顯示「建立資料集」選項的畫面

  1. 在「資料集 ID」部分,輸入 nyc_tlc_EU,然後在「資料位置」下拉式選單中,選取「歐盟 (多個歐盟區域)」

「資料集 ID」文字欄位和「資料位置」下拉式選單

  1. 點選「建立資料集」

點選「Check my progress」,確認目標已達成。

建立 BigQuery 資料集。

工作 4:Airflow 與核心概念簡介

  • 在環境建立期間,請查看您將在本實驗室中使用的範例檔案。

Airflow 這個平台能讓您以程式輔助方式建立、監控工作流程及安排時程。

您可以透過 Airflow 建立工作流程,設計成由多項工作組成的有向無環圖 (DAG)。Airflow 排程器會根據指定的依附關係,在多個 worker 上執行工作。

核心概念

DAG:有向無環圖是工作的集合,編排方式會反映出工作之間的關聯和依附關係。

運算子:單一工作的說明,通常是最小的處理單位。舉例來說,「BashOperator」可用來執行 Bash 指令。

工作:運算子的參數化執行個體,在 DAG 中為節點。

工作執行個體:工作的特定執行作業,由 DAG、工作和時間點這三個要素構成,而且會透過以下狀態表示執行狀況:「執行中」、「成功」、「失敗」、「略過」...

詳情請參閱概念說明文件

工作 5:定義工作流程

Apache Airflow 中的工作流程是由 DAG (有向無環圖) 組成。bq_copy_across_locations.py 中的程式碼代表工作流程,也就是所謂的 DAG。請開啟這個檔案,瞭解 DAG 的建構方式。下一節將詳細介紹其主要元件。

為了自動調度管理工作流程中的各項工作,DAG 會匯入下列運算子:

  1. DummyOperator:建立「開始」和「結束」虛擬工作,讓 DAG 看起來更清楚易懂。
  2. BigQueryToCloudStorageOperator:將 BigQuery 資料表以 Avro 格式匯出到 Cloud Storage bucket。
  3. GoogleCloudStorageToGoogleCloudStorageOperator:在 Cloud Storage bucket 之間複製檔案。
  4. GoogleCloudStorageToBigQueryOperator:從 Cloud Storage bucket 中的 Avro 檔案匯入資料表。
  • 本例定義的 read_table_list() 函式是用來讀取設定檔,並建立要複製的資料表清單:
# -------------------------------------------------------------------------------- # Functions # -------------------------------------------------------------------------------- def read_table_list(table_list_file): """ 讀取主 CSV 檔案,該檔案用於在 DAG 中動態建立 Airflow 工作。 :param table_list_file: (String) 主檔案的位置,例如 '/home/airflow/framework/master.csv' :return master_record_all: (List) Python 字典的清單,每個字典皆包含主 CSV 檔案某一列的資訊。 """ master_record_all = [] logger.info('Reading table_list_file from : %s' % str(table_list_file)) try: with open(table_list_file, 'rb') as csv_file: csv_reader = csv.reader(csv_file) next(csv_reader) # 略過標頭 for row in csv_reader: logger.info(row) master_record = { 'table_source': row[0], 'table_dest': row[1] } master_record_all.append(master_record) return master_record_all except IOError as e: logger.error('Error opening table_list_file %s: ' % str( table_list_file), e)
  • DAG 的名稱為 bq_copy_us_to_eu_01,且預設不會排定執行時程,因此需要手動觸發。
default_args = { 'owner': 'airflow', 'start_date': datetime.today(), 'depends_on_past': False, 'email': [''], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # DAG 物件。 with models.DAG('bq_copy_us_to_eu_01', default_args=default_args, schedule_interval=None) as dag:
  • 為設定 Cloud Storage 外掛程式,必須定義 Cloud StoragePlugin(AirflowPlugin) 類別,對應從 Airflow 1.10-stable 分支版本下載的掛鉤和運算子。
# 從外掛程式匯入運算子 from gcs_plugin.operators import gcs_to_gcs

工作 6:查看環境資訊

  1. 返回「Managed Airflow」,確認環境狀態。

  2. 環境建立後,按一下環境名稱即可查看詳細資料。

「環境詳細資料」頁面有各種資訊,例如 Airflow 網頁 UI 網址、Google Kubernetes Engine 叢集 ID、連結 DAG 資料夾的 Cloud Storage bucket 名稱。

環境設定頁面

注意:代管的 Apache Airflow 服務會使用 Cloud Storage 儲存 DAG (又稱為「工作流程」)。每個環境都有相關聯的 Cloud Storage bucket,Airflow 只會為該 bucket 中的 DAG 排程。

接下來的步驟須在 Cloud Shell 中完成。

建立虛擬環境

Python 虛擬環境可用來獨立安裝套件,將其與系統區隔開來。

  1. 安裝 virtualenv 環境:
sudo apt-get install -y virtualenv
  1. 建構虛擬環境:
python3 -m venv venv
  1. 啟用虛擬環境:
source venv/bin/activate

工作 7:為 DAG Cloud Storage bucket 建立變數

  • 在 Cloud Shell 中執行下列指令,從「環境詳細資料」頁面複製 DAG bucket 名稱,然後設定變數,以便在 Cloud Shell 中用來參照該名稱:
注意:請務必更改下列指令中的 DAG bucket 名稱。依序前往「導覽選單」>「Cloud Storage」>「Bucket」,名稱會類似 -airflow-advance-YOURDAGSBUCKET-bucket DAGS_BUCKET=<您的 DAG bucket 名稱>

本實驗室會多次使用這個變數。

工作 8:設定 Airflow 變數

Airflow 變數是 Airflow 的專屬概念,與環境變數不同。在這個步驟中,您將設定以下三個 Airflow 變數,供要部署的 DAG 使用:table_list_file_pathgcs_source_bucketgcs_dest_bucket

詳細資料
table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv 列出來源和目標資料表 (包括資料集) 的 CSV 檔案
gcs_source_bucket {UNIQUE ID}-us 從來源匯出 BigQuery 資料表時要使用的 Cloud Storage bucket
gcs_dest_bucket {UNIQUE ID}-eu 將 BigQuery 資料表匯入目的地時要使用的 Cloud Storage bucket

接下來的 gcloud composer 指令會執行 Airflow CLI 子指令 variables。這個子指令會將引數傳遞至 gcloud 指令列工具。

請為上表中的每一列各執行一次 composer 指令,以便設定前述的那三個變數。指令格式如下:

gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION variables -- \ set KEY VALUE 您可以放心忽略這個 gcloud 錯誤:(ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable)。這是透過 gcloud 410.0.0 版使用 gcloud composer environments run 時會發生的已知問題。即使出現錯誤訊息,變數還是會照常完成設定。
  • ENVIRONMENT_NAME 是環境的名稱。
  • LOCATION 是環境所在的 Compute Engine 區域。執行 gcloud composer 指令前,必須在該指令中加入 --location 標記,或設定預設位置
  • KEYVALUE 用來指定要設定的變數及其值。請在左側含有 gcloud 相關引數的 gcloud 指令,以及右側的 Airflow 子指令相關引數之間,依序加入空格、兩個破折號、空格 ( -- )。此外,如果使用 gcloud composer environments run 指令和變數子指令,KEYVALUE 引數中間也要加入空格。

在 Cloud Shell 中執行下列指令,請將 gcs_source_bucketgcs_dest_bucket 改成您在工作 2 建立的 bucket 名稱。

gcloud composer environments run airflow-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv gcloud composer environments run airflow-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_source_bucket {UNIQUE ID}-us gcloud composer environments run airflow-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_dest_bucket {UNIQUE_ID}-eu

如要查看變數的值,請執行 Airflow CLI 子指令 variables 並搭配 get 引數,或使用 Airflow UI

例如,您可以執行下列指令:

gcloud composer environments run airflow-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ get gcs_source_bucket 注意:請務必設定 DAG 要使用的三個 Airflow 變數。

工作 9:將 DAG 和依附元件上傳至 Cloud Storage

  1. 將 Google Cloud Python 文件範例檔複製到 Cloud Shell:
cd ~ gcloud storage cp -r gs://spls/gsp283/python-docs-samples .
  1. 將第三方掛鉤和運算子副本上傳到 Airflow DAG Cloud Storage bucket 的 plugins 資料夾。
gcloud storage cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
  1. 接下來,將 DAG 和設定檔上傳到環境的 Cloud Storage bucket:
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags gcloud storage cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

代管的 Apache Airflow 服務會自動偵測並在 Airflow 環境中註冊 DAG,更新通常會在幾分鐘內生效。您可以透過 Airflow 網頁介面監控工作狀態,並確認 DAG 排程行為是否符合定義的設定。

工作 10:探索 Airflow UI

請按照下列步驟操作,使用 Cloud 控制台存取 Airflow 網頁介面:

  1. 返回 Managed Airflow 中的「環境」頁面。
  2. 在該環境的「Airflow 網路伺服器」欄中,點選「Airflow」連結。

「Airflow 網路伺服器」欄中顯目顯示的「Airflow」連結

  1. 按一下實驗室憑證。
  2. Airflow 的網頁 UI 會在新瀏覽器視窗中開啟。進入頁面時,資料應該還在載入中。您可以在載入期間繼續執行實驗室的後續步驟。

查看變數

您先前設定的變數會保留在環境中。

  • 在 Airflow 選單列中依序選取「Admin」>「Variables」,即可查看變數。

「Variables」頁面

手動觸發 DAG 執行作業

  1. 按一下「DAG」分頁標籤,然後等待連結載入完畢。

  2. 點選 composer_sample_bq_copy_across_locations 的播放按鈕,以便手動觸發 DAG:

「觸發 DAG」按鈕

  1. 按一下「觸發 DAG」,確認這項操作。

點選「Check my progress」,確認目標已達成。

將 DAG 和依附元件上傳至 Cloud Storage

瞭解 DAG 執行作業

將 DAG 檔案上傳至 DAG 資料夾 (設定的儲存位置) 後,Apache Airflow 會剖析該檔案。如果沒有發現錯誤,工作流程就會出現在 DAG 清單中,並根據排程設定排入執行佇列。如果排程設為「None」,工作流程就不會自動執行,須手動觸發才會執行。

按下播放按鈕後,「Runs」狀態會變成綠色:

綠色的 DAG 執行作業狀態

  1. 按一下 DAG 名稱,開啟 DAG 詳細資料頁面。此頁面以圖形呈現工作流程中的工作和依附關係。

DAG 樹狀檢視

  1. 接著在工具列中點選「Graph」,然後將滑鼠游標懸停在各工作的圖形上,即可查看工作狀態。請注意,每個工作周圍的框線也代表其狀態,例如綠色框線表示執行中,紅色框線表示執行失敗等。

DAG「Graph」檢視畫面

請按照下列步驟操作,在「Graph」檢視畫面再次執行工作流程:

  1. 在 Airflow UI 的「Graph」檢視畫面中,按一下「開始」圖形。
  2. 依序點選「Clear」>「Clear existing task」,重設所有工作,然後點選「Clear DAG run」確認操作。

在程序執行時重新整理瀏覽器,即可查看最新資訊。

工作 11:驗證結果

現在前往 Cloud 控制台的下列頁面,確認工作流程的狀態和結果:

  • 匯出的資料表已從美國 bucket 複製到歐盟區域的 Cloud Storage bucket。按一下「Cloud Storage」,即可查看來源 (美國) 和目的地 (歐盟區域) bucket 的中繼 Avro 檔案。
  • 設定檔中的清單所列的資料表已匯入目標 BigQuery 資料集。請依序點選「BigQuery」、專案名稱和「nyc_tlc_EU」資料集,確認在您建立的資料集中,能否存取這些資料表。

刪除 Cloud Composer 環境

  1. 返回 Cloud Composer 中的「環境」頁面。

  2. 選取您的 Cloud Composer 環境。

  3. 點選「刪除」。

  4. 在開啟的對話方塊點選「刪除」,確認要刪除該 Cloud Composer 環境。

恭喜!

您已透過程式輔助方式,將資料表從美國複製到歐盟區域!本實驗室的內容是根據 David Sabater Dinter 的這篇網誌文章設計。

後續步驟

Google Cloud 教育訓練與認證

協助您瞭解如何充分運用 Google Cloud 的技術。我們的課程會介紹專業技能和最佳做法,讓您可以快速掌握要領並持續進修。我們提供從基本到進階等級的訓練課程,並有隨選、線上和虛擬課程等選項,方便您抽空參加。認證可協助您驗證及證明自己在 Google Cloud 技術方面的技能和專業知識。

使用手冊上次更新日期:2026 年 4 月 20 日

實驗室上次測試日期:2026 年 4 月 20 日

Copyright 2026 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。

Before you begin

  1. Labs create a Google Cloud project and resources for a fixed time
  2. Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
  3. On the top left of your screen, click Start lab to begin

Use private browsing

  1. Copy the provided Username and Password for the lab
  2. Click Open console in private mode

Sign in to the Console

  1. Sign in using your lab credentials. Using other credentials might cause errors or incur charges.
  2. Accept the terms, and skip the recovery resource page
  3. Don't click End lab unless you've finished the lab or want to restart it, as it will clear your work and remove the project

This content is not currently available

We will notify you via email when it becomes available

Great!

We will contact you via email if it becomes available

One lab at a time

Confirm to end all existing labs and start this one

Use private browsing to run the lab

Using an Incognito or private browser window is the best way to run this lab. This prevents any conflicts between your personal account and the Student account, which may cause extra charges incurred to your personal account.

Complete this quick step to start your lab.