准备工作
- 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
- 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
- 在屏幕左上角,点击开始实验即可开始
Load the data
/ 20
Setting up Pub/Sub Topic
/ 20
Add a Pub/Sub subscription
/ 20
Add Cloud Data Fusion API Service Agent role to service account
/ 20
Build and execute runtime pipeline
/ 20
除了批次管道,您也可以利用 Data Fusion 建立即時管道,即時處理剛產生的事件。目前,即時管道是透過 Cloud Dataproc 叢集上的 Apache Spark Streaming 執行。本實驗室將說明如何使用 Data Fusion 建構串流管道。
您將實際建立管道,利用管道從 Cloud Pub/Sub 主題讀取並處理事件,經資料轉換後,再將輸出結果寫入 BigQuery。
每個實驗室都會提供新的 Google Cloud 專案和一組資源,讓您在時限內免費使用。
請以無痕視窗登入 Google Skills。
請記下實驗室時間限制 (例如 02:00:00),務必在時限內完成作業。
研究室不提供暫停功能。如有需要,您可以重新開始,但原先的進度恕無法保留。
準備就緒之後,請點選「Start Lab」。
請記下研究室憑證 (使用者名稱和密碼),登入 Google Cloud 控制台時會用到。
點選「Open Google console」。
點選「Use another account」,然後複製這個研究室的憑證,並貼到提示中。
如果使用其他憑證,系統會顯示錯誤或向您收取費用。
接受條款,然後略過資源復原頁面。
這個臨時帳戶只在實驗室期間有效,使用時務必遵守下列規定:
Cloud Shell 是含有多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,並在 Google Cloud 中運作。Cloud Shell 可讓您透過指令列存取 Google Cloud 資源。gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵完成功能。
在控制台的右上方,點按「啟用 Cloud Shell」按鈕 。
點按「繼續」。
請稍候片刻,等待系統完成佈建作業並連線至環境。連線建立後,即代表您已通過驗證,且專案已設為「PROJECT_ID」。
輸出內容
輸出內容範例
輸出內容
輸出內容範例
開始使用 Google Cloud 前,請務必確保專案在 Identity and Access Management (IAM) 中具備正確的權限。
前往 Google Cloud 控制台的「導覽選單」,依序點選「IAM 與管理」>「身分與存取權管理」。
確認具有預設的運算服務帳戶 {project-number}-compute@developer.gserviceaccount.com,且已指派 editor 角色。帳戶前置字串為專案編號,如需查看,請前往「導覽選單」>「Cloud 總覽」。
如果帳戶未顯示在 IAM 中,或沒有 editor 角色,請依照下列步驟指派必要角色。
前往 Google Cloud 控制台,依序點選「導覽選單」>「Cloud 總覽」。
從「專案資訊」資訊卡複製「專案編號」。
從「導覽選單」依序點選「IAM 與管理」>「身分與存取權管理」。
點選「身分與存取權管理」頁面頂端的「新增」。
在「新增主體」輸入:
將 {project-number} 換成您的專案編號。
從「請選擇角色」選單依序選取「基本」或「專案」>「編輯者」。
點選「儲存」。
請重新啟動連至 Dataflow API 的連線,確保可順利使用這個必要的 API。
在 Cloud 控制台最上方的搜尋列中,輸入「Dataflow API」。點選「Dataflow API」搜尋結果。
點選「管理」。
點選「停用 API」。
如果系統要求您確認操作,請點選「停用」。
您也需要在 Cloud Storage bucket 存放一份相同的範例推文檔案。在本實驗室的最後,您會將 bucket 中的資料以串流方式傳送至 Pub/Sub 主題。
新建立的 bucket 名稱會與專案 ID 相同。
點選「Check my progress」,確認目標已達成。
使用 Pub/Sub 前,您必須先建立用於存放資料的主題,然後建立訂閱項目,以便存取發布至該主題的資料。
在 Cloud 控制台的「導覽選單」中,點選「查看所有產品」,然後在「數據分析」部分點選「Pub/Sub」,選取「主題」。
按一下「建立主題」。
cdf_lab_topic,然後點選「建立」。點選「Check my progress」,確認目標已達成。
繼續留在主題頁面,建立存取該主題的訂閱項目。
cdf_lab_subscription,將傳送類型設為「提取」,然後點選「建立」。點選「Check my progress」,確認目標已達成。
接著請按照下列步驟,授權給執行個體綁定的服務帳戶。
前往 Google Cloud 控制台,依序點選「IAM 與管理」>「IAM」。
確認 Compute Engine 預設服務帳戶 {project-number}-compute@developer.gserviceaccount.com 確實存在,並將該服務帳戶複製到剪貼簿。
在「IAM 權限」頁面中,按一下「+ 授予存取權」。
在「新增主體」欄位貼上服務帳戶。
按一下「選取角色」欄位,輸入並選取「Cloud Data Fusion API 服務代理」。
點選「新增其他角色」。
新增「Dataproc 管理員」角色。
按一下「儲存」。
點選「Check my progress」,確認目標已達成。
前往控制台,依序點選「導覽選單」圖示 >「IAM 與管理」>「身分與存取權管理」。
勾選「包含 Google 提供的角色授予項目」核取方塊。
向下捲動清單,找到 Google 代管的 Cloud Data Fusion 服務帳戶 (格式為 service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com),然後將該帳戶的名稱複製到剪貼簿。
接著,依序點選「IAM 與管理」>「服務帳戶」。
點選預設的 Compute Engine 帳戶 (格式為 {project-number}-compute@developer.gserviceaccount.com),然後選取頂端導覽面板中的「具備存取權的主體」分頁標籤。
點選「授予存取權」按鈕。
在「新增主體」欄位,貼上先前複製的服務帳戶名稱。
在「角色」下拉式選單,選取「服務帳戶使用者」。
點選「儲存」。
使用 Cloud Data Fusion 時,需要同時操作 Cloud 控制台和獨立的 Cloud Data Fusion UI。在 Cloud 控制台中,您可以建立 Cloud 控制台專案,以及建立和刪除 Cloud Data Fusion 執行個體;而在 Cloud Data Fusion UI 中,您可以透過「Pipeline Studio」或「Wrangler」等頁面,使用 Cloud Data Fusion 功能。
如要瀏覽 Cloud Data Fusion UI,請按照下列步驟操作:
在 Cloud Data Fusion Control Center,使用「導覽選單」開啟左選單,然後依序選取「Pipeline」>「Studio」。
在左上方使用下拉式選單,選取「Data Pipeline - Realtime」。
處理資料時,若能先觀察原始資料,將有助於規劃後續的轉換流程。因此,您將使用 Wrangler 來準備並清理資料。透過這種「資料優先」做法,您可以即時查看轉換效果,並檢查設定是否正確。
在外掛程式區塊面板的「Transform」部分,選取「Wrangler」。Wrangler 節點會出現在畫布上,請點選「Properties」按鈕開啟設定。
在「Directives」部分,點選「WRANGLE」按鈕。
載入後,在左側選單點選「Upload」。接著按一下上傳圖示,將剛才下載到電腦中的範例推文檔案上傳。
第一步操作,是將 JSON 資料剖析為列與欄組成的表格。請點選第一欄 (body) 標題中的下拉式選單圖示,接著依序選取「Parse」選單項目,以及子選單中的「JSON」。在彈出式視窗中,將「Depth」設為「1」,然後點選「Apply」。
重複上述步驟,即可看到更易讀的資料結構,方便後續轉換。點選「body」欄的下拉式選單圖示,依序選取「Parse」>「JSON」,將「Depth」設為「1」,然後點選「Apply」。
除了使用 UI,您也可以在 Wrangler 指令列方塊中寫入轉換步驟。這個方塊位於 Wrangler UI 的下方區域 (請尋找有綠色 $ 提示字元的指令輸入框)。下一步,您將直接在這個指令輸入框貼上一組轉換步驟。
複製下列轉換步驟,全部貼到 Wrangler 指令列方塊中:
現在畫面已回到 Pipeline Studio。畫布上出現了一個節點,代表剛在 Wrangler 中定義的轉換機制。不過,這個管道尚未連接任何資料來源,因為如前所述,您剛才是針對筆電上的樣本資料執行轉換,而不是直接處理正式環境的資料。
接下來,我們要指定資料實際存放的位置。
在外掛程式區塊面板的「Source」部分,選取「PubSub」。畫布上會出現 PubSub 來源節點,請點選「Properties」按鈕開啟設定。
依下列說明設定 PubSub 來源的各項屬性:
a. 在「Reference Name」部分輸入 Twitter_Input_Stream
b. 在「Subscription」部分輸入 cdf_lab_subscription (即您先前建立的 PubSub 訂閱項目名稱)
c. 點選「Validate」,確認設定無誤。
d. 點選右上方的「X」關閉屬性方塊。
請注意,先前在 Wrangler 使用的是資料樣本,來源資料欄名為「body」。但實際上,Pub/Sub 來源會將資料放在名為「message」的欄位中。因此接下來,我們要針對這個差異做出調整。
點選右上方的「X」關閉屬性方塊。
在管道中加入資料來源與轉換步驟後,只要再加入接收器即可完成設定。 在左側面板的「Sink」部分,選擇「BigQuery」。BigQuery 接收器節點會出現在畫布上。
將箭頭從 Wrangler 節點拖曳至 BigQuery 節點,即可連接兩者。接著再設定 BigQuery 節點的屬性。
將游標懸停在「BigQuery」節點上,然後點選「Properties」。
a. 在「Reference Name」下方輸入 realtime_pipeline
b. 在「Dataset」下方輸入 realtime
c. 在「Table」下方輸入 tweets
d. 點選「Validate」,確認設定無誤。
點選右上方的「X」關閉屬性方塊。
點選「Name your pipeline」,將名稱設為 Realtime_Pipeline,然後點選「Save」。
點選「Deploy」圖示,然後啟動管道。
部署完成後,點選「Run」。接著等待管道的「Status」變成「Running」。這項操作大約需要幾分鐘時間。
傳送事件的做法,是用 Dataflow 範本將事件大量載入訂閱項目。
接下來,請使用範本建立 Dataflow 工作,將推文檔案中的多則訊息處理後,發布至先前建立的 Pub/Sub 主題。在 Dataflow 的建立工作頁面中,從「Process Data Continuously (Stream)」類別中選擇 Text Files on Cloud Storage to Pub/Sub 範本。
返回 Cloud 控制台,依序點選「導覽選單」>「查看所有產品」,然後在「數據分析」部分點選「Dataflow」。
點選頂端選單列中的「依據範本建立工作」。
輸入「streaming-pipeline」做為 Cloud Dataflow 工作的名稱。
在「Dataflow 範本」下方,選取 Text Files on Cloud Storage to Pub/Sub 範本。
在「Input Cloud Storage File(s)」下方,輸入 gs://<YOUR-BUCKET-NAME>/<FILE-NAME>
<YOUR-BUCKET-NAME> 需換成您的 bucket 名稱,<FILE-NAME> 則需換成您稍早下載到電腦的檔案名稱。
例如:gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000
projects/<PROJECT-ID>/topics/cdf_lab_topic。PROJECT-ID 需換成實際專案 ID。
<YOUR-BUCKET-NAME>/tmp/
<YOUR-BUCKET-NAME> 需換成您的 bucket 名稱。
按一下「執行工作」按鈕。
執行 Dataflow 工作,並等待幾分鐘。您可以先在 pubsub 訂閱項目中查看訊息,然後觀察即時 CDF 管道處理訊息的情形。
點選「Check my progress」,確認目標已達成。
事件載入至 Pub/Sub 主題後,您就可以看到管道開始處理這些事件,請留意各節點的指標是否開始更新。
在 Data Fusion Console 中,等待管道指標出現變化
在本實驗室,您已學會如何在 Data Fusion 建立即時管道,利用管道從 Cloud Pub/Sub 讀取串流傳入訊息、處理資料,並將結果寫入 BigQuery。
使用手冊上次更新日期:2025 年 2 月 6 日
實驗室上次測試日期:2025 年 2 月 6 日
Copyright 2026 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。
此内容目前不可用
一旦可用,我们会通过电子邮件告知您
太好了!
一旦可用,我们会通过电子邮件告知您
一次一个实验
确认结束所有现有实验并开始此实验