Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Load the data
/ 20
Setting up Pub/Sub Topic
/ 20
Add a Pub/Sub subscription
/ 20
Add Cloud Data Fusion API Service Agent role to service account
/ 20
Build and execute runtime pipeline
/ 20
除了批次管道,您也可以利用 Data Fusion 建立即時管道,即時處理剛產生的事件。目前,即時管道是透過 Cloud Dataproc 叢集上的 Apache Spark Streaming 執行。本實驗室將說明如何使用 Data Fusion 建構串流管道。
您將實際建立管道,利用管道從 Cloud Pub/Sub 主題讀取並處理事件,經資料轉換後,再將輸出結果寫入 BigQuery。
每個實驗室都會提供新的 Google Cloud 專案和一組資源,讓您在時限內免費使用。
請以無痕視窗登入 Google Skills。
請記下實驗室時間限制 (例如 02:00:00),務必在時限內完成作業。
研究室不提供暫停功能。如有需要,您可以重新開始,但原先的進度恕無法保留。
準備就緒之後,請點選「Start Lab」。
請記下研究室憑證 (使用者名稱和密碼),登入 Google Cloud 控制台時會用到。
點選「Open Google console」。
點選「Use another account」,然後複製這個研究室的憑證,並貼到提示中。
如果使用其他憑證,系統會顯示錯誤或向您收取費用。
接受條款,然後略過資源復原頁面。
這個臨時帳戶只在實驗室期間有效,使用時務必遵守下列規定:
Cloud Shell 是含有多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,並在 Google Cloud 中運作。Cloud Shell 可讓您透過指令列存取 Google Cloud 資源。gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵完成功能。
在控制台的右上方,點按「啟用 Cloud Shell」按鈕 。
點按「繼續」。
請稍候片刻,等待系統完成佈建作業並連線至環境。連線建立後,即代表您已通過驗證,且專案已設為「PROJECT_ID」。
輸出內容
輸出內容範例
輸出內容
輸出內容範例
開始使用 Google Cloud 前,請務必確保專案在 Identity and Access Management (IAM) 中具備正確的權限。
前往 Google Cloud 控制台的「導覽選單」,依序點選「IAM 與管理」>「身分與存取權管理」。
確認具有預設的運算服務帳戶 {project-number}-compute@developer.gserviceaccount.com,且已指派 editor 角色。帳戶前置字串為專案編號,如需查看,請前往「導覽選單」>「Cloud 總覽」。
如果帳戶未顯示在 IAM 中,或沒有 editor 角色,請依照下列步驟指派必要角色。
前往 Google Cloud 控制台,依序點選「導覽選單」>「Cloud 總覽」。
從「專案資訊」資訊卡複製「專案編號」。
從「導覽選單」依序點選「IAM 與管理」>「身分與存取權管理」。
點選「身分與存取權管理」頁面頂端的「新增」。
在「新增主體」輸入:
將 {project-number} 換成您的專案編號。
從「請選擇角色」選單依序選取「基本」或「專案」>「編輯者」。
點選「儲存」。
請重新啟動連至 Dataflow API 的連線,確保可順利使用這個必要的 API。
在 Cloud 控制台最上方的搜尋列中,輸入「Dataflow API」。點選「Dataflow API」搜尋結果。
點選「管理」。
點選「停用 API」。
如果系統要求您確認操作,請點選「停用」。
您也需要在 Cloud Storage bucket 存放一份相同的範例推文檔案。在本實驗室的最後,您會將 bucket 中的資料以串流方式傳送至 Pub/Sub 主題。
新建立的 bucket 名稱會與專案 ID 相同。
點選「Check my progress」,確認目標已達成。
使用 Pub/Sub 前,您必須先建立用於存放資料的主題,然後建立訂閱項目,以便存取發布至該主題的資料。
在 Cloud 控制台的「導覽選單」中,點選「查看所有產品」,然後在「數據分析」部分點選「Pub/Sub」,選取「主題」。
按一下「建立主題」。
cdf_lab_topic,然後點選「建立」。點選「Check my progress」,確認目標已達成。
繼續留在主題頁面,建立存取該主題的訂閱項目。
cdf_lab_subscription,將傳送類型設為「提取」,然後點選「建立」。點選「Check my progress」,確認目標已達成。
接著請按照下列步驟,授權給執行個體綁定的服務帳戶。
前往 Google Cloud 控制台,依序點選「IAM 與管理」>「IAM」。
確認 Compute Engine 預設服務帳戶 {project-number}-compute@developer.gserviceaccount.com 確實存在,並將該服務帳戶複製到剪貼簿。
在「IAM 權限」頁面中,按一下「+ 授予存取權」。
在「新增主體」欄位貼上服務帳戶。
按一下「選取角色」欄位,輸入並選取「Cloud Data Fusion API 服務代理」。
點選「新增其他角色」。
新增「Dataproc 管理員」角色。
按一下「儲存」。
點選「Check my progress」,確認目標已達成。
前往控制台,依序點選「導覽選單」圖示 >「IAM 與管理」>「身分與存取權管理」。
勾選「包含 Google 提供的角色授予項目」核取方塊。
向下捲動清單,找到 Google 代管的 Cloud Data Fusion 服務帳戶 (格式為 service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com),然後將該帳戶的名稱複製到剪貼簿。
接著,依序點選「IAM 與管理」>「服務帳戶」。
點選預設的 Compute Engine 帳戶 (格式為 {project-number}-compute@developer.gserviceaccount.com),然後選取頂端導覽面板中的「具備存取權的主體」分頁標籤。
點選「授予存取權」按鈕。
在「新增主體」欄位,貼上先前複製的服務帳戶名稱。
在「角色」下拉式選單,選取「服務帳戶使用者」。
點選「儲存」。
使用 Cloud Data Fusion 時,需要同時操作 Cloud 控制台和獨立的 Cloud Data Fusion UI。在 Cloud 控制台中,您可以建立 Cloud 控制台專案,以及建立和刪除 Cloud Data Fusion 執行個體;而在 Cloud Data Fusion UI 中,您可以透過「Pipeline Studio」或「Wrangler」等頁面,使用 Cloud Data Fusion 功能。
如要瀏覽 Cloud Data Fusion UI,請按照下列步驟操作:
在 Cloud Data Fusion Control Center,使用「導覽選單」開啟左選單,然後依序選取「Pipeline」>「Studio」。
在左上方使用下拉式選單,選取「Data Pipeline - Realtime」。
處理資料時,若能先觀察原始資料,將有助於規劃後續的轉換流程。因此,您將使用 Wrangler 來準備並清理資料。透過這種「資料優先」做法,您可以即時查看轉換效果,並檢查設定是否正確。
在外掛程式區塊面板的「Transform」部分,選取「Wrangler」。Wrangler 節點會出現在畫布上,請點選「Properties」按鈕開啟設定。
在「Directives」部分,點選「WRANGLE」按鈕。
載入後,在左側選單點選「Upload」。接著按一下上傳圖示,將剛才下載到電腦中的範例推文檔案上傳。
第一步操作,是將 JSON 資料剖析為列與欄組成的表格。請點選第一欄 (body) 標題中的下拉式選單圖示,接著依序選取「Parse」選單項目,以及子選單中的「JSON」。在彈出式視窗中,將「Depth」設為「1」,然後點選「Apply」。
重複上述步驟,即可看到更易讀的資料結構,方便後續轉換。點選「body」欄的下拉式選單圖示,依序選取「Parse」>「JSON」,將「Depth」設為「1」,然後點選「Apply」。
除了使用 UI,您也可以在 Wrangler 指令列方塊中寫入轉換步驟。這個方塊位於 Wrangler UI 的下方區域 (請尋找有綠色 $ 提示字元的指令輸入框)。下一步,您將直接在這個指令輸入框貼上一組轉換步驟。
複製下列轉換步驟,全部貼到 Wrangler 指令列方塊中:
現在畫面已回到 Pipeline Studio。畫布上出現了一個節點,代表剛在 Wrangler 中定義的轉換機制。不過,這個管道尚未連接任何資料來源,因為如前所述,您剛才是針對筆電上的樣本資料執行轉換,而不是直接處理正式環境的資料。
接下來,我們要指定資料實際存放的位置。
在外掛程式區塊面板的「Source」部分,選取「PubSub」。畫布上會出現 PubSub 來源節點,請點選「Properties」按鈕開啟設定。
依下列說明設定 PubSub 來源的各項屬性:
a. 在「Reference Name」部分輸入 Twitter_Input_Stream
b. 在「Subscription」部分輸入 cdf_lab_subscription (即您先前建立的 PubSub 訂閱項目名稱)
c. 點選「Validate」,確認設定無誤。
d. 點選右上方的「X」關閉屬性方塊。
請注意,先前在 Wrangler 使用的是資料樣本,來源資料欄名為「body」。但實際上,Pub/Sub 來源會將資料放在名為「message」的欄位中。因此接下來,我們要針對這個差異做出調整。
點選右上方的「X」關閉屬性方塊。
在管道中加入資料來源與轉換步驟後,只要再加入接收器即可完成設定。 在左側面板的「Sink」部分,選擇「BigQuery」。BigQuery 接收器節點會出現在畫布上。
將箭頭從 Wrangler 節點拖曳至 BigQuery 節點,即可連接兩者。接著再設定 BigQuery 節點的屬性。
將游標懸停在「BigQuery」節點上,然後點選「Properties」。
a. 在「Reference Name」下方輸入 realtime_pipeline
b. 在「Dataset」下方輸入 realtime
c. 在「Table」下方輸入 tweets
d. 點選「Validate」,確認設定無誤。
點選右上方的「X」關閉屬性方塊。
點選「Name your pipeline」,將名稱設為 Realtime_Pipeline,然後點選「Save」。
點選「Deploy」圖示,然後啟動管道。
部署完成後,點選「Run」。接著等待管道的「Status」變成「Running」。這項操作大約需要幾分鐘時間。
傳送事件的做法,是用 Dataflow 範本將事件大量載入訂閱項目。
接下來,請使用範本建立 Dataflow 工作,將推文檔案中的多則訊息處理後,發布至先前建立的 Pub/Sub 主題。在 Dataflow 的建立工作頁面中,從「Process Data Continuously (Stream)」類別中選擇 Text Files on Cloud Storage to Pub/Sub 範本。
返回 Cloud 控制台,依序點選「導覽選單」>「查看所有產品」,然後在「數據分析」部分點選「Dataflow」。
點選頂端選單列中的「依據範本建立工作」。
輸入「streaming-pipeline」做為 Cloud Dataflow 工作的名稱。
在「Dataflow 範本」下方,選取 Text Files on Cloud Storage to Pub/Sub 範本。
在「Input Cloud Storage File(s)」下方,輸入 gs://<YOUR-BUCKET-NAME>/<FILE-NAME>
<YOUR-BUCKET-NAME> 需換成您的 bucket 名稱,<FILE-NAME> 則需換成您稍早下載到電腦的檔案名稱。
例如:gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000
projects/<PROJECT-ID>/topics/cdf_lab_topic。PROJECT-ID 需換成實際專案 ID。
<YOUR-BUCKET-NAME>/tmp/
<YOUR-BUCKET-NAME> 需換成您的 bucket 名稱。
按一下「執行工作」按鈕。
執行 Dataflow 工作,並等待幾分鐘。您可以先在 pubsub 訂閱項目中查看訊息,然後觀察即時 CDF 管道處理訊息的情形。
點選「Check my progress」,確認目標已達成。
事件載入至 Pub/Sub 主題後,您就可以看到管道開始處理這些事件,請留意各節點的指標是否開始更新。
在 Data Fusion Console 中,等待管道指標出現變化
在本實驗室,您已學會如何在 Data Fusion 建立即時管道,利用管道從 Cloud Pub/Sub 讀取串流傳入訊息、處理資料,並將結果寫入 BigQuery。
使用手冊上次更新日期:2025 年 2 月 6 日
實驗室上次測試日期:2025 年 2 月 6 日
Copyright 2026 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。
This content is not currently available
We will notify you via email when it becomes available
Great!
We will contact you via email if it becomes available
One lab at a time
Confirm to end all existing labs and start this one