实验设置说明和要求
保护您的账号和进度。请务必在无痕浏览器窗口中,使用实验凭证运行此实验。

在 Cloud Data Fusion 建構即時管道

实验 1 小时 30 分钟 universal_currency_alt 5 积分 show_chart 入门级
info 此实验可能会提供 AI 工具来支持您学习。
此内容尚未针对移动设备进行优化。
为获得最佳体验,请在桌面设备上访问通过电子邮件发送的链接。

GSP808

Google Cloud 自修研究室標誌

總覽

除了批次管道,您也可以利用 Data Fusion 建立即時管道,即時處理剛產生的事件。目前,即時管道是透過 Cloud Dataproc 叢集上的 Apache Spark Streaming 執行。本實驗室將說明如何使用 Data Fusion 建構串流管道。

您將實際建立管道,利用管道從 Cloud Pub/Sub 主題讀取並處理事件,經資料轉換後,再將輸出結果寫入 BigQuery。

目標

  1. 瞭解如何建立即時管道
  2. 瞭解如何在 Data Fusion 中設定 Pub/Sub 來源外掛程式
  3. 瞭解如何使用 Wrangler,定義如何轉換不支援連線中的資料

設定和需求

每個實驗室都會提供新的 Google Cloud 專案和一組資源,讓您在時限內免費使用。

  1. 請以無痕視窗登入 Google Skills。

  2. 請記下實驗室時間限制 (例如 02:00:00),務必在時限內完成作業。
    研究室不提供暫停功能。如有需要,您可以重新開始,但原先的進度恕無法保留。

  3. 準備就緒之後,請點選「Start Lab」

    注意事項:點選「Start Lab」之後,研究室需要 15 至 20 分鐘來佈建必要資源,並建立 Data Fusion 執行個體。 在此期間,不妨詳閱下方步驟,瞭解研究室的目標。

    執行個體建立完畢之後,左側面板會顯示研究室憑證 (使用者名稱密碼),此時即可繼續登入控制台。
  4. 請記下研究室憑證 (使用者名稱密碼),登入 Google Cloud 控制台時會用到。

  5. 點選「Open Google console」

  6. 點選「Use another account」,然後複製這個研究室的憑證,並貼到提示中。
    如果使用其他憑證,系統會顯示錯誤或向您收取費用

  7. 接受條款,然後略過資源復原頁面。

注意事項:請等到已完成研究室工作或想重新開始時,再點選「End Lab」。這麼做會清除現有工作並移除專案。

登入 Google Cloud 控制台

  1. 在用來進行本實驗室活動的瀏覽器分頁或視窗,複製「Connection Details」面板中的使用者名稱,然後點選「Open Google Console」按鈕。
注意:如果系統要求您選擇帳戶,請點選「使用其他帳戶」
  1. 按照系統提示,依序貼上使用者名稱密碼
  2. 點選「Next」
  3. 接受條款及細則。

這個臨時帳戶只在實驗室期間有效,使用時務必遵守下列規定:

  • 請勿新增救援選項
  • 請勿申請免費試用
  1. 開啟主控台後,點選畫面左上方的「導覽選單」圖示 「導覽選單」圖示,即可查看服務清單。

導覽選單

啟用 Cloud Shell

Cloud Shell 是含有多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,並在 Google Cloud 中運作。Cloud Shell 可讓您透過指令列存取 Google Cloud 資源。gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵完成功能。

  1. 在控制台的右上方,點按「啟用 Cloud Shell」按鈕 「啟用 Cloud Shell」圖示

  2. 點按「繼續」
    請稍候片刻,等待系統完成佈建作業並連線至環境。連線建立後,即代表您已通過驗證,且專案已設為「PROJECT_ID」

指令範例

  • 列出目前使用的帳戶名稱:
gcloud auth list

輸出內容

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

輸出內容範例

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 列出專案 ID:
gcloud config list project

輸出內容

[core] project = <project_ID>

輸出內容範例

[core] project = qwiklabs-gcp-44776a13dea667a6 注意:如需 gcloud 的完整說明,請參閱 gcloud CLI 總覽指南

工作 1:專案權限

檢查專案權限

開始使用 Google Cloud 前,請務必確保專案在 Identity and Access Management (IAM) 中具備正確的權限。

  1. 前往 Google Cloud 控制台的「導覽選單」「導覽選單」圖示,依序點選「IAM 與管理」>「身分與存取權管理」

  2. 確認具有預設的運算服務帳戶 {project-number}-compute@developer.gserviceaccount.com,且已指派 editor 角色。帳戶前置字串為專案編號,如需查看,請前往「導覽選單」>「Cloud 總覽」

預設的運算服務帳戶

如果帳戶未顯示在 IAM 中,或沒有 editor 角色,請依照下列步驟指派必要角色。

  1. 前往 Google Cloud 控制台,依序點選「導覽選單」>「Cloud 總覽」

  2. 從「專案資訊」資訊卡複製「專案編號」

  3. 從「導覽選單」依序點選「IAM 與管理」>「身分與存取權管理」

  4. 點選「身分與存取權管理」頁面頂端的「新增」

  5. 在「新增主體」輸入:

{project-number}-compute@developer.gserviceaccount.com

{project-number} 換成您的專案編號。

  1. 從「請選擇角色」選單依序選取「基本」或「專案」>「編輯者」

  2. 點選「儲存」

工作 2:確定已成功啟用 Dataflow API

請重新啟動連至 Dataflow API 的連線,確保可順利使用這個必要的 API。

  1. 在 Cloud 控制台最上方的搜尋列中,輸入「Dataflow API」。點選「Dataflow API」搜尋結果。

  2. 點選「管理」

  3. 點選「停用 API」

如果系統要求您確認操作,請點選「停用」

  1. 按一下「啟用」

工作 3:載入資料

  1. 首先,請將範例推文下載到電腦,稍後將使用 Wrangler 上傳資料並建立轉換步驟。

您也需要在 Cloud Storage bucket 存放一份相同的範例推文檔案。在本實驗室的最後,您會將 bucket 中的資料以串流方式傳送至 Pub/Sub 主題。

  1. 在 Cloud Shell 執行以下指令,建立新的 bucket:
export BUCKET=$GOOGLE_CLOUD_PROJECT gsutil mb gs://$BUCKET

新建立的 bucket 名稱會與專案 ID 相同。

  1. 執行下列指令,將推文檔案複製到 bucket:
gsutil cp gs://cloud-training/OCBL164/pubnub_tweets_2019-06-09-05-50_part-r-00000 gs://$BUCKET
  1. 確認檔案已複製到 Cloud Storage bucket。

點選「Check my progress」,確認目標已達成。 載入資料

工作 4:設定 Pub/Sub 主題

使用 Pub/Sub 前,您必須先建立用於存放資料的主題,然後建立訂閱項目,以便存取發布至該主題的資料。

  1. 在 Cloud 控制台的「導覽選單」中,點選「查看所有產品」,然後在「數據分析」部分點選「Pub/Sub」,選取「主題」

  2. 按一下「建立主題」

「建立主題」按鈕

  1. 主題的名稱不得重複。在本實驗室中,請將主題命名為 cdf_lab_topic,然後點選「建立」

點選「Check my progress」,確認目標已達成。 設定 Pub/Sub 主題

工作 5:新增 Pub/Sub 訂閱項目

繼續留在主題頁面,建立存取該主題的訂閱項目。

  1. 點選「建立訂閱項目」

「建立訂閱項目」連結

  1. 輸入訂閱項目名稱,例如 cdf_lab_subscription,將傳送類型設為「提取」,然後點選「建立」

「將訂閱項目新增至主題」頁面

點選「Check my progress」,確認目標已達成。 新增 Pub/Sub 訂閱項目

工作 6:新增 Cloud Data Fusion 執行個體的必要權限

  1. 在 Google Cloud 控制台的「導覽選單」,點選「查看所有產品」,然後在「數據分析」部分依序點選「Data Fusion」>「執行個體」
注意:執行個體大約 20 分鐘可建立完成,請耐心等候。

接著請按照下列步驟,授權給執行個體綁定的服務帳戶。

  1. 前往 Google Cloud 控制台,依序點選「IAM 與管理」>「IAM」

  2. 確認 Compute Engine 預設服務帳戶 {project-number}-compute@developer.gserviceaccount.com 確實存在,並將該服務帳戶複製到剪貼簿。

  3. 在「IAM 權限」頁面中,按一下「+ 授予存取權」

  4. 在「新增主體」欄位貼上服務帳戶。

  5. 按一下「選取角色」欄位,輸入並選取「Cloud Data Fusion API 服務代理」

  6. 點選「新增其他角色」

  7. 新增「Dataproc 管理員」角色。

  8. 按一下「儲存」

點選「Check my progress」,確認目標已達成。 將 Cloud Data Fusion API 服務代理角色新增至服務帳戶

授予服務帳戶使用者權限

  1. 前往控制台,依序點選「導覽選單」圖示 >「IAM 與管理」>「身分與存取權管理」

  2. 勾選「包含 Google 提供的角色授予項目」核取方塊。

  3. 向下捲動清單,找到 Google 代管的 Cloud Data Fusion 服務帳戶 (格式為 service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com),然後將該帳戶的名稱複製到剪貼簿。

Google 代管的 Cloud Data Fusion 服務帳戶清單

  1. 接著,依序點選「IAM 與管理」>「服務帳戶」

  2. 點選預設的 Compute Engine 帳戶 (格式為 {project-number}-compute@developer.gserviceaccount.com),然後選取頂端導覽面板中的「具備存取權的主體」分頁標籤。

  3. 點選「授予存取權」按鈕。

  4. 在「新增主體」欄位,貼上先前複製的服務帳戶名稱。

  5. 在「角色」下拉式選單,選取「服務帳戶使用者」

  6. 點選「儲存」

工作 7:瀏覽 Cloud Data Fusion UI

使用 Cloud Data Fusion 時,需要同時操作 Cloud 控制台和獨立的 Cloud Data Fusion UI。在 Cloud 控制台中,您可以建立 Cloud 控制台專案,以及建立和刪除 Cloud Data Fusion 執行個體;而在 Cloud Data Fusion UI 中,您可以透過「Pipeline Studio」或「Wrangler」等頁面,使用 Cloud Data Fusion 功能。

如要瀏覽 Cloud Data Fusion UI,請按照下列步驟操作:

  1. 返回 Cloud 控制台的「Data Fusion」頁面,接著點選 Data Fusion 執行個體旁的「查看執行個體」連結。使用實驗室憑證登入。如果系統請您觀看服務導覽,請點選「不用了,謝謝」。現在應會進入 Cloud Data Fusion UI。

「查看執行個體」連結

  1. 在 Cloud Data Fusion Control Center,使用「導覽選單」開啟左選單,然後依序選取「Pipeline」>「Studio」

  2. 在左上方使用下拉式選單,選取「Data Pipeline - Realtime」

工作 8:建構即時管道

處理資料時,若能先觀察原始資料,將有助於規劃後續的轉換流程。因此,您將使用 Wrangler 來準備並清理資料。透過這種「資料優先」做法,您可以即時查看轉換效果,並檢查設定是否正確。

  1. 在外掛程式區塊面板的「Transform」部分,選取「Wrangler」。Wrangler 節點會出現在畫布上,請點選「Properties」按鈕開啟設定。

  2. 在「Directives」部分,點選「WRANGLE」按鈕。

  3. 載入後,在左側選單點選「Upload」。接著按一下上傳圖示,將剛才下載到電腦中的範例推文檔案上傳。

Wrangler 的「Upload data from your computer」頁面

  1. 資料會以列/欄的形式載入到 Wrangler 畫面,過程需要幾分鐘。
注意:請將這份資料當成日後在 Pub/Sub 接收到的事件樣本。這也符合實務情況:在開發管道時,通常無法直接存取正式環境的資料。

不過,管理員可能會提供少量樣本資料,或是讓您使用符合 API 規範的模擬資料。在本節中,您將反覆轉換這份樣本,並根據每一步的回饋調整設定,最後再學習如何將這些轉換規則應用到真實資料上。
  1. 第一步操作,是將 JSON 資料剖析為列與欄組成的表格。請點選第一欄 (body) 標題中的下拉式選單圖示,接著依序選取「Parse」選單項目,以及子選單中的「JSON」。在彈出式視窗中,將「Depth」設為「1」,然後點選「Apply」

    前往「JSON」選項的操作路徑。

  2. 重複上述步驟,即可看到更易讀的資料結構,方便後續轉換。點選「body」欄的下拉式選單圖示,依序選取「Parse」>「JSON」,將「Depth」設為「1」,然後點選「Apply」

    body_payload 資料

    除了使用 UI,您也可以在 Wrangler 指令列方塊中寫入轉換步驟。這個方塊位於 Wrangler UI 的下方區域 (請尋找有綠色 $ 提示字元的指令輸入框)。下一步,您將直接在這個指令輸入框貼上一組轉換步驟。

  3. 複製下列轉換步驟,全部貼到 Wrangler 指令列方塊中:

columns-replace s/^body_payload_//g drop id_str parse-as-simple-date :created_at EEE MMM dd HH:mm:ss Z yyyy drop display_text_range drop truncated drop in_reply_to_status_id_str drop in_reply_to_user_id_str parse-as-json :user 1 drop coordinates set-type :place string drop geo,place,contributors,is_quote_status,favorited,retweeted,filter_level,user_id_str,user_url,user_description,user_translator_type,user_protected,user_verified,user_followers_count,user_friends_count,user_statuses_count,user_favourites_count,user_listed_count,user_is_translator,user_contributors_enabled,user_lang,user_geo_enabled,user_time_zone,user_utc_offset,user_created_at,user_profile_background_color,user_profile_background_image_url,user_profile_background_image_url_https,user_profile_background_tile,user_profile_link_color,user_profile_sidebar_border_color,user_profile_sidebar_fill_color,user_profile_text_color,user_profile_use_background_image drop user_following,user_default_profile_image,user_follow_request_sent,user_notifications,extended_tweet,quoted_status_id,quoted_status_id_str,quoted_status,quoted_status_permalink drop user_profile_image_url,user_profile_image_url_https,user_profile_banner_url,user_default_profile,extended_entities fill-null-or-empty :possibly_sensitive 'false' set-type :possibly_sensitive boolean drop :entities drop :user_location 注意:如果出現「No data. Try removing some transformation steps」之類訊息,請點選「X」移除任一轉換步驟。待資料重新顯示後,即可繼續後續操作。
  1. 點選右上方的「Apply」按鈕,接著點選右上方的「X」關閉屬性方塊。

現在畫面已回到 Pipeline Studio。畫布上出現了一個節點,代表剛在 Wrangler 中定義的轉換機制。不過,這個管道尚未連接任何資料來源,因為如前所述,您剛才是針對筆電上的樣本資料執行轉換,而不是直接處理正式環境的資料。

接下來,我們要指定資料實際存放的位置。

  1. 在外掛程式區塊面板的「Source」部分,選取「PubSub」。畫布上會出現 PubSub 來源節點,請點選「Properties」按鈕開啟設定。

  2. 依下列說明設定 PubSub 來源的各項屬性:

a. 在「Reference Name」部分輸入 Twitter_Input_Stream

b. 在「Subscription」部分輸入 cdf_lab_subscription (即您先前建立的 PubSub 訂閱項目名稱)

注意:設定 PubSub 來源時,不必輸入完整的訂閱項目名稱,只需填入 .../subscriptions/ 後方的最後一段名稱即可。

c. 點選「Validate」,確認設定無誤。

「Pub/Sub Properties」頁面

d. 點選右上方的「X」關閉屬性方塊。

  1. 現在,請將 PubSub 來源節點連結至先前新增的 Wrangler 節點。

管道;Pub/Sub 至 Wrangler

請注意,先前在 Wrangler 使用的是資料樣本,來源資料欄名為「body」。但實際上,Pub/Sub 來源會將資料放在名為「message」的欄位中。因此接下來,我們要針對這個差異做出調整。

  1. 開啟 Wrangler 節點的「Properties」設定,在現有轉換步驟的最上方加入下列指令:
keep :message set-charset :message 'utf-8' rename :message :body

「Wrangler Properties」頁面

點選右上方的「X」關閉屬性方塊。

  1. 在管道中加入資料來源與轉換步驟後,只要再加入接收器即可完成設定。 在左側面板的「Sink」部分,選擇「BigQuery」。BigQuery 接收器節點會出現在畫布上。

  2. 將箭頭從 Wrangler 節點拖曳至 BigQuery 節點,即可連接兩者。接著再設定 BigQuery 節點的屬性。

    管道

  3. 將游標懸停在「BigQuery」節點上,然後點選「Properties」

    a. 在「Reference Name」下方輸入 realtime_pipeline

    b. 在「Dataset」下方輸入 realtime

    c. 在「Table」下方輸入 tweets

    d. 點選「Validate」,確認設定無誤。

  4. 點選右上方的「X」關閉屬性方塊。

「BigQuery Properties」視窗

  1. 點選「Name your pipeline」,將名稱設為 Realtime_Pipeline,然後點選「Save」

  2. 點選「Deploy」圖示,然後啟動管道。

  3. 部署完成後,點選「Run」。接著等待管道的「Status」變成「Running」。這項操作大約需要幾分鐘時間。

工作 9:傳送訊息至 Cloud Pub/Sub

傳送事件的做法,是用 Dataflow 範本將事件大量載入訂閱項目。

接下來,請使用範本建立 Dataflow 工作,將推文檔案中的多則訊息處理後,發布至先前建立的 Pub/Sub 主題。在 Dataflow 的建立工作頁面中,從「Process Data Continuously (Stream)」類別中選擇 Text Files on Cloud Storage to Pub/Sub 範本。

  1. 返回 Cloud 控制台,依序點選「導覽選單」>「查看所有產品」,然後在「數據分析」部分點選「Dataflow」

  2. 點選頂端選單列中的「依據範本建立工作」

  3. 輸入「streaming-pipeline」做為 Cloud Dataflow 工作的名稱。

  4. 在「Dataflow 範本」下方,選取 Text Files on Cloud Storage to Pub/Sub 範本。

  5. 在「Input Cloud Storage File(s)」下方,輸入 gs://<YOUR-BUCKET-NAME>/<FILE-NAME> <YOUR-BUCKET-NAME> 需換成您的 bucket 名稱,<FILE-NAME> 則需換成您稍早下載到電腦的檔案名稱。

例如:gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000

  1. 在「Output Pub/Sub Topic」下方,輸入 projects/<PROJECT-ID>/topics/cdf_lab_topic

PROJECT-ID 需換成實際專案 ID。

  1. 在「臨時位置」下方,輸入 <YOUR-BUCKET-NAME>/tmp/

<YOUR-BUCKET-NAME> 需換成您的 bucket 名稱。

  1. 按一下「執行工作」按鈕。

  2. 執行 Dataflow 工作,並等待幾分鐘。您可以先在 pubsub 訂閱項目中查看訊息,然後觀察即時 CDF 管道處理訊息的情形。

    「依據範本建立工作」對話方塊

點選「Check my progress」,確認目標已達成。 執行及建構執行階段管道

工作 10:查看管道指標

事件載入至 Pub/Sub 主題後,您就可以看到管道開始處理這些事件,請留意各節點的指標是否開始更新。

  • 在 Data Fusion Console 中,等待管道指標出現變化

    管道指標

恭喜!

在本實驗室,您已學會如何在 Data Fusion 建立即時管道,利用管道從 Cloud Pub/Sub 讀取串流傳入訊息、處理資料,並將結果寫入 BigQuery。

使用手冊上次更新日期:2025 年 2 月 6 日

實驗室上次測試日期:2025 年 2 月 6 日

Copyright 2026 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。

准备工作

  1. 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
  2. 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
  3. 在屏幕左上角,点击开始实验即可开始

使用无痕浏览模式

  1. 复制系统为实验提供的用户名密码
  2. 在无痕浏览模式下,点击打开控制台

登录控制台

  1. 使用您的实验凭证登录。使用其他凭证可能会导致错误或产生费用。
  2. 接受条款,并跳过恢复资源页面
  3. 除非您已完成此实验或想要重新开始,否则请勿点击结束实验,因为点击后系统会清除您的工作并移除该项目

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您

一次一个实验

确认结束所有现有实验并开始此实验

使用无痕浏览模式运行实验

使用无痕模式或无痕浏览器窗口是运行此实验的最佳方式。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。