始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Create Vertex AI Platform Notebooks instance and clone course repo
/ 20
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
このラボの内容:
前回のラボの最後に、リアルタイム型パイプラインが対処すべきある種の課題に触れました。イベントが発生したタイミングと処理されたタイミングの間に生じる隔たりで、ラグとも呼ばれます。このラボでは、Apache Beam のコンセプトをいくつか紹介します。これを理解すると、パイプラインを作成するときに、ラグに正式に対処するにはどうすればよいかを指定できるようになります。
また、ストリーミング環境でパイプラインを使用するときに発生しうる問題は、ラグだけではありません。システムの外部から入力を受け付ける場合、入力がどこか不正な形式である可能性が常にあります。このラボでは、そうした入力に対処するための手法についても紹介します。
このラボで作成する最終的なパイプラインは、以下の画像のようなものになります。分岐が含まれていることに注意してください。
この Google Skills ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを実施できます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。
ユーザー名をコピーし、[Google Console を開く] をクリックします。 ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。
[アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。
[接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。
しばらくすると、このタブで Cloud コンソールが開きます。
このラボでは、すべてのコマンドをインスタンス ノートブックのターミナルで実行します。
Google Cloud コンソールのナビゲーション メニュー()で [Vertex AI] を選択します。
[すべての推奨 API を有効化] をクリックします。
ナビゲーション メニューで [ワークベンチ] をクリックします。
[ワークベンチ] ページの上部で、[インスタンス] ビューになっていることを確認します。
[新規作成] をクリックします。
インスタンスの構成:
インスタンスが作成されるまで数分かかります。作成が終了するとインスタンスの名前の横に緑色のチェックマークが付きます。
このラボで使用するコード リポジトリをダウンロードします。
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
これまでのラボでは、イベント時間ごとの要素を固定幅のウィンドウに分割する場合、以下のようなコードを記述していました。
ただ、前回の非 SQL ラボの最後に見たように、データ ストリームでラグが発生するのは珍しいことではありません。ウィンドウ処理に(処理時間ではなく)イベント時間を使用する場合は、ラグが問題となります。イベント時間のある特定の時点ですべてのイベントが届いているかどうか、確実にはわからないからです。
そのため、結果を出力するには、パイプライン内にこの点に関する判断を記述する必要があります。その目的で使用するのが、ウォーターマークというコンセプトです。ウォーターマークは、イベント時間のある時点までにすべてのデータがパイプラインに届いていると見なすタイミングをシステムがヒューリスティックに基づいて判断するという考え方です。ウォーターマークがウィンドウの枠を超えたら、以後、そのウィンドウ内のタイムスタンプで届いた要素は、遅延データと見なされ、ただ単にドロップされます。ウィンドウ処理は、デフォルトではすべてのデータが届いているとシステムが高い確度で判断した場合にできれば完全な形で結果を 1 つだけ出力するというように動作します。
Apache Beam では、数多くのヒューリスティックを使用して、何をウォーターマークにするかを経験則から導き出しますが、それでもヒューリスティックであることに変わりはありません。さらに重要なのは、こうしたヒューリスティックは汎用的なものであり、すべてのユースケースに適しているわけではないということです。パイプラインの設計担当者は、汎用的なヒューリスティックを使用するのではなく、以下の質問をじっくりと検討してうまくトレードオフを図る必要があります。
こうした疑問への答えに基づくと、Apache Beam の形式手法に従って、妥当なトレードオフを実現するコードを記述できます。
許容遅延を使用すると、ウィンドウの状態が維持される期間を制御できます。ウォーターマークが許容遅延の期間に達すると、すべての状態がドロップされます。すべての永続状態を期間の最後までずっと維持できればよいのですが、現実にはそうはいきません。無限データソースに対処する場合、特定のウィンドウの状態を無期限に維持するのは実用的ではありません。そんなことをすれば、ディスク容量を使い切ってしまいます。
そのため、実際にデータを順不同で処理するシステムでは、なんらかの方法で処理対象のウィンドウに存続期間を設定するよう義務づける必要があります。その簡単明瞭な方法の一つに、システム内で許容遅延の範囲を定義することがあります。たとえば、処理対象の特定のレコードに遅延の限度を設け(ウォーターマークを基準とする)、その限度を超えて届いたデータは単にドロップするというようにします。各データに許容遅延の限度を設定すると、ウィンドウの状態を維持するべき期間も正確に定義できます(ウィンドウの最後に達してその遅延限度をウォーターマークが超えるまで)。
以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab に移動し、streaming_minute_traffic_pipeline.py ファイルを開きます。Apache Beam では、次の例のように、allowed_lateness キーワード引数を使用し、WindowInto PTransform 内に AfterWatermark() トリガーを指定することで、許容遅延を設定します。
allowed_lateness コマンドライン引数によって定義された許容遅延を確認します。値の妥当性について判断したら、適切な単位を反映するようコマンドラインを更新します。パイプラインの設計者は、暫定的な結果を出力するタイミングを自ら決定することもできます。前のステップでは、指定された許容遅延とともに AfterWatermark() トリガーを使用しました。例として、ウィンドウの最後までとするウォーターマークにはまだ達していないものの、想定したデータの 75% がすでに届いている場合を考えてみます。このような例は典型的な例として、エンドユーザーに紹介する価値が十分にあります。
トリガーは、処理時間中のどの時点で具体的な結果が出力されるかを決定するものです。あるウィンドウの個々の出力は、そのウィンドウのペインと呼ばれます。トリガーの条件が満たされると、トリガーによりペインが発生します。Apache Beam では、トリガーの条件として、ウォーターマークの進行状況、処理時間の進行状況(どのくらいのデータが実際に届いたかにかかわらず均一に進行)、要素数のカウント(ある量のデータが新規に届いたときなど)、データ依存トリガー(ファイルの末尾に達したときなど)といったものがあります。
トリガーの条件によっては、特定のペインが何度も発生する可能性があります。そのため、出力結果を累積する方法も指定する必要があります。Apache Beam は現在、2 つの累積モードをサポートしています。一つは結果を全部まとめて累積するモードで、もう一つはペインが最後に発生してから新たに結果に含まれた部分のみを返すモードです。
Window 変換を使用して PCollection のウィンドウ処理機能を設定するときに、トリガーも指定できます。
PCollection のトリガーを設定する場合は、WindowInto PTransform の trigger キーワード引数を設定します。Apache Beam には、数多くのトリガーが用意されています。
AfterWatermark: ウィンドウの最後に達するかペインの最初の要素が届いたタイミングで決定されるタイムスタンプを、ウォーターマークが超えると発生します。
AfterProcessingTime: ある程度の処理時間が経過したら発生します(通常はペインの最初の要素が届いてからの経過時間)。
AfterCount: ウィンドウ内の要素の数がある特定の数に達すると発生します。
次のコードサンプルは、ウィンドウ内の最初の要素が処理されてから 1 分後に結果を出力する時間ベースのトリガーを PCollection に設定しています。コードサンプルの最終行では、AccumulationMode.DISCARDING にキーワード引数 accumulation_mode を定義して、ウィンドウの累積モードを設定しています。
このタスクを完了するには、キーワード引数 trigger を WindowInto に追加して AfterWatermark トリガーに渡します。トリガーを設計する際には、このユースケースを念頭に、データが 1 分間のウィンドウ内で処理されることと、データが遅れて届くこともあるという点に留意してください。また、AfterWatermark トリガーの引数として、遅延要素(許容遅延内)ごとに遅延トリガーを追加します。
行き詰まった場合は、こちらの解決策をご覧ください。
113 行目あたりに次の #TODO を入力して、トリガーと累積モードを設定します。
#TODO を入力して、許容遅延、トリガー、累積モードを設定します。トリガーの設定方法によっては、今回のパイプラインをすぐに実行して以前のラボから取得したパイプラインと比較した場合、今回の新しいパイプラインの方がすばやく結果が表示される可能性があります。また、ヒューリスティックがストリーミング動作をうまく予測できず、許容遅延の方がよい成果を上げている場合は、今回のパイプラインの方が出力結果が正確である可能性もあります。
ただ、今回のパイプラインは遅延に対して高い堅牢性を発揮しますが、不正な形式のデータに対しては依然として脆弱です。今回のパイプラインを実行してパブリッシュしたメッセージに CommonLog に解析できる適切な JSON 形式以外の文字列が含まれていた場合は、パイプラインでエラーが発生します。こうしたエラーは、Cloud Logging のようなツールを使用すると簡単に確認できますが、事前に定義した場所に保存して後で調べられるようにパイプラインをうまく設計することもできます。
このセクションでは、モジュール性と堅牢性の両方を高めるコンポーネントをパイプラインに追加します。
不正な形式のデータに対する堅牢性を高めるためには、そのデータを除外して別の分岐で処理するための手段をパイプラインに組み込む必要があります。パイプラインで分岐する方法はすでに 1 つ見てきました。1 つの PCollection を複数変換の入力とする方法です。
この形式の分岐は高性能ですが、ユースケースによっては処理が非効率になることもあります。たとえば、同じ PCollection に対して 2 種類のサブセットを作成したいとします。複数変換の方法では、サブセットごとにフィルタ変換を 1 つ作成して、その両方を元の PCollection に適用します。しかし、これでは各要素を 2 回処理することになります。
パイプラインに分岐を生成するにはもう一つ、入力の PCollection を 1 回処理する間に単一の変換で複数の出力を生成するという方法もあります。このタスクでは、複数の出力を生成する変換を作成します。最初の出力は適切な形式のデータから得た結果であり、もう一つの出力は元の入力ストリームから得た不正な形式の要素です。
Apache Beam で引き続き PCollection を 1 つだけ作成しながら複数の結果を出力するためには、TaggedOutput というクラスを使用して、DoFn の出力を複数の(場合によっては異種の)出力の入力とします。
次に、TaggedOutput を使用して、DoFn の複数の異なる出力にタグ付けする例を示します。こうした PCollection は、with_outputs() メソッドを使用して復元し、TaggedOutput にタグ名を指定することで参照します。
このタスクを完了するには、上記のように ConvertToCommonLogFn クラスに TaggedOutput の復帰を 2 つ宣言します。try ステートメントでは解析した行を CommonLog クラスのインスタンスとして返し、catch ステートメントでは未解析の(デコード済みの)行を返します。
ConvertToCommonLogFn クラスに 1 つ目の #TODO を入力します。ConvertToCommonLogFn クラスに 2 つ目の #TODO を入力します。不正な形式のデータを生成しているアップストリームの問題を解決するためには、不正な形式のデータを分析できることが重要です。そのためには、どこかにその具体的なデータを出力する必要があります。このタスクでは、Google Cloud Storage に不正な形式のデータを書き込みます。デッドレターを保管しておくパターンと同じです。
これまでのラボでは、beam.io.WriteToText() を使用して、有限ソース(バッチ)から直接 Cloud Storage に書き込んでいましたが、無限ソース(ストリーミング)から書き込むときは、この手法に少し変更を加える必要があります。
まずは、書き込み変換のアップストリームです。トリガーを使用して、処理時間のどのタイミングで書き込むかを指定する必要があります。これを指定せずにデフォルトのままにしておくと、書き込みは行われません。デフォルトでは、すべてのイベントがグローバル ウィンドウに属しています。この場合、データセット全体の内容を実行時に把握できるため、一括して操作するときはデフォルトのままでかまいません。一方、無限ソースの場合、データセット全体のサイズが不明であるため、グローバル ウィンドウのペインが発生することはなく、したがって完了することもありません。
トリガーを使用しているため、ウィンドウも使用する必要があります。ただし、必ずしもウィンドウの変更が必要になるとは限りません。これまでのラボとタスクでは、ウィンドウ処理変換を使用して、グローバル ウィンドウをイベント時間内に期間が固定されたウィンドウに置き換えてきました。この場合、有用な方法かつ実用的な速度で具体的な結果を出力することが重要であり、それに比べればどの要素をグループ化するかはそれほど重要ではありません。
以下の例に示したウィンドウの場合、処理時間が 10 秒経過するたびにグローバル ウィンドウのペインが発生するものの、新しいイベントの書き込みだけが行われます。
トリガーを設定したら、書き込みが行えるよう、シンクを beam.io.WriteToText()(ストリーミング非対応)から beam.io.fileio.WriteToFiles() に変更する必要があります。ウィンドウ処理変換のダウンストリームを書き込むときは、書き込みを並行して行えるように多数のシャードを指定します。
このタスクを完了するには、不正な形式のデータを取得できるよう、rows.unparsed_row を入力として使用して新しい変換を作成します。長さが 120 秒の固定ウィンドウに 120 秒の処理時間トリガーを使用し、累積モードを AccumulationMode.DISCARDING に設定します。
beam.fileio.WriteToFiles を使用して GCS に書き込むよう、#TODO を入力します。
パイプラインを実行するには、次の例に示したようなコマンドを作成します。コマンドライン オプションを含めた場合は、その名前が反映されるようにコマンドを変更する必要があります。
このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするためのスクリプトが含まれています。このタスクを完了してメッセージのパブリッシュを開始するには、新しいターミナルを現在のターミナルと並べて開き、以下のスクリプトを実行します。これにより、スクリプトを終了するまでメッセージのパブリッシュが継続されます。training-data-analyst/quests/dataflow_python フォルダ内で作業していることを確認してください。
true フラグは、ストリームに遅延イベントを追加します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Pub/Sub」と入力し、[プロダクトとページ] セクションの [Pub/Sub] をクリックします。
[トピック] をクリックし、トピック「my_topic」をクリックします。
[メッセージ] をクリックします。
[メッセージの pull 元の Cloud Pub/Sub サブスクリプションの選択*] をクリックし、プルダウンからトピックのサブスクリプションを選択します。
[メッセージをパブリッシュ] ボタンをクリックします。
後続のページで、パブリッシュするメッセージを入力し、[パブリッシュ] をクリックします。
CommonLog JSON 仕様に対して完璧に適合しない限り、メッセージは短時間のうちにデッドレター Cloud Storage バケットに到着します。パイプラインのモニタリング ウィンドウに戻り、未解析メッセージの処理を行っている分岐内のノードをクリックすることで、パイプラインを通るパスをトレースできます。この分岐に要素が追加されたら、Cloud Storage に移動して、メッセージがディスクに書き込まれたことを確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2026 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください