GSP903

概要
Google Cloud Pub/Sub は、アプリケーションやサービスの間でイベントデータを交換するためのメッセージング サービスです。データのプロデューサーは Pub/Sub のトピックにメッセージをパブリッシュし、コンシューマはそのトピックにサブスクリプションを作成します。サブスクライバーは、サブスクリプションからメッセージを pull するか、push サブスクリプションの webhook として構成されます。すべてのサブスクライバーは、構成可能な時間の範囲内で各メッセージに確認応答を返信する必要があります。
Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と表現力で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。
Pub/Sub は、スケーラブルで耐久性のあるイベントの取り込みおよび配信システムです。ウィンドウとバッファリングを使用する場合、Dataflow は、Pub/Sub のスケーラブルな「最低 1 回」配信モデルに、メッセージ重複排除と「1 回限り」「正しい順序」の処理機能を追加します。
演習内容
- Pub/Sub トピックにパブリッシュされたメッセージを読む
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
設定
[ラボを開始] ボタンをクリックする前に
こちらの説明をお読みください。ラボには時間制限があり、一時停止することはできません。タイマーは、Google Cloud のリソースを利用できる時間を示しており、[ラボを開始] をクリックするとスタートします。
このハンズオンラボでは、シミュレーションやデモ環境ではなく実際のクラウド環境を使って、ラボのアクティビティを行います。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
- 標準的なインターネット ブラウザ(Chrome を推奨)
注: このラボの実行には、シークレット モード(推奨)またはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウント間の競合を防ぎ、個人アカウントに追加料金が発生しないようにすることができます。
- ラボを完了するための時間(開始後は一時停止できません)
注: このラボでは、受講者アカウントのみを使用してください。別の Google Cloud アカウントを使用すると、そのアカウントに料金が発生する可能性があります。
ラボを開始して Google Cloud コンソールにログインする方法
-
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるダイアログでお支払い方法を選択してください。
左側の [ラボの詳細] ペインには、以下が表示されます。
- [Google Cloud コンソールを開く] ボタン
- 残り時間
- このラボで使用する必要がある一時的な認証情報
- このラボを行うために必要なその他の情報(ある場合)
-
[Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウで開く] を選択します)。
ラボでリソースがスピンアップし、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
-
必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。
{{{user_0.username | "Username"}}}
[ラボの詳細] ペインでもユーザー名を確認できます。
-
[次へ] をクリックします。
-
以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。
{{{user_0.password | "Password"}}}
[ラボの詳細] ペインでもパスワードを確認できます。
-
[次へ] をクリックします。
重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。
注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
-
その後次のように進みます。
- 利用規約に同意してください。
- 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
- 無料トライアルには登録しないでください。
その後、このタブで Google Cloud コンソールが開きます。
注: Google Cloud のプロダクトやサービスにアクセスするには、ナビゲーション メニューをクリックするか、[検索] フィールドにサービス名またはプロダクト名を入力します。
Cloud Shell をアクティブにする
Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
-
Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン
をクリックします。
-
ウィンドウで次の操作を行います。
- Cloud Shell 情報ウィンドウで操作を進めます。
- Cloud Shell が認証情報を使用して Google Cloud API を呼び出すことを承認します。
接続した時点で認証が完了しており、プロジェクトに各自の Project_ID、 が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。
Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}
gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
- (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
- [承認] をクリックします。
出力:
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project
出力:
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
注: Google Cloud における gcloud ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。
リージョンを設定する
- Cloud Shell で次のコマンドを実行して、このラボのプロジェクト リージョンを設定します。
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}
Dataflow API が有効になっていることを確認する
必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Dataflow API を無効にし、再度有効にする
タスク 1. プロジェクト リソースを作成する
- Cloud Shell で、バケット、プロジェクト、リージョンの変数を作成します。
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
- App Engine リージョンを設定します。
注: us-central1 と europe-west1 以外のリージョンでは、AppEngine リージョン変数を割り当てられたリージョンと同じに設定してください。us-central1 が割り当てられている場合は、AppEngine リージョン変数を us-central に設定します。europe-west1 が割り当てられている場合は、AppEngine リージョン変数を europe-west に設定します。
詳細については、App Engine のロケーションをご覧ください。
AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
- このプロジェクトが所有する Cloud Storage バケットを作成します。
gsutil mb gs://$BUCKET_NAME
注: Cloud Storage バケット名は、グローバルに一意である必要があります。Qwiklabs プロジェクト ID は常に一意であるため、このラボではバケット名に使用されます。
- このプロジェクトで Pub/Sub トピックを作成します。
gcloud pubsub topics create $TOPIC_ID
- プロジェクトの App Engine アプリを作成します。
gcloud app create --region=$AE_REGION
- このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Pub/Sub トピックにメッセージをパブリッシュします。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
--topic=$TOPIC_ID --message-body="Hello!"
- Cloud Scheduler API を有効にするように求められたら、「
y」と入力して Enter キーを押します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
プロジェクト リソースを作成する
- ジョブを開始します。
gcloud scheduler jobs run publisher-job
注: RESOURCE_EXHAUSTED のエラーが発生した場合は、コマンドを再度実行してください。
- 次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt # Apache Beam の依存関係をインストールする
注: Python オプションを使用している場合は、Python コマンドを個別に実行してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Cloud Scheduler ジョブを開始する
タスク 2. Pub/Sub から Cloud Storage にメッセージをストリーミングするコードを確認する
コードサンプル
Dataflow を使用して次のことを行うサンプルコードを確認します。
- Pub/Sub メッセージを読み取ります。
- パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
- 各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGcs {
/*
* 独自の構成オプションを定義します。コマンドライン パーサーによって処理される
* 独自の引数を追加し、それらのデフォルト値を指定できます。
*/
public interface PubSubToGcsOptions extends StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("Output file's window size in number of minutes.")
@Default.Integer(1)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("Path of the output file including its filename prefix.")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) throws IOException {
// 出力書き込み時のシャードの最大数。
int numShards = 1;
PubSubToGcsOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
// 1) Pub/Sub トピックから文字列メッセージを読み取ります。
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) メッセージを固定サイズの分間隔でグループ化します。
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) メッセージのウィンドウごとに 1 つのファイルを GCS に書き込みます。
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
// パイプラインを実行して終了するまで待ちます。
pipeline.run().waitUntilFinish();
}
}
import argparse
from datetime import datetime
import logging
import random
from apache_beam import (
DoFn,
GroupByKey,
io,
ParDo,
Pipeline,
PTransform,
WindowInto,
WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""
def __init__(self, window_size, num_shards=5):
# ウィンドウ サイズを 60 秒に設定します。
self.window_size = int(window_size * 60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# 要素のタイムスタンプ(またはパブリッシュ時間)を使用して、ウィンドウを各要素にバインドします。
| "Window into fixed intervals"
>> WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# シャードの数に基づいて、ウィンドウ化された各要素にランダムキーを割り当てます。
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# ウィンドウ化された要素をキーでグループ化します。同じウィンドウ内のすべての要素が収まるだけの
# メモリが必要です。そうでない場合は、`beam.util.BatchElements` を使用する必要があります。
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body},{publish_time}\n".encode())
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# `save_main_session` を True に設定して、DoFn がグローバルにインポートされたモジュールにアクセスできるようにします。
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# `timestamp_attribute` が `ReadFromPubSub` で指定されていないため、Beam は
# 各メッセージの Pub/Sub サーバーから返されたパブリッシュ時間を
# 要素のタイムスタンプ パラメータにバインドします。このパラメータには `DoFn.TimestampParam` を使用してアクセスできます。
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects//topics/".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)
注: サンプルコードの詳細については、java-docs-samples と python-docs-samples の GitHub ページをご覧ください。
タスク 3. パイプラインを開始する
- パイプラインを開始するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=$PROJECT_ID \
--region=$REGION \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--windowSize=2 \
--tempLocation=gs://$BUCKET_NAME/temp"
python PubSubToGCS.py \
--project=project_id \
--region=region \
--input_topic=projects/project_id/topics/my-id \
--output_path=gs://bucket_name/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://bucket_name/temp
注: python コマンドを実行するときは、project_id、bucket_name、region を、自分のプロジェクト ID、バケット名、割り当てられたラボのリージョンに置き換えてください。
上記のコマンドがローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。
注: コードが完全に実行され、次のタスクでパイプライン ジョブが Dataflow コンソールに表示されるまで、約 10 分かかる場合があります。
注: StaticLoggerBinder に関する警告が表示された場合は、無視してラボを進めてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
パイプラインを開始して Dataflow ジョブを起動する
タスク 4. ジョブとパイプラインの進行状況を確認する
-
Dataflow コンソールに移動して、ジョブの進行状況を確認します。
-
[更新] をクリックして、ジョブと最新のステータス更新を確認します。

- ジョブ名をクリックしてジョブの詳細を開き、以下を確認します。

Cloud Storage に出力ファイルが表示されるまで、さらに数分間かかる場合があります。
- 出力ファイルは、ナビゲーション メニュー > [Cloud Storage] に移動し、バケット名をクリックして [サンプル] をクリックすると確認できます。

- または、CTRL+C(Python オプションの場合は「
exit」と入力)を使用して Cloud Shell でアプリケーションを終了し、次のコマンドを実行して、Cloud Storage に書き出されたファイルを一覧表示します。
gsutil ls gs://${BUCKET_NAME}/samples/
出力は次のようになります。
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
タスク 5. クリーンアップ
- まだ行っていない場合は、Cloud Shell で Ctrl+C キーを押してアプリケーションを終了します。
Python オプションの場合は、exit と入力して Python 環境を終了します。
- Cloud Shell で、Cloud Scheduler ジョブを削除します。
gcloud scheduler jobs delete publisher-job
「Do you want to continue」というプロンプトが表示されたら、Y キーを押して Enter キーを押します。
- Dataflow コンソールで、ジョブ名を指定して [停止] をクリックし、ジョブを停止します。
プロンプトが表示されたら、[ジョブを中止] > [キャンセル] をクリックして、パイプラインをドレインせずにキャンセルします。
- Cloud Shell で、トピックを削除します。
gcloud pubsub topics delete $TOPIC_ID
- Cloud Shell で、パイプラインによって作成されたファイルを削除します。
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
- Cloud Shell で、Cloud Storage バケットを削除します。
gsutil rb gs://${BUCKET_NAME}
お疲れさまでした
Pub/Sub トピックからメッセージを読み取り、タイムスタンプでウィンドウ処理を行い、Cloud Storage バケットに書き込む Dataflow パイプラインを作成しました。
次のステップと詳細情報
Google Cloud トレーニングと認定資格
Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。
マニュアルの最終更新日: 2025 年 8 月 20 日
ラボの最終テスト日: 2025 年 8 月 20 日
Copyright 2025 Google LLC. All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。