GSP903

總覽
Google Cloud Pub/Sub 是在應用程式與服務之間,交換事件資料的訊息傳遞服務,資料生產者將訊息發布至 Cloud Pub/Sub 主題,而消費者訂閱該主題。訂閱者會從訂閱項目提取訊息,或設定為推送訂閱項目的 Webhook。每個訂閱者都必須確認可設定時間範圍內的每個訊息。
Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,讓資料維持同等的穩定與明確性,並利用 Apache Beam SDK 提供簡化的管道開發環境。這個 SDK 具備多項時間區間設定與工作階段分析基元,以及來源與接收連接器生態系統。
Pub/Sub 是可擴充且耐用的事件擷取與傳送系統。Dataflow 透過視窗與緩衝機制,補強了 Pub/Sub 可擴充的「至少一次」傳送模型,實現訊息簡化、單次處理以及依序處理。
學習內容
- 讀取發布至 Pub/Sub 主題的訊息
- 依時間戳記建立訊息視窗或分組
- 將訊息寫入 Cloud Storage
設定
瞭解以下事項後,再點選「Start Lab」按鈕
請詳閱以下操作說明。實驗室活動會計時,且中途無法暫停。點選「Start Lab」後就會開始計時,顯示可使用 Google Cloud 資源的時間。
您將在真正的雲端環境完成實作實驗室活動,而不是模擬或示範環境。為此,我們會提供新的暫時憑證,供您在實驗室活動期間登入及存取 Google Cloud。
為了順利完成這個實驗室,請先確認:
- 可以使用標準的網際網路瀏覽器 (Chrome 瀏覽器為佳)。
注意事項:請使用無痕模式 (建議選項) 或私密瀏覽視窗執行此實驗室,這可以防止個人帳戶和學員帳戶之間的衝突,避免個人帳戶產生額外費用。
- 是時候完成實驗室活動了!別忘了,活動一旦開始將無法暫停。
注意事項:務必使用實驗室專用的學員帳戶。如果使用其他 Google Cloud 帳戶,可能會產生額外費用。
如何開始研究室及登入 Google Cloud 控制台
-
點選「Start Lab」按鈕。如果實驗室會產生費用,畫面上會出現選擇付款方式的對話方塊。左側的「Lab Details」窗格會顯示下列項目:
- 「Open Google Cloud console」按鈕
- 剩餘時間
- 必須在這個研究室中使用的臨時憑證
- 完成這個實驗室所需的其他資訊 (如有)
-
點選「Open Google Cloud console」;如果使用 Chrome 瀏覽器,也能按一下滑鼠右鍵,選取「在無痕視窗中開啟連結」。
接著,實驗室會啟動相關資源,並開啟另一個分頁,顯示「登入」頁面。
提示:您可以在不同的視窗中並排開啟分頁。
注意:如果頁面中顯示「選擇帳戶」對話方塊,請點選「使用其他帳戶」。
-
如有必要,請將下方的 Username 貼到「登入」對話方塊。
{{{user_0.username | "Username"}}}
您也可以在「Lab Details」窗格找到 Username。
-
點選「下一步」。
-
複製下方的 Password,並貼到「歡迎使用」對話方塊。
{{{user_0.password | "Password"}}}
您也可以在「Lab Details」窗格找到 Password。
-
點選「下一步」。
重要事項:請務必使用實驗室提供的憑證,而非自己的 Google Cloud 帳戶憑證。
注意:如果使用自己的 Google Cloud 帳戶來進行這個實驗室,可能會產生額外費用。
-
按過後續的所有頁面:
- 接受條款及細則。
- 由於這是臨時帳戶,請勿新增救援選項或雙重驗證機制。
- 請勿申請免費試用。
Google Cloud 控制台稍後會在這個分頁開啟。
注意:如要使用 Google Cloud 產品和服務,請點選「導覽選單」,或在「搜尋」欄位輸入服務或產品名稱。
啟動 Cloud Shell
Cloud Shell 是搭載多項開發工具的虛擬機器,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作。Cloud Shell 提供指令列存取權,方便您使用 Google Cloud 資源。
-
點按 Google Cloud 控制台頂端的「啟用 Cloud Shell」圖示
。
-
系統顯示視窗時,請按照下列步驟操作:
- 繼續操作 Cloud Shell 視窗。
- 授權 Cloud Shell 使用您的憑證發出 Google Cloud API 呼叫。
連線建立完成即代表已通過驗證,而且專案已設為您的 Project_ID:。輸出內容中有一行文字,宣告本工作階段的 Project_ID:
Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}
gcloud 是 Google Cloud 的指令列工具,已預先安裝於 Cloud Shell,並支援 Tab 鍵自動完成功能。
- (選用) 您可以執行下列指令來列出使用中的帳戶:
gcloud auth list
- 點按「授權」。
輸出內容:
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- (選用) 您可以使用下列指令來列出專案 ID:
gcloud config list project
輸出內容:
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
注意:如需 gcloud 的完整說明,請前往 Google Cloud 參閱 gcloud CLI 總覽指南。
設定區域
- 在 Cloud Shell 中執行下列指令,設定這個實驗室的專案區域:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}
確定已啟用 Dataflow API
請重新啟動連至 Dataflow API 的連線,確保可順利使用這個必要的 API。
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
點選「Check my progress」,確認目標已達成。
停用並重新啟用 Dataflow API
工作 1:建立專案資源
- 在 Cloud Shell 中,為 bucket、專案和區域建立變數。
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
- 設定 App Engine 區域。
注意:如果使用的區域不是 us-central1 和 europe-west1,請將 AppEngine 區域變數,依照指派的區域設定。如果分配到的區域是 us-central1,請將 AppEngine 區域變數設為 us-central。如果分配到的區域是 europe-west1,請將 AppEngine 區域變數設為 europe-west。
詳情請參閱 App Engine 位置。
AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
- 建立這個專案擁有的 Cloud Storage bucket:
gsutil mb gs://$BUCKET_NAME
注意:Cloud Storage bucket 名稱在全域範圍內都不可重複。您的 Qwiklabs 專案 ID 一律不會重複,因此本實驗室會使用該 ID 做為 bucket 名稱。
- 在本專案建立 Pub/Sub 主題:
gcloud pubsub topics create $TOPIC_ID
- 為專案建立 App Engine 應用程式:
gcloud app create --region=$AE_REGION
- 在本專案建立 Cloud Scheduler 工作,這項工作會每隔一分鐘將訊息發布至 Pub/Sub 主題:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
--topic=$TOPIC_ID --message-body="Hello!"
- 如果系統提示您啟用 Cloud Scheduler API,請按下
y 鍵與 Enter 鍵。
點選「Check my progress」,確認目標已達成。
建立專案資源
- 啟動工作:
gcloud scheduler jobs run publisher-job
注意:如果遇到 RESOURCE_EXHAUSTED 錯誤,請嘗試再次執行指令。
- 使用下列指令複製快速入門存放區,並前往程式碼範例目錄:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt # Install Apache Beam dependencies
注意:如果您使用 Python 選項,請個別執行 Python 指令。
點選「Check my progress」,確認目標已達成。
啟動 Cloud Scheduler 工作
工作 2:查看程式碼,瞭解如何將訊息從 Pub/Sub 串流至 Cloud Storage
程式碼範例
請查看下列程式碼範例,瞭解如何使用 Dataflow 執行下列作業:
- 讀取 Pub/Sub 訊息。
- 根據發布時間戳記,建立訊息視窗或將訊息分組至固定長度的時間間隔。
- 將每個視窗中的訊息寫入 Cloud Storage 的檔案。
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGcs {
/*
* 定義您自己的設定選項。新增自訂引數,使用 * 指令列剖析器來處理這些引數,並指定引數的預設值。*/
public interface PubSubToGcsOptions extends StreamingOptions {
@Description("要讀取的 Cloud Pub/Sub 主題。")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("輸出檔案的視窗長度以分鐘為單位。")
@Default.Integer(1)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("含檔案名稱前置字元的輸出內容檔案路徑。")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) throws IOException {
// 寫入輸出時產生的 shard 數量上限。
int numShards = 1;
PubSubToGcsOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
// 1) 讀取 Pub/Sub 主題的字串訊息。
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) 將訊息分組為固定長度的分鐘區間。
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) 為每個訊息視窗寫入一個檔案到 GCS。
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
// 執行管道並等待執行完畢。
pipeline.run().waitUntilFinish();
}
}
import argparse
from datetime import datetime
import logging
import random
from apache_beam import (
DoFn,
GroupByKey,
io,
ParDo,
Pipeline,
PTransform,
WindowInto,
WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""複合轉換會依據發布時間,將 Pub/Sub 訊息分組,並輸出由多個元組組成的清單,每個元組內含一則訊息與其發布時間。
"""
def __init__(self, window_size, num_shards=5):
# 將視窗長度設為 60 秒。
self.window_size = int(window_size * 60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# 使用元素時間戳記 (或發布時間),將視窗資訊連結至每個元素。
| "Window into fixed intervals"
>> WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# 根據 shard 數量,將隨機金鑰指派給各視窗元素。
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# 依照金鑰將視窗元素分組。同一視窗中的所有元素都必須
# 能放入記憶體。否則必須使用 `beam.util.BatchElements`。
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""將訊息內文及其發布時間擷取至元組,來處理各視窗元素。
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""批次將訊息寫入 Google Cloud Storage。"""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body},{publish_time}\n".encode())
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# 將 `save_main_session` 設為 True,讓 DoFns 存取全域匯入的模組。
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects//topics/".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)
注意:如要進一步瞭解程式碼範例,請造訪 java-docs-samples 和 python-docs-samples GitHub 頁面。
工作 3:啟動管道
- 如要啟動管道,請執行下列指令:
mvn compile exec:java \
-Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=$PROJECT_ID \
--region=$REGION \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--windowSize=2 \
--tempLocation=gs://$BUCKET_NAME/temp"
python PubSubToGCS.py \
--project=project_id \
--region=region \
--input_topic=projects/project_id/topics/my-id \
--output_path=gs://bucket_name/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://bucket_name/temp
注意:執行 Python 指令時,請將 project_id、bucket_name 和 region,替換為您的專案 ID、bucket 名稱和指派的實驗室區域。
上述指令會在本地端執行,並啟動在雲端執行的 Dataflow 工作。
注意:程式碼完全執行完畢,以及管道工作出現在下一項工作的 Dataflow 控制台,可能需要 10 分鐘左右,請稍候片刻。
注意:如果收到有關 StaticLoggerBinder 的警告,請放心,可以忽略並繼續進行實驗室。
點選「Check my progress」,確認目標已達成。
啟動管道並執行 Dataflow 工作
工作 4:觀察工作和管道進度
-
前往 Dataflow 控制台,查看工作進度。
-
點選「重新整理」,即可查看工作和最新狀態。

- 點選工作名稱,開啟工作詳細資料並查看下列資訊:

您可能需要再等幾分鐘,才能在 Cloud Storage 看到輸出檔案。
- 如要查看輸出檔案,請依序前往「導覽選單」>「Cloud Storage」,依序點選您的 bucket 名稱和「範例」。

- 或者,您也可以在 Cloud Shell 按下 CTRL+C,結束應用程式 (如果是 Python 請輸入
exit),然後執行下列指令,列出已寫入 Cloud Storage 的檔案:
gsutil ls gs://${BUCKET_NAME}/samples/
輸出內容應如下所示:
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
工作 5:清除所用資源
- 如果尚未退出應用程式,請在 Cloud Shell 輸入 CTRL+C 退出。
如果是 Python 選項,請輸入 exit 離開 Python 環境。
- 在 Cloud Shell 刪除 Cloud Scheduler 工作:
gcloud scheduler jobs delete publisher-job
如果出現「確定要繼續操作嗎?」的提示訊息,請按下 Y 鍵和 Enter 鍵。
- 在 Dataflow 控制台,選取工作名稱並點選「停止」,即可停止工作。
系統提示時,依序點選「停止工作」>「取消」,即可取消管道而不排空。
- 在 Cloud Shell 刪除主題:
gcloud pubsub topics delete $TOPIC_ID
- 在 Cloud Shell 刪除管道建立的檔案:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
- 在 Cloud Shell 刪除 Cloud Storage bucket:
gsutil rb gs://${BUCKET_NAME}
恭喜!
您已建立 Dataflow 管道,可從 Pub/Sub 主題讀取訊息、依時間戳記建立訊息視窗,並將訊息寫入 Cloud Storage bucket。
後續步驟/瞭解詳情
Google Cloud 教育訓練與認證
協助您瞭解如何充分運用 Google Cloud 的技術。我們的課程會介紹專業技能和最佳做法,讓您可以快速掌握要領並持續進修。我們提供從基本到進階等級的訓練課程,並有隨選、線上和虛擬課程等選項,方便您抽空參加。認證可協助您驗證及證明自己在 Google Cloud 技術方面的技能和專業知識。
使用手冊上次更新日期:2025 年 8 月 20 日
實驗室上次測試日期:2025 年 8 月 20 日
Copyright 2025 Google LLC 保留所有權利。Google 和 Google 標誌是 Google LLC 的商標,其他公司和產品名稱則有可能是其關聯公司的商標。