ラボの設定手順と要件
アカウントと進行状況を保護します。このラボを実行するには、常にシークレット ブラウジング ウィンドウとラボの認証情報を使用してください。

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用した高度なストリーミング分析パイプライン(Python)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
このコンテンツはまだモバイル デバイス向けに最適化されていません。
快適にご利用いただくには、メールで送信されたリンクを使用して、デスクトップ パソコンでアクセスしてください。

概要

このラボの内容:

  • 遅延データに対処する。
  • 不正な形式のデータに以下の方法で対処する。
  • 複合変換を書き込むことでコードのモジュール性を高める。
  • 多種多様な出力を生成する変換を書き込む。
  • 不正な形式のデータを収集して、後で調査できるよう保管場所に書き込む。

前回のラボの最後に、リアルタイム型パイプラインが対処すべきある種の課題に触れました。イベントが発生したタイミングと処理されたタイミングの間に生じる隔たりで、ラグとも呼ばれます。このラボでは、Apache Beam のコンセプトをいくつか紹介します。これを理解すると、パイプラインを作成するときに、ラグに正式に対処するにはどうすればよいかを指定できるようになります。

また、ストリーミング環境でパイプラインを使用するときに発生しうる問題は、ラグだけではありません。システムの外部から入力を受け付ける場合、入力がどこか不正な形式である可能性が常にあります。このラボでは、そうした入力に対処するための手法についても紹介します。

このラボで作成する最終的なパイプラインは、以下の画像のようなものになります。分岐が含まれていることに注意してください。

ReadPubSubMessages で始まって、WriteToBQ で終わる分岐と、WriteDeadLetterStorage で終わる分岐が含まれている、パイプラインの流れ

設定と要件

[ラボを開始] ボタンをクリックする前に

注: 以下の説明をお読みください。

ラボには時間制限があり、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

この Google Skills ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを実施できます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

必要なもの

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
  • ラボを完了するために十分な時間
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。 注: Pixelbook を使用している場合は、このラボをシークレット ウィンドウで実施してください。

ラボを開始してコンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。

    [認証情報] パネル

  2. ユーザー名をコピーし、[Google Console を開く] をクリックします。 ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。

    注: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
  3. [アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。

    [別のアカウントを使用] オプションがハイライト表示されている、アカウントのダイアログ ボックスを選択します。

  4. [接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。

注: 認証情報は [接続の詳細] パネルに表示されたものを使用してください。Google Skills の認証情報は使用しないでください。請求が発生する事態を避けるため、Google Cloud アカウントをお持ちの場合でも、このラボでは使用しないでください。
  1. その後次のように進みます。
  • 利用規約に同意します。
  • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
  • 無料トライアルには登録しないでください。

しばらくすると、このタブで Cloud コンソールが開きます。

注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。 Cloud コンソール メニュー

Workbench インスタンスの開発環境の設定

このラボでは、すべてのコマンドをインスタンス ノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー)で [Vertex AI] を選択します。

  2. [すべての推奨 API を有効化] をクリックします。

  3. ナビゲーション メニューで [ワークベンチ] をクリックします。

    [ワークベンチ] ページの上部で、[インスタンス] ビューになっていることを確認します。

  4. [ボックスを追加する新規作成] をクリックします。

  5. インスタンスの構成:

    • 名前: lab-workbench
    • リージョン: リージョンを に設定します
    • ゾーン: ゾーンを に設定します
    • 詳細オプション(任意): 必要に応じて [詳細オプション] をクリックして、より詳細なカスタマイズを行います(マシンタイプ、ディスクサイズなど)。

Vertex AI Workbench インスタンスを作成する

  1. [作成] をクリックします。

インスタンスが作成されるまで数分かかります。作成が終了するとインスタンスの名前の横に緑色のチェックマークが付きます。

  1. インスタンスの名前の横に表示されている [JupyterLab を開く] をクリックして JupyterLab インターフェースを起動します。ブラウザで新しいタブが開きます。

デプロイされた Workbench インスタンス

  1. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

展開された [表示] メニューでハイライト表示されているエクスプローラ オプション

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。 ファイルが開き、コードを追加または変更できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する

ラボパート 1: 遅延データに対処する

これまでのラボでは、イベント時間ごとの要素を固定幅のウィンドウに分割する場合、以下のようなコードを記述していました。

parsed_msgs | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(window_duration)) | "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()

ただ、前回の非 SQL ラボの最後に見たように、データ ストリームでラグが発生するのは珍しいことではありません。ウィンドウ処理に(処理時間ではなく)イベント時間を使用する場合は、ラグが問題となります。イベント時間のある特定の時点ですべてのイベントが届いているかどうか、確実にはわからないからです。

そのため、結果を出力するには、パイプライン内にこの点に関する判断を記述する必要があります。その目的で使用するのが、ウォーターマークというコンセプトです。ウォーターマークは、イベント時間のある時点までにすべてのデータがパイプラインに届いていると見なすタイミングをシステムがヒューリスティックに基づいて判断するという考え方です。ウォーターマークがウィンドウの枠を超えたら、以後、そのウィンドウ内のタイムスタンプで届いた要素は、遅延データと見なされ、ただ単にドロップされます。ウィンドウ処理は、デフォルトではすべてのデータが届いているとシステムが高い確度で判断した場合にできれば完全な形で結果を 1 つだけ出力するというように動作します。

Apache Beam では、数多くのヒューリスティックを使用して、何をウォーターマークにするかを経験則から導き出しますが、それでもヒューリスティックであることに変わりはありません。さらに重要なのは、こうしたヒューリスティックは汎用的なものであり、すべてのユースケースに適しているわけではないということです。パイプラインの設計担当者は、汎用的なヒューリスティックを使用するのではなく、以下の質問をじっくりと検討してうまくトレードオフを図る必要があります。

  • 完全性: 結果を計算する前にすべてのデータが揃っていることはどれほど重要か?
  • レイテンシ: どのくらいの時間までデータの到着を待つか?たとえば、すべてのデータが揃ったと判断できるまで待つのか、データが届くたびに処理していくのかということです。
  • 費用: レイテンシ低減のために費やしてかまわないと考える演算能力と費用はどれくらいか?

こうした疑問への答えに基づくと、Apache Beam の形式手法に従って、妥当なトレードオフを実現するコードを記述できます。

許容遅延

許容遅延を使用すると、ウィンドウの状態が維持される期間を制御できます。ウォーターマークが許容遅延の期間に達すると、すべての状態がドロップされます。すべての永続状態を期間の最後までずっと維持できればよいのですが、現実にはそうはいきません。無限データソースに対処する場合、特定のウィンドウの状態を無期限に維持するのは実用的ではありません。そんなことをすれば、ディスク容量を使い切ってしまいます。

そのため、実際にデータを順不同で処理するシステムでは、なんらかの方法で処理対象のウィンドウに存続期間を設定するよう義務づける必要があります。その簡単明瞭な方法の一つに、システム内で許容遅延の範囲を定義することがあります。たとえば、処理対象の特定のレコードに遅延の限度を設け(ウォーターマークを基準とする)、その限度を超えて届いたデータは単にドロップするというようにします。各データに許容遅延の限度を設定すると、ウィンドウの状態を維持するべき期間も正確に定義できます(ウィンドウの最後に達してその遅延限度をウォーターマークが超えるまで)。

タスク 1. 環境を準備する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

適切なラボを開く

  • IDE のターミナルで次のコマンドを実行して、このラボで使用するディレクトリに変更します。
# ディレクトリを lab に変更する cd 7_Advanced_Streaming_Analytics/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. 次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get update && sudo apt-get install -y python3-venv ## 仮想環境を作成して有効化する python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../../ source create_streaming_sinks.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 環境を準備する

タスク 2. 許容遅延を設定する

  1. ファイル エクスプローラで training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab に移動し、streaming_minute_traffic_pipeline.py ファイルを開きます。

Apache Beam では、次の例のように、allowed_lateness キーワード引数を使用し、WindowInto PTransform 内に AfterWatermark() トリガーを指定することで、許容遅延を設定します。

items = p | ... Windowed_items = items | beam.WindowInto(beam.window.FixedWindows(60), # 1 minute trigger=AfterWatermark(), allowed_lateness=60*60*24) # 1 day
  1. このタスクの仕上げとして、ウィンドウ処理変換と、allowed_lateness コマンドライン引数によって定義された許容遅延を確認します。値の妥当性について判断したら、適切な単位を反映するようコマンドラインを更新します。

トリガー

パイプラインの設計者は、暫定的な結果を出力するタイミングを自ら決定することもできます。前のステップでは、指定された許容遅延とともに AfterWatermark() トリガーを使用しました。例として、ウィンドウの最後までとするウォーターマークにはまだ達していないものの、想定したデータの 75% がすでに届いている場合を考えてみます。このような例は典型的な例として、エンドユーザーに紹介する価値が十分にあります。

トリガーは、処理時間中のどの時点で具体的な結果が出力されるかを決定するものです。あるウィンドウの個々の出力は、そのウィンドウのペインと呼ばれます。トリガーの条件が満たされると、トリガーによりペインが発生します。Apache Beam では、トリガーの条件として、ウォーターマークの進行状況、処理時間の進行状況(どのくらいのデータが実際に届いたかにかかわらず均一に進行)、要素数のカウント(ある量のデータが新規に届いたときなど)、データ依存トリガー(ファイルの末尾に達したときなど)といったものがあります。

トリガーの条件によっては、特定のペインが何度も発生する可能性があります。そのため、出力結果を累積する方法も指定する必要があります。Apache Beam は現在、2 つの累積モードをサポートしています。一つは結果を全部まとめて累積するモードで、もう一つはペインが最後に発生してから新たに結果に含まれた部分のみを返すモードです。

タスク 3. トリガーを設定する

Window 変換を使用して PCollection のウィンドウ処理機能を設定するときに、トリガーも指定できます。

PCollection のトリガーを設定する場合は、WindowInto PTransform の trigger キーワード引数を設定します。Apache Beam には、数多くのトリガーが用意されています。

  • AfterWatermark: ウィンドウの最後に達するかペインの最初の要素が届いたタイミングで決定されるタイムスタンプを、ウォーターマークが超えると発生します。

  • AfterProcessingTime: ある程度の処理時間が経過したら発生します(通常はペインの最初の要素が届いてからの経過時間)。

  • AfterCount: ウィンドウ内の要素の数がある特定の数に達すると発生します。

次のコードサンプルは、ウィンドウ内の最初の要素が処理されてから 1 分後に結果を出力する時間ベースのトリガーを PCollection に設定しています。コードサンプルの最終行では、AccumulationMode.DISCARDING にキーワード引数 accumulation_mode を定義して、ウィンドウの累積モードを設定しています。

items = p | ... windowed_items = items | beam.WindowInto(FixedWindows(60), # 1 minute trigger=AfterProcessingTime(60), accumulation_mode=AccumulationMode.DISCARDING)
  1. このタスクを完了するには、キーワード引数 triggerWindowInto に追加して AfterWatermark トリガーに渡します。トリガーを設計する際には、このユースケースを念頭に、データが 1 分間のウィンドウ内で処理されることと、データが遅れて届くこともあるという点に留意してください。また、AfterWatermark トリガーの引数として、遅延要素(許容遅延内)ごとに遅延トリガーを追加します。 行き詰まった場合は、こちらの解決策をご覧ください。

  2. 113 行目あたりに次の #TODO を入力して、トリガーと累積モードを設定します。

trigger=AfterProcessingTime(120), accumulation_mode=AccumulationMode.DISCARDING)
  1. 119 行目あたりに次の #TODO を入力して、許容遅延、トリガー、累積モードを設定します。
trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=int(allowed_lateness), accumulation_mode=AccumulationMode.ACCUMULATING)

ラボパート 2: 不正な形式のデータに対処する

トリガーの設定方法によっては、今回のパイプラインをすぐに実行して以前のラボから取得したパイプラインと比較した場合、今回の新しいパイプラインの方がすばやく結果が表示される可能性があります。また、ヒューリスティックがストリーミング動作をうまく予測できず、許容遅延の方がよい成果を上げている場合は、今回のパイプラインの方が出力結果が正確である可能性もあります。

ただ、今回のパイプラインは遅延に対して高い堅牢性を発揮しますが、不正な形式のデータに対しては依然として脆弱です。今回のパイプラインを実行してパブリッシュしたメッセージに CommonLog に解析できる適切な JSON 形式以外の文字列が含まれていた場合は、パイプラインでエラーが発生します。こうしたエラーは、Cloud Logging のようなツールを使用すると簡単に確認できますが、事前に定義した場所に保存して後で調べられるようにパイプラインをうまく設計することもできます。

このセクションでは、モジュール性と堅牢性の両方を高めるコンポーネントをパイプラインに追加します。

タスク 1. 不正な形式のデータを収集する

不正な形式のデータに対する堅牢性を高めるためには、そのデータを除外して別の分岐で処理するための手段をパイプラインに組み込む必要があります。パイプラインで分岐する方法はすでに 1 つ見てきました。1 つの PCollection を複数変換の入力とする方法です。

この形式の分岐は高性能ですが、ユースケースによっては処理が非効率になることもあります。たとえば、同じ PCollection に対して 2 種類のサブセットを作成したいとします。複数変換の方法では、サブセットごとにフィルタ変換を 1 つ作成して、その両方を元の PCollection に適用します。しかし、これでは各要素を 2 回処理することになります。

パイプラインに分岐を生成するにはもう一つ、入力の PCollection を 1 回処理する間に単一の変換で複数の出力を生成するという方法もあります。このタスクでは、複数の出力を生成する変換を作成します。最初の出力は適切な形式のデータから得た結果であり、もう一つの出力は元の入力ストリームから得た不正な形式の要素です。

Apache Beam で引き続き PCollection を 1 つだけ作成しながら複数の結果を出力するためには、TaggedOutput というクラスを使用して、DoFn の出力を複数の(場合によっては異種の)出力の入力とします。

次に、TaggedOutput を使用して、DoFn の複数の異なる出力にタグ付けする例を示します。こうした PCollection は、with_outputs() メソッドを使用して復元し、TaggedOutput にタグ名を指定することで参照します。

class ConvertToCommonLogFn(beam.DoFn): def process(self, element): try: row = json.loads(element.decode('utf-8')) yield beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row)) except: yield beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8')) … rows = (p | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(input_topic) | 'ParseJson' >> beam.ParDo(ConvertToCommonLogFn()).with_outputs('parsed_row', 'unparsed_row') .with_output_types(CommonLog)) (rows.unparsed_row | … (rows.parsed_row | …

このタスクを完了するには、上記のように ConvertToCommonLogFn クラスに TaggedOutput の復帰を 2 つ宣言します。try ステートメントでは解析した行を CommonLog クラスのインスタンスとして返し、catch ステートメントでは未解析の(デコード済みの)行を返します。

  1. ConvertToCommonLogFn クラスに 1 つ目の #TODO を入力します。
beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row))
  1. ConvertToCommonLogFn クラスに 2 つ目の #TODO を入力します。
beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8'))

タスク 2. 後で解析できるように不正な形式のデータを書き込む

不正な形式のデータを生成しているアップストリームの問題を解決するためには、不正な形式のデータを分析できることが重要です。そのためには、どこかにその具体的なデータを出力する必要があります。このタスクでは、Google Cloud Storage に不正な形式のデータを書き込みます。デッドレターを保管しておくパターンと同じです。

これまでのラボでは、beam.io.WriteToText() を使用して、有限ソース(バッチ)から直接 Cloud Storage に書き込んでいましたが、無限ソース(ストリーミング)から書き込むときは、この手法に少し変更を加える必要があります。

まずは、書き込み変換のアップストリームです。トリガーを使用して、処理時間のどのタイミングで書き込むかを指定する必要があります。これを指定せずにデフォルトのままにしておくと、書き込みは行われません。デフォルトでは、すべてのイベントがグローバル ウィンドウに属しています。この場合、データセット全体の内容を実行時に把握できるため、一括して操作するときはデフォルトのままでかまいません。一方、無限ソースの場合、データセット全体のサイズが不明であるため、グローバル ウィンドウのペインが発生することはなく、したがって完了することもありません。

トリガーを使用しているため、ウィンドウも使用する必要があります。ただし、必ずしもウィンドウの変更が必要になるとは限りません。これまでのラボとタスクでは、ウィンドウ処理変換を使用して、グローバル ウィンドウをイベント時間内に期間が固定されたウィンドウに置き換えてきました。この場合、有用な方法かつ実用的な速度で具体的な結果を出力することが重要であり、それに比べればどの要素をグループ化するかはそれほど重要ではありません。

以下の例に示したウィンドウの場合、処理時間が 10 秒経過するたびにグローバル ウィンドウのペインが発生するものの、新しいイベントの書き込みだけが行われます。

pcollection | “FireEvery10s” >> WindowInto(FixedWindows(10) trigger=AfterProcessingTime(10)) accumulation_mode=AccumulationMode.DISCARDING

トリガーを設定したら、書き込みが行えるよう、シンクを beam.io.WriteToText()(ストリーミング非対応)から beam.io.fileio.WriteToFiles() に変更する必要があります。ウィンドウ処理変換のダウンストリームを書き込むときは、書き込みを並行して行えるように多数のシャードを指定します。

windowed_items = p | 'WriteWindowedPCollection' >> fileio.WriteToFiles("gs://path/to/somewhere", shards=int(num_shards), max_writers_per_bundle=0)
  1. このタスクを完了するには、不正な形式のデータを取得できるよう、rows.unparsed_row を入力として使用して新しい変換を作成します。長さが 120 秒の固定ウィンドウに 120 秒の処理時間トリガーを使用し、累積モードを AccumulationMode.DISCARDING に設定します。

  2. beam.fileio.WriteToFiles を使用して GCS に書き込むよう、#TODO を入力します。

fileio.WriteToFiles(output_path,shards=1,max_writers_per_bundle=0)

タスク 3. パイプラインを実行する

パイプラインを実行するには、次の例に示したようなコマンドを作成します。コマンドライン オプションを含めた場合は、その名前が反映されるようにコマンドを変更する必要があります。

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR python3 streaming_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_topic=${PUBSUB_TOPIC} \ --window_duration=${WINDOW_DURATION} \ --allowed_lateness=${ALLOWED_LATENESS} \ --table_name=${OUTPUT_TABLE_NAME} \ --dead_letter_bucket=${DEADLETTER_BUCKET} \ --allow_unsafe_triggers

このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするためのスクリプトが含まれています。このタスクを完了してメッセージのパブリッシュを開始するには、新しいターミナルを現在のターミナルと並べて開き、以下のスクリプトを実行します。これにより、スクリプトを終了するまでメッセージのパブリッシュが継続されます。training-data-analyst/quests/dataflow_python フォルダ内で作業していることを確認してください。

注: true フラグは、ストリームに遅延イベントを追加します。 cd /home/jupyter/training-data-analyst/quests/dataflow_python/ bash generate_streaming_events.sh true

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインを実行する

タスク 4. パイプラインをテストする

  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Pub/Sub」と入力し、[プロダクトとページ] セクションの [Pub/Sub] をクリックします。

  2. [トピック] をクリックし、トピック「my_topic」をクリックします。

  3. [メッセージ] をクリックします。

  4. [メッセージの pull 元の Cloud Pub/Sub サブスクリプションの選択*] をクリックし、プルダウンからトピックのサブスクリプションを選択します。

注: サブスクリプションが何も表示されない場合は、[更新] をクリックしてみてください。
  1. [メッセージをパブリッシュ] ボタンをクリックします。

  2. 後続のページで、パブリッシュするメッセージを入力し、[パブリッシュ] をクリックします。

CommonLog JSON 仕様に対して完璧に適合しない限り、メッセージは短時間のうちにデッドレター Cloud Storage バケットに到着します。パイプラインのモニタリング ウィンドウに戻り、未解析メッセージの処理を行っている分岐内のノードをクリックすることで、パイプラインを通るパスをトレースできます。この分岐に要素が追加されたら、Cloud Storage に移動して、メッセージがディスクに書き込まれたことを確認できます。

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID}/deadletter gcloud storage ls $BUCKET gcloud storage cat $BUCKET/*

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインをテストする

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2026 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボを実行するには、シークレット モードまたはシークレット ブラウジング ウィンドウを使用することをおすすめします。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。