实验设置说明和要求
保护您的账号和进度。请务必在无痕浏览器窗口中,使用实验凭证运行此实验。

使用 Cloud Pub/Sub 和 Dataflow 進行串流處理:Qwik Start

实验 45 分钟 universal_currency_alt 1 积分 show_chart 入门级
info 此实验可能会提供 AI 工具来支持您学习。
此内容尚未针对移动设备进行优化。
为获得最佳体验,请在桌面设备上访问通过电子邮件发送的链接。

GSP903

Google Cloud 自學實驗室標誌

總覽

Google Cloud Pub/Sub 是在應用程式與服務之間,交換事件資料的訊息傳遞服務,資料生產者將訊息發布至 Cloud Pub/Sub 主題,而消費者訂閱該主題。訂閱者會從訂閱項目提取訊息,或設定為推送訂閱項目的 Webhook。每個訂閱者都必須確認可設定時間範圍內的每個訊息。

Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,讓資料維持同等的穩定與明確性,並利用 Apache Beam SDK 提供簡化的管道開發環境。這個 SDK 具備多項時間區間設定與工作階段分析基元,以及來源與接收連接器生態系統。

Pub/Sub 是可擴充且耐用的事件擷取與傳送系統。Dataflow 透過視窗與緩衝機制,補強了 Pub/Sub 可擴充的「至少一次」傳送模型,實現訊息簡化、單次處理以及依序處理。

學習內容

  • 讀取發布至 Pub/Sub 主題的訊息
  • 依時間戳記建立訊息視窗或分組
  • 將訊息寫入 Cloud Storage

設定

瞭解以下事項後,再點選「Start Lab」按鈕

請詳閱以下操作說明。實驗室活動會計時,且中途無法暫停。點選「Start Lab」後就會開始計時,顯示可使用 Google Cloud 資源的時間。

您將在真正的雲端環境完成實作實驗室活動,而不是模擬或示範環境。為此,我們會提供新的暫時憑證,供您在實驗室活動期間登入及存取 Google Cloud。

為了順利完成這個實驗室,請先確認:

  • 可以使用標準的網際網路瀏覽器 (Chrome 瀏覽器為佳)。
注意事項:請使用無痕模式 (建議選項) 或私密瀏覽視窗執行此實驗室,這可以防止個人帳戶和學員帳戶之間的衝突,避免個人帳戶產生額外費用。
  • 是時候完成實驗室活動了!別忘了,活動一旦開始將無法暫停。
注意事項:務必使用實驗室專用的學員帳戶。如果使用其他 Google Cloud 帳戶,可能會產生額外費用。

如何開始研究室及登入 Google Cloud 控制台

  1. 點選「Start Lab」按鈕。如果實驗室會產生費用,畫面上會出現選擇付款方式的對話方塊。左側的「Lab Details」窗格會顯示下列項目:

    • 「Open Google Cloud console」按鈕
    • 剩餘時間
    • 必須在這個研究室中使用的臨時憑證
    • 完成這個實驗室所需的其他資訊 (如有)
  2. 點選「Open Google Cloud console」;如果使用 Chrome 瀏覽器,也能按一下滑鼠右鍵,選取「在無痕視窗中開啟連結」

    接著,實驗室會啟動相關資源,並開啟另一個分頁,顯示「登入」頁面。

    提示:您可以在不同的視窗中並排開啟分頁。

    注意:如果頁面中顯示「選擇帳戶」對話方塊,請點選「使用其他帳戶」
  3. 如有必要,請將下方的 Username 貼到「登入」對話方塊。

    {{{user_0.username | "Username"}}}

    您也可以在「Lab Details」窗格找到 Username。

  4. 點選「下一步」

  5. 複製下方的 Password,並貼到「歡迎使用」對話方塊。

    {{{user_0.password | "Password"}}}

    您也可以在「Lab Details」窗格找到 Password。

  6. 點選「下一步」

    重要事項:請務必使用實驗室提供的憑證,而非自己的 Google Cloud 帳戶憑證。 注意:如果使用自己的 Google Cloud 帳戶來進行這個實驗室,可能會產生額外費用。
  7. 按過後續的所有頁面:

    • 接受條款及細則。
    • 由於這是臨時帳戶,請勿新增救援選項或雙重驗證機制。
    • 請勿申請免費試用。

Google Cloud 控制台稍後會在這個分頁開啟。

注意:如要使用 Google Cloud 產品和服務,請點選「導覽選單」,或在「搜尋」欄位輸入服務或產品名稱。「導覽選單」圖示和搜尋欄位

啟動 Cloud Shell

Cloud Shell 是搭載多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作。Cloud Shell 提供指令列存取權,方便您使用 Google Cloud 資源。

  1. 點按 Google Cloud 控制台頂端的「啟用 Cloud Shell」圖示 「啟動 Cloud Shell」圖示

  2. 系統顯示視窗時,請按照下列步驟操作:

    • 繼續操作 Cloud Shell 視窗。
    • 授權 Cloud Shell 使用您的憑證發出 Google Cloud API 呼叫。

連線建立完成即代表已通過驗證,而且專案已設為您的 Project_ID。輸出內容中有一行文字,宣告本工作階段的 Project_ID

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵自動完成功能。

  1. (選用) 您可以執行下列指令來列出使用中的帳戶:
gcloud auth list
  1. 點按「授權」

輸出內容:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (選用) 您可以使用下列指令來列出專案 ID:
gcloud config list project

輸出內容:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} 注意:如需 gcloud 的完整說明,請前往 Google Cloud 參閱 gcloud CLI 總覽指南

設定區域

  • 在 Cloud Shell 中執行下列指令,設定這個實驗室的專案區域:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}

確定已啟用 Dataflow API

請重新啟動連至 Dataflow API 的連線,確保可順利使用這個必要的 API。

gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}

點選「Check my progress」,確認目標已達成。 停用並重新啟用 Dataflow API

工作 1:建立專案資源

  1. 在 Cloud Shell 中,為 bucket、專案和區域建立變數。
PROJECT_ID=$(gcloud config get-value project) BUCKET_NAME="${PROJECT_ID}-bucket" TOPIC_ID=my-id REGION={{{project_0.default_region | "filled in at lab start"}}}
  1. 設定 App Engine 區域。
注意:如果使用的區域不是 us-central1europe-west1,請將 AppEngine 區域變數,依照指派的區域設定。如果分配到的區域是 us-central1,請將 AppEngine 區域變數設為 us-central。如果分配到的區域是 europe-west1,請將 AppEngine 區域變數設為 europe-west

詳情請參閱 App Engine 位置

AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
  1. 建立這個專案擁有的 Cloud Storage bucket:
gsutil mb gs://$BUCKET_NAME 注意:Cloud Storage bucket 名稱在全域範圍內都不可重複。您的 Qwiklabs 專案 ID 一律不會重複,因此本實驗室會使用該 ID 做為 bucket 名稱。
  1. 在本專案建立 Pub/Sub 主題:
gcloud pubsub topics create $TOPIC_ID
  1. 為專案建立 App Engine 應用程式:
gcloud app create --region=$AE_REGION
  1. 在本專案建立 Cloud Scheduler 工作,這項工作會每隔一分鐘將訊息發布至 Pub/Sub 主題:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!"
  1. 如果系統提示您啟用 Cloud Scheduler API,請按下 y 鍵與 Enter 鍵。

點選「Check my progress」,確認目標已達成。 建立專案資源

  1. 啟動工作:
gcloud scheduler jobs run publisher-job 注意:如果遇到 RESOURCE_EXHAUSTED 錯誤,請嘗試再次執行指令。
  1. 使用下列指令複製快速入門存放區,並前往程式碼範例目錄:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -U -r requirements.txt # Install Apache Beam dependencies 注意:如果您使用 Python 選項,請個別執行 Python 指令。

點選「Check my progress」,確認目標已達成。 啟動 Cloud Scheduler 工作

工作 2:查看程式碼,瞭解如何將訊息從 Pub/Sub 串流至 Cloud Storage

程式碼範例

請查看下列程式碼範例,瞭解如何使用 Dataflow 執行下列作業:

  • 讀取 Pub/Sub 訊息。
  • 根據發布時間戳記,建立訊息視窗或將訊息分組至固定長度的時間間隔。
  • 將每個視窗中的訊息寫入 Cloud Storage 的檔案。
import java.io.IOException; import org.apache.beam.examples.common.WriteOneFilePerWindow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.joda.time.Duration; public class PubSubToGcs { /* * 定義您自己的設定選項。新增自訂引數,使用 * 指令列剖析器來處理這些引數,並指定引數的預設值。*/ public interface PubSubToGcsOptions extends StreamingOptions { @Description("要讀取的 Cloud Pub/Sub 主題。") @Required String getInputTopic(); void setInputTopic(String value); @Description("輸出檔案的視窗長度以分鐘為單位。") @Default.Integer(1) Integer getWindowSize(); void setWindowSize(Integer value); @Description("含檔案名稱前置字元的輸出內容檔案路徑。") @Required String getOutput(); void setOutput(String value); } public static void main(String[] args) throws IOException { // 寫入輸出時產生的 shard 數量上限。 int numShards = 1; PubSubToGcsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); pipeline // 1) 讀取 Pub/Sub 主題的字串訊息。 .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic())) // 2) 將訊息分組為固定長度的分鐘區間。 .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) // 3) 為每個訊息視窗寫入一個檔案到 GCS。 .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards)); // 執行管道並等待執行完畢。 pipeline.run().waitUntilFinish(); } } import argparse from datetime import datetime import logging import random from apache_beam import ( DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys, ) from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms.window import FixedWindows class GroupMessagesByFixedWindows(PTransform): """複合轉換會依據發布時間,將 Pub/Sub 訊息分組,並輸出由多個元組組成的清單,每個元組內含一則訊息與其發布時間。 """ def __init__(self, window_size, num_shards=5): # 將視窗長度設為 60 秒。 self.window_size = int(window_size * 60) self.num_shards = num_shards def expand(self, pcoll): return ( pcoll # 使用元素時間戳記 (或發布時間),將視窗資訊連結至每個元素。 | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size)) | "Add timestamp to windowed elements" >> ParDo(AddTimestamp()) # 根據 shard 數量,將隨機金鑰指派給各視窗元素。 | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1)) # 依照金鑰將視窗元素分組。同一視窗中的所有元素都必須 # 能放入記憶體。否則必須使用 `beam.util.BatchElements`。 | "Group by key" >> GroupByKey() ) class AddTimestamp(DoFn): def process(self, element, publish_time=DoFn.TimestampParam): """將訊息內文及其發布時間擷取至元組,來處理各視窗元素。 """ yield ( element.decode("utf-8"), datetime.utcfromtimestamp(float(publish_time)).strftime( "%Y-%m-%d %H:%M:%S.%f" ), ) class WriteToGCS(DoFn): def __init__(self, output_path): self.output_path = output_path def process(self, key_value, window=DoFn.WindowParam): """批次將訊息寫入 Google Cloud Storage。""" ts_format = "%H:%M" window_start = window.start.to_utc_datetime().strftime(ts_format) window_end = window.end.to_utc_datetime().strftime(ts_format) shard_id, batch = key_value filename = "-".join([self.output_path, window_start, window_end, str(shard_id)]) with io.gcsio.GcsIO().open(filename=filename, mode="w") as f: for message_body, publish_time in batch: f.write(f"{message_body},{publish_time}\n".encode()) def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None): # 將 `save_main_session` 設為 True,讓 DoFns 存取全域匯入的模組。 pipeline_options = PipelineOptions( pipeline_args, streaming=True, save_main_session=True ) with Pipeline(options=pipeline_options) as pipeline: ( pipeline # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam # binds the publish time returned by the Pub/Sub server for each message # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`. # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic) | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards) | "Write to GCS" >> ParDo(WriteToGCS(output_path)) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( "--input_topic", help="The Cloud Pub/Sub topic to read from." '"projects//topics/".', ) parser.add_argument( "--window_size", type=float, default=1.0, help="Output file's window size in minutes.", ) parser.add_argument( "--output_path", help="Path of the output GCS file including the prefix.", ) parser.add_argument( "--num_shards", type=int, default=5, help="Number of shards to use when writing windowed elements to GCS.", ) known_args, pipeline_args = parser.parse_known_args() run( known_args.input_topic, known_args.output_path, known_args.window_size, known_args.num_shards, pipeline_args, ) 注意:如要進一步瞭解程式碼範例,請造訪 java-docs-samplespython-docs-samples GitHub 頁面。

工作 3:啟動管道

  1. 如要啟動管道,請執行下列指令:
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2 \ --tempLocation=gs://$BUCKET_NAME/temp" python PubSubToGCS.py \ --project=project_id \ --region=region \ --input_topic=projects/project_id/topics/my-id \ --output_path=gs://bucket_name/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://bucket_name/temp 注意:執行 Python 指令時,請將 project_idbucket_nameregion,替換為您的專案 ID、bucket 名稱和指派的實驗室區域。

上述指令會在本地端執行,並啟動在雲端執行的 Dataflow 工作。

注意:程式碼完全執行完畢,以及管道工作出現在下一項工作的 Dataflow 控制台,可能需要 10 分鐘左右,請稍候片刻。 注意:如果收到有關 StaticLoggerBinder 的警告,請放心,可以忽略並繼續進行實驗室。

點選「Check my progress」,確認目標已達成。 啟動管道並執行 Dataflow 工作

工作 4:觀察工作和管道進度

  1. 前往 Dataflow 控制台,查看工作進度。

  2. 點選「重新整理」,即可查看工作和最新狀態。

Dataflow 頁面顯示 pubsubtogcs 0815172250-75a99ab8 工作的資訊

  1. 點選工作名稱,開啟工作詳細資料並查看下列資訊:
  • 工作結構
  • 工作記錄
  • 階段指標

工作頁面顯示工作摘要資訊

您可能需要再等幾分鐘,才能在 Cloud Storage 看到輸出檔案。

  1. 如要查看輸出檔案,請依序前往「導覽選單」>「Cloud Storage」,依序點選您的 bucket 名稱和「範例」

bucket 詳細資料頁面,顯示輸出檔案資訊

  1. 或者,您也可以在 Cloud Shell 按下 CTRL+C,結束應用程式 (如果是 Python 請輸入 exit),然後執行下列指令,列出已寫入 Cloud Storage 的檔案:
gsutil ls gs://${BUCKET_NAME}/samples/

輸出內容應如下所示:

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

工作 5:清除所用資源

  1. 如果尚未退出應用程式,請在 Cloud Shell 輸入 CTRL+C 退出。

如果是 Python 選項,請輸入 exit 離開 Python 環境。

  1. 在 Cloud Shell 刪除 Cloud Scheduler 工作:
gcloud scheduler jobs delete publisher-job

如果出現「確定要繼續操作嗎?」的提示訊息,請按下 Y 鍵和 Enter 鍵。

  1. 在 Dataflow 控制台,選取工作名稱並點選「停止」,即可停止工作。

系統提示時,依序點選「停止工作」>「取消」,即可取消管道而不排空。

  1. 在 Cloud Shell 刪除主題:
gcloud pubsub topics delete $TOPIC_ID
  1. 在 Cloud Shell 刪除管道建立的檔案:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  1. 在 Cloud Shell 刪除 Cloud Storage bucket:
gsutil rb gs://${BUCKET_NAME}

恭喜!

您已建立 Dataflow 管道,可從 Pub/Sub 主題讀取訊息、依時間戳記建立訊息視窗,並將訊息寫入 Cloud Storage bucket。

後續步驟/瞭解詳情

Google Cloud 教育訓練與認證

協助您瞭解如何充分運用 Google Cloud 的技術。我們的課程會介紹專業技能和最佳做法,讓您可以快速掌握要領並持續進修。我們提供從基本到進階等級的訓練課程,並有隨選、線上和虛擬課程等選項,方便您抽空參加。認證可協助您驗證及證明自己在 Google Cloud 技術方面的技能和專業知識。

使用手冊上次更新日期:2025 年 8 月 20 日

實驗室上次測試日期:2025 年 8 月 20 日

Copyright 2025 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。

准备工作

  1. 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
  2. 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
  3. 在屏幕左上角,点击开始实验即可开始

使用无痕浏览模式

  1. 复制系统为实验提供的用户名密码
  2. 在无痕浏览模式下,点击打开控制台

登录控制台

  1. 使用您的实验凭证登录。使用其他凭证可能会导致错误或产生费用。
  2. 接受条款,并跳过恢复资源页面
  3. 除非您已完成此实验或想要重新开始,否则请勿点击结束实验,因为点击后系统会清除您的工作并移除该项目

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您

一次一个实验

确认结束所有现有实验并开始此实验

使用无痕浏览模式运行实验

请使用无痕模式或无痕式浏览器窗口运行此实验。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。