概要
このラボの内容:
- Apache Beam のテストツールを使用して
DoFn と PTransform の単体テストを作成する。
- パイプライン統合テストを実施する。
-
TestStream クラスを使用して、ストリーミング パイプラインのウィンドウ処理の動作をテストする。
パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Beam モデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。
このラボでは、Beam SDK のテスト用パッケージに含まれるツールを使い、ローカルで DirectRunner による単体テストを行う方法を学びます。
設定と要件
[ラボを開始] ボタンをクリックする前に
注: 以下の説明をお読みください。
ラボには時間制限があり、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。
この Google Skills ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを実施できます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
必要なもの
このラボを完了するためには、下記が必要です。
- 標準的なインターネット ブラウザ(Chrome を推奨)
- ラボを完了するために十分な時間
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。
注: Pixelbook を使用している場合は、このラボをシークレット ウィンドウで実施してください。
ラボを開始してコンソールにログインする方法
-
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。
左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。
![[認証情報] パネル](https://cdn.qwiklabs.com/%2FtHp4GI5VSDyTtdqi3qDFtevuY014F88%2BFow%2FadnRgE%3D)
-
ユーザー名をコピーし、[Google Console を開く] をクリックします。
ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。
注: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
-
[アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。
![[別のアカウントを使用] オプションがハイライト表示されている、アカウントのダイアログ ボックスを選択します。](https://cdn.qwiklabs.com/eQ6xPnPn13GjiJP3RWlHWwiMjhooHxTNvzfg1AL2WPw%3D)
-
[接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。
注: 認証情報は [接続の詳細] パネルに表示されたものを使用してください。Google Skills の認証情報は使用しないでください。請求が発生する事態を避けるため、Google Cloud アカウントをお持ちの場合でも、このラボでは使用しないでください。
- その後次のように進みます。
- 利用規約に同意します。
- 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
- 無料トライアルには登録しないでください。
しばらくすると、このタブで Cloud コンソールが開きます。
注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。
このラボでは、すべてのコマンドをインスタンス ノートブックのターミナルで実行します。
-
Google Cloud コンソールのナビゲーション メニュー(
)で [Vertex AI] を選択します。
-
[すべての推奨 API を有効化] をクリックします。
-
ナビゲーション メニューで [ワークベンチ] をクリックします。
[ワークベンチ] ページの上部で、[インスタンス] ビューになっていることを確認します。
-
[
新規作成] をクリックします。
-
インスタンスの構成:
-
名前: lab-workbench
-
リージョン: リージョンを に設定します
-
ゾーン: ゾーンを に設定します
-
詳細オプション(任意): 必要に応じて [詳細オプション] をクリックして、より詳細なカスタマイズを行います(マシンタイプ、ディスクサイズなど)。

- [作成] をクリックします。
インスタンスが作成されるまで数分かかります。作成が終了するとインスタンスの名前の横に緑色のチェックマークが付きます。
- インスタンスの名前の横に表示されている [JupyterLab を開く] をクリックして JupyterLab インターフェースを起動します。ブラウザで新しいタブが開きます。

- 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。
コード リポジトリをダウンロードする
このラボで使用するコード リポジトリをダウンロードします。
- 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
-
クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。
![展開された [表示] メニューでハイライト表示されているエクスプローラ オプション](https://cdn.qwiklabs.com/yeDC%2FCILaUn6YJSJjfGKlB6ju13WPLsGaZj6tEqxpw0%3D)
注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。
ファイルが開き、コードを追加または変更できます。
ラボのコードは 8a_Batch_Testing_Pipeline/lab と 8b_Stream_Testing_Pipeline/lab の 2 つのフォルダに分かれています。ヒントが必要な場合は、それぞれに対応する solution フォルダでソリューションを確認してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する
ラボ パート 1: DoFn と PTransform の単体テストを実施する
タスク 1. 環境を準備する
ここでは、気象センサーの統計を計算するバッチ パイプラインの DoFn と PTransform の単体テストを実施します。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
-
TestPipeline を作成します。
- テスト入力データを作成し、
Create 変換を使って入力データの PCollection を作成します。
- 変換を入力
PCollection に適用して、生成された PCollection を保存します。
-
testing.util モジュールの assert_that メソッドとその他のメソッドを使用して、想定される要素が出力 PCollection に含まれていることを確認します。
TestPipeline は Beam SDK に含まれる特別なクラスで、特に変換とパイプライン ロジックのテストに使います。テストでは、パイプライン オブジェクトの作成時に、Pipeline の代わりに TestPipeline を使用します。Create 変換では、オブジェクト(Java Iterable)のインメモリ コレクションを利用して、このコレクションから PCollection を作成します。目的は、想定される出力 PCollection がわかっている少量のテスト入力データを PTransform から得ることです。
with TestPipeline() as p:
INPUTS = [fake_input_1, fake_input_2]
test_output = p | beam.Create(INPUTS) | # テスト対象とする変換
最後に、出力 PCollection が想定される出力と一致することを確認するために、assert_that メソッドを使用します。たとえば、equal_to メソッドを使用すると、出力 PCollection に正しい要素があることを確認できます。
assert_that(test_output, equal_to(EXPECTED_OUTPUTS))
以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。
適切なラボを開く
- IDE のターミナルで次のコマンドを実行して、このラボで使用するディレクトリに変更します。
# ラボのディレクトリに移動する
cd 8a_Batch_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
仮想環境と依存関係を設定する
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
- 次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get install -y python3-venv
# 仮想環境を作成して有効化する
python3 -m venv df-env
source df-env/bin/activate
- 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com
- 最後にストレージ バケットを作成します。
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage buckets create gs://$PROJECT_ID --location=US
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
環境を準備する
タスク 2. パイプラインのメインコードを確認する
-
ファイル エクスプローラで 8a_Batch_Testing_Pipeline/lab に移動します。このディレクトリには、パイプラインのメインコードを含む weather_statistics_pipeline.py と、テスト用コードを含む weather_statistics_pipeline_test.py の 2 つのファイルがあります。
-
まずは 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py を開きます。
単体テストに取りかかる前に、パイプライン コードを簡単に確認します。最初は WeatherRecord クラス(6 行目以降)です。これは typing.NamedTuple のサブクラスなので、スキーマ対応の変換を使用してこのクラスのオブジェクトを操作できます。後ほど、テスト用にこのクラスのオブジェクトのインメモリ コレクションも定義します。
class WeatherRecord(typing.NamedTuple):
loc_id: str
lat: float
lng: float
date: str
low_temp: float
high_temp: float
precip: float
-
DoFn と PTransform の定義の先頭(17 行目)までスクロールします。
このパイプラインのコンセプトについてはこれまでのラボでおおむね説明していますが、以下の箇所は詳しく確認するようにしてください。
-
DoFn の ConvertCsvToWeatherRecord(17 行目以降)と ConvertTempUnits(27 行目以降)。後でこれらの DoFn の単体テストを行います。
-
PTransform ComputeStatistics(41 行目以降)。DoFn と同じ方法でテストできる複合変換の例です。
-
PTransform WeatherStatsTransform(55 行目以降)。この PTransform には、Create 変換で作成された合成データに小規模なパイプライン統合テストを実施できるように、パイプライン全体(ソースおよびシンク変換を除く)に使用する処理ロジックが含まれています。
注: 処理コードに含まれる論理エラーに気付いても、まだ修正しないでください。後で、テストによってエラーを絞り込む方法を説明します。
タスク 3. テスト用の依存関係を追加する
- ファイル エクスプローラで 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py を開きます。
テストのために依存関係をいくつか追加する必要があります。Apache Beam に含まれるテスト ユーティリティと Python の unittest パッケージを利用します。
- このタスクを完了するには、weather_statistics_pipeline_test.py の先頭の必要な場所に次の import ステートメントを追加します。
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to
ヒントが必要な場合は、ソリューションをご覧ください。
タスク 4. Apache Beam で最初の DoFn 単体テストを作成する
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py ファイルには、DoFn と PTransform の単体テストに必要なコードが含まれています。現在はほとんどのコードがコメントアウトされていますが、後でコメント化解除します。
Beam コードを確認する前に、テストの実行とテキスト ファイルへのテスト出力の書き込みを管理するカスタムの main メソッドが定義されていることに注意してください。これにより、テストを記録して現在のターミナル セッション終了後に参照できます。次のように logging モジュールを使用して管理することもできます。
def main(out = sys.stderr, verbosity = 2):
loader = unittest.TestLoader()
suite = loader.loadTestsFromModule(sys.modules[__name__])
unittest.TextTestRunner(out, verbosity = verbosity).run(suite)
# テスト用コードは省略
if __name__ == '__main__':
with open('testing.out', 'w') as f:
main(f)
- まず、
ConvertCsvToWeatherRecord DoFn(43 行目以降)の DoFn 単体テストを確認します。最初にパイプラインをテストするためのクラスを作成し、次に TestPipeline オブジェクトを作成します。
class ConvertToWeatherRecordTest(unittest.TestCase):
def test_convert_to_csv(self):
with TestPipeline() as p:
...
- 最初のテストのための(不完全な)コードを見てみます。
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]
input_lines = p | # タスク 4: LINES から PCollection を作成する
output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord())
# タスク 4: assert_that ステートメントを記述する
CSV ファイル(パイプラインで想定される入力形式)の 1 行を表す 1 つのテスト入力(LINES)を作成して、リストに追加します。想定される出力(EXPECTED_OUTPUT)も WeatherRecords のリストとして定義します。
テスト用コードの残りの部分には、欠けているパーツがいくつかあります。
-
このタスクを完了するには、まず LINES を PCollection に変換する Create 変換を追加します。
-
次に、equal_to メソッドを使用して output と EXPECTED_OUTPUT を比較する assert_that ステートメントを追加します。
ヒントが必要な場合は、後述のコメント付きのテスト、またはソリューションをご覧ください。
タスク 5. 最初の DoFn 単体テストを実施する
python3 weather_statistics_pipeline_test.py
cat testing.out
テストの出力が testing.out ファイルに書き込まれます。ターミナルで cat testing.out を実行すれば、このファイルの内容を確認できます。前のタスクが正常に完了している場合、testing.out ファイルの内容は次のようになります(経過時間は異なる場合があります)。
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.918s
OK
注: 上記のテストはコマンド「python3 -m unittest test_script.py」を使用して実行された可能性があります。Python スクリプトを直接実行して main メソッドにアクセスしましたが、実際にはここで説明したアプローチの方が一般的になってきています。
タスク 6. 2 度目の DoFn 単体テストを実施してパイプラインをデバッグする
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py に戻り、2 度目の単体テストのためにコード(33~50 行付近)をコメント化解除します。これを行うには、コードをハイライト表示して Ctrl + / キー(MacOS の場合は Cmd + / キー)を押します。参考までに、コードを以下に示します。
class ConvertTempUnitsTest(unittest.TestCase):
def test_convert_temp_units(self):
with TestPipeline() as p:
RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)]
EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)]
input_records = p | beam.Create(RECORDS)
output = input_records | beam.ParDo(ConvertTempUnits())
assert_that(output, equal_to(EXPECTED_RECORDS))
このテストにより、ConvertTempUnits() DoFn が想定どおりに機能するかどうかを確認できます。weather_statistics_pipeline_test.py を保存し、ターミナルに戻ります。
- 次のコマンドを実行してテストを行い、出力を確認します。
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
今回のテストは失敗します。出力をスクロールしていくと、テストの失敗について次の情報が見つかります。
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR
...
apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']
BeamAssertException をもう少し詳しく見てみると、low_temp と high_temp の値が正しくないことがわかります。ConvertTempUnits DoFn の処理ロジックになんらかの問題があるということです。
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py に戻り、
ConvertTempUnits の定義(32 行目付近)までスクロールします。このタスクを完了するには、DoFn 処理ロジックにあるエラーを見つけ、次のコマンドをもう一度実行してテストが成功することを確認します。
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
なお、摂氏と華氏を変換するコードは以下のとおりです。
temp_f = temp_c * 1.8 + 32.0
ヒントが必要な場合は、ソリューションをご覧ください。
タスク 7. PTransform 単体テストを実施してエンドツーエンド パイプラインをテストする
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py に戻り、最後の 2 つのテストのためにコード(53 行目付近以降)をコメント化解除します
先ほどコメント化解除した最初のテストは、複合変換 PTransform ComputeStatistics をテストします。参考までに、コードの省略形を以下に示します。
def test_compute_statistics(self):
with TestPipeline() as p:
INPUT_RECORDS = # テスト入力(ここでは省略)
EXPECTED_STATS = # 想定される出力(ここでは省略)
inputs = p | beam.Create(INPUT_RECORDS)
output = inputs | ComputeStatistics()
assert_that(output, equal_to(EXPECTED_STATS))
先の DoFn 単体テストとよく似ていることに注目してください。テスト入力と出力の違い以外で異なる点は、beam.ParDo(DoFn()) ではなく PTransform を使用していることだけです。
最後はエンドツーエンド パイプラインのテストです。パイプライン コード(weather_statistics_pipeline.py)では、1 つの PTransform WeatherStatsTransform に、ソースとシンクを除いたエンドツーエンド パイプライン全体が含まれています。エンドツーエンド パイプラインをテストするには、これまでと同様の手順を繰り返しますが、代わりに PTransform を使用します。
- ターミナルに戻り、次のコマンドを実行してもう一度テストを行います。
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
ここまでのタスクが正常に完了していれば、テスト終了後に次のような出力がターミナルに表示されます。
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok
----------------------------------------------------------------------
Ran 4 tests in 2.295s
OK
-
testing.out ファイルをストレージ バケットにコピーします。
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
DoFn と PTransform の単体テストを実施する
ラボ パート 2: TestStream を使用してストリーム処理ロジックをテストする
ここでは、ウィンドウ処理されたタクシー乗車数を計算するストリーミング パイプラインの単体テストを実施します。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
-
TestPipeline を作成します。
-
TestStream クラスを使用してストリーミング データを生成します。これには一連のイベントの生成と、ウォーターマークおよび処理時間の進行が含まれています。
-
testing.util モジュールの assert_that メソッドとその他のメソッドを使用して、想定される要素が出力 PCollection に含まれていることを確認します。
TestStream から読み取るパイプラインを実行すると、各イベントの結果(処理時間の進行や適切なトリガーの起動など)がすべて完了するまで待ってから次のイベントに移ります。TestStream により、トリガーと許容される遅延の効果をパイプラインでモニタリングおよびテストできます。これには遅延トリガーや遅延によるデータの欠損に関するロジックも含まれています。
タスク 1. パイプラインのメインコードを確認する
- ファイル エクスプローラで 8b_Stream_Testing_Pipeline/lab に移動します。
このディレクトリには、パイプラインのメインコードを含む taxi_streaming_pipeline.py と、テスト用コードを含む taxi_streaming_pipeline_test.py の 2 つのファイルがあります。
- まずは 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py を開きます。
単体テストに取りかかる前に、パイプライン コードを簡単に確認します。最初は TaxiRide クラス(6 行目以降)です。これは typing.NamedTuple のサブクラスなので、スキーマ対応の変換を使用してこのクラスのオブジェクトを操作できます。後ほど、テスト用にこのクラスのオブジェクトのインメモリ コレクションも定義します。
class TaxiRide(typing.NamedTuple):
ride_id: str
point_idx: int
latitude: float
longitude: float
timestamp: str
meter_reading: float
meter_increment: float
ride_status: str
passenger_count: int
次はパイプラインのメインコードです。このパイプラインのコンセプトについてはこれまでのラボでおおむね説明していますが、以下の箇所は詳しく確認するようにしてください。
-
DoFn の JsonToTaxiRide(22 行目以降)。受信した Pub/Sub メッセージを TaxiRide クラスのオブジェクトに変換するために使用されます。
-
PTransform TaxiCountTransform(36 行目以降)。この PTransform には、パイプラインで使用するカウントとウィンドウ処理のメインロジックが含まれています。今回のテストでフォーカスするのはこの PTransform です。
TaxiCountTransform の出力では、ウィンドウごとに記録されたタクシー乗車がすべてカウントされます。しかし、1 件の乗車に複数のイベント(ピックアップや降車など)があるので、各乗車が 1 回だけカウントされるように、ride_status プロパティでフィルタします。これは、ride_status が「pickup」の要素のみを保持することで実現できます。
... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')
さらに詳しく見ると、このパイプラインで使用されているウィンドウ処理ロジックが以下に含まれています。
... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60),
trigger=AfterWatermark(late=AfterCount(1)),
allowed_lateness=60,
accumulation_mode=AccumulationMode.ACCUMULATING)
60 秒の長さの固定ウィンドウにウィンドウ処理します。早期起動のトリガーはありませんが、ウォーターマークがウィンドウの最後を過ぎると結果が出力されます。受信する新しい要素それぞれに遅延のあるトリガー起動が含まれますが、許容範囲である 60 秒を過ぎた呼び出しは含まれません。最終的に、許容される遅延を過ぎるまでウィンドウ内の状態が累積されます。
タスク 2. TestStream の使い方を確認して、最初のテストを実施する
-
8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py を開きます。
最初の目標は、テスト用コードでの TestStream の使用方法を理解することです。TestStream クラスを使用すると、メッセージのリアルタイム ストリームをシミュレーションしながら、処理時間とウォーターマークの進行を管理できることを思い出してください。最初のテストのコード(66 行目以降)は次のとおりです。
test_stream = TestStream().advance_watermark_to(0).add_elements([
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_enroute, 0),
TimestampedValue(base_json_pickup, 60)
]).advance_watermark_to(60).advance_processing_time(60).add_elements([
TimestampedValue(base_json_pickup, 120)
]).advance_watermark_to_infinity()
- 新しい
TestStream オブジェクトを作成し、JSON メッセージで文字列(乗車状況に応じて base_json_pickup または base_json_enroute)として渡します。この TestStream の役割を確認します。
TestStream は次のタスクを行います。
- 最初のウォーターマークの時間を
0 に設定します(すべてのタイムスタンプは秒単位です)。
-
add_elements メソッドを使用して、3 つの要素をイベント タイムスタンプ 0 でストリームに追加します。これらのイベントのうち 2 つはカウントされます(ride_status = "pickup")が、残りの 1 つはカウントされません。
- イベント タイムスタンプを
60 に指定して「pickup」イベントをもう 1 つ追加します。
- ウォーターマークと処理時間を
60 に進めて、最初のウィンドウをトリガーします。
- イベント タイムスタンプを
120 に指定して「pickup」イベントをもう 1 つ追加します。
- ウォーターマークを「infinity」まで進めます。これですべてのウィンドウが終了し、新しいデータはすべて許容される遅延を過ぎることになります。
- 最初のテストのコードの残りの部分は前のバッチの例に似ていますが、今回は
Create 変換の代わりに TestStream を使用します。
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3],
IntervalWindow(60,120): [1],
IntervalWindow(120,180): [1]}
assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
reify_windows=True)
上のコードでは、TestStream を作成して TaxiCountTransform PTransform を適用することで、出力 PCollection(taxi_counts)を定義しています。IntervalWindow クラスを使用してチェック対象のウィンドウを定義してから、assert_that と equal_to_per_window メソッドを使用してウィンドウごとの結果を確認します。
- ファイルを保存してターミナルに戻り、次のコマンドを実行して正しいディレクトリに移動します。
# ラボのディレクトリに移動する
cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
- 次のコマンドを実行して上のテストを実施し、出力を確認します。
python3 taxi_streaming_pipeline_test.py
cat testing.out
テストが終わると、次の出力が表示されます(経過時間は異なる場合があります)。
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
Ran 1 test in 1.113s
OK
タスク 3. 遅延データの処理をテストする TestStream を作成する
このタスクでは、遅延データの処理に関連するロジックをテストする TestStream のコードを記述します。
-
8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py に戻り、
test_late_data_behavior メソッドがコメントアウトされている箇所(60 行目付近)までスクロールします。このタスクで使用するコードを完成するために、テスト用のコードをコメント化解除します。
class TaxiLateDataTest(unittest.TestCase):
def test_late_data_behavior(self):
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
"\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
"\"ride_status\":\"pickup\",\"passenger_count\":1}"
test_stream = # タスク 3: TestStream オブジェクトを作成する
EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #定刻と遅延の結果
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))
EXPECTED_RESULTS には IntervalWindow(0,60) の 2 つの結果が含まれていることに注意してください。これらは、このウィンドウの定刻トリガーと遅延トリガーの結果を表します。
テスト用のコードは、TestStream の作成以外は完了しています。
-
このタスクを完了するには、以下のタスクを実行する TestStream オブジェクトを作成します。
- ウォーターマークを
0 に進めます(すべてのタイムスタンプは秒単位です)。
- 値が
base_json_pickup でタイムスタンプが 0 の TimestampedValue を 2 つ追加します。
- ウォーターマークと処理時間を
60 に進めます。
- 値が
base_json_pickup でタイムスタンプが 0 の TimestampedValue をもう 1 つ追加します。
- ウォーターマークと処理時間を
300 に進めます。
- 値が
base_json_pickup でタイムスタンプが 0 の TimestampedValue をもう 1 つ追加します。
- ウォーターマークを infinity まで進めます。
これで、4 つの要素が最初のウィンドウに属する TestStream が作成されます。最初の 2 要素は定刻、2 番目の要素は遅延(ただし許容される遅延の範囲内)、最後の要素は許容される範囲外の遅延です。トリガ―されたペインは累積されるため、最初のトリガーは 2 つのイベントをカウントし、最後のトリガーは 3 つのイベントをカウントします。4 番目のイベントは含まれません。
ヒントが必要な場合は、ソリューションをご覧ください。
タスク 4. 遅延データの処理をテストする
- ターミナルに戻り、以下のコマンドを実行してもう一度テストを実施します。
rm testing.out
python3 taxi_streaming_pipeline_test.py
cat testing.out
ここまでのタスクが正常に完了していれば、テスト終了後に次のような出力がターミナルに表示されます。
test_late_data_behavior (__main__.TaxiLateDataTest) ... ok
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
Ran 2 tests in 2.225s
OK
-
testing.out ファイルをストレージ バケットにコピーします。
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
TestStream を使用してストリーム処理ロジックをテストする
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2026 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。