arrow_back

Dataflow によるサーバーレス データ処理 — ストリーミング分析に Dataflow を使用する(Python)

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow によるサーバーレス データ処理 — ストリーミング分析に Dataflow を使用する(Python)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボでは、バッチのコンテキストで紹介した多くのコンセプトをストリーミングのコンテキストに適用し、batch_minute_traffic_pipeline と同様のパイプラインを作成します。ただし、このパイプラインはリアルタイムに機能します。完成したパイプラインは、まず Pub/Sub から JSON メッセージを読み取り、分岐前にそのメッセージを解析します。一方のブランチは BigQuery に元データをいくらか書き込み、イベント時刻と処理時刻を記録します。他方のブランチはデータをウィンドウ処理して集計し、結果を BigQuery に書き込みます。

目標

  • ストリーミング ソースからデータを読み取る。
  • ストリーミング シンクにデータを書き込む。
  • ストリーミングのコンテキストでデータをウィンドウ処理する。
  • ラグの影響を実験的に検証する。

次のパイプラインを構築します。

ReadMessage から始まり、WriteAggregateToBQ の 2 つのインスタンスで終わる実行中のパイプライン

設定と要件

[ラボを開始] ボタンをクリックする前に

注: 以下の説明をお読みください。

ラボの時間は計測されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

この Qwiklabs ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

必要なもの

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
  • ラボを完了するために十分な時間
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。 注: Pixelbook を使用している場合は、このラボをシークレット ウィンドウで実施してください。

ラボを開始してコンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。

    [認証情報] パネル

  2. ユーザー名をコピーし、[Google Console を開く] をクリックします。 ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。

    注: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
  3. [アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。

    [別のアカウントを使用] オプションがハイライト表示されている、アカウントのダイアログ ボックスを選択します。

  4. [接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。

注: 認証情報は [接続の詳細] パネルに表示されたものを使用してください。Google Cloud Skills Boost の認証情報は使用しないでください。請求が発生する事態を避けるため、Google Cloud アカウントをお持ちの場合でも、このラボでは使用しないでください。
  1. その後次のように進みます。
  • 利用規約に同意してください。
  • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
  • 無料トライアルには登録しないでください。

しばらくすると、このタブで Cloud コンソールが開きます。

注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。 Cloud コンソール メニュー

Jupyter ノートブック ベースの開発環境の設定

このラボでは、すべてのコマンドをノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [Workbench] をクリックします。

  2. [Notebooks API を有効にする] をクリックします。

  3. [Workbench] ページで [ユーザー管理のノートブック] を選択し、[新規作成] をクリックします。

  4. 表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを に、ゾーンを に設定します。

  5. [環境] で [Apache Beam] を選択します。

  6. ダイアログ ボックスの下部にある [作成] をクリックします。

注: 環境の完全なプロビジョニングには 3~5 分かかる場合があります。処理が完了するまでお待ちください。 注: [Notebook API を有効にする] をクリックして Notebooks API を有効にします。
  1. 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。

IDE_link

  1. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

ターミナルを開く

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

展開された [表示] メニューでハイライト表示されているエクスプローラ オプション

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。ファイルが開き、コードを追加または変更できます。

適切なラボを開く

  • ターミナルで以下のコマンドを実行して、このラボで使用するディレクトリに変更します。
# ディレクトリをラボに変更する cd 5_Streaming_Analytics/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. 以下のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get install -y python3-venv ## 仮想環境を作成して有効化する python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインの実行に必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com
  1. 最後に、dataflow.worker のロールを Compute Engine のデフォルトのサービス アカウントに付与します。
PROJECT_ID=$(gcloud config get-value project) export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)") export serviceAccount=""$PROJECT_NUMBER"-compute@developer.gserviceaccount.com"
  1. Google Cloud コンソールで、[IAM と管理] > [IAM] に移動し、Compute Engine のデフォルトのサービス アカウントの [プリンシパルを編集します] アイコンをクリックします。

  2. Dataflow ワーカーを別のロールとして追加し、[保存] をクリックします。

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_streaming_sinks.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

タスク 1. ストリーミング ソースから読み取る

これまでのラボでは、beam.io.ReadFromText を使用して Google Cloud Storage から読み取りを行いました。このラボでは、Google Cloud Storage ではなく Pub/Sub を使用します。Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、パブリッシャーはメッセージを「トピック」に送信でき、サブスクライバーは「サブスクリプション」を介してこのトピックをサブスクライブできます。

パブリッシャーからサブスクライバーに向かう、5 つのポイントがあるパイプライン(ポイント 2「メッセージ ストレージ」とポイント 3「サブスクリプション」がハイライト表示されている)

作成するパイプラインは、create_streaming_sinks.sh スクリプトで作成したばかりのトピック my_topic をサブスクライブします。本番環境では多くの場合、このトピックはパブリッシュする側のチームが作成します。トピックはコンソールの Pub/Sub セクションで確認できます。

  1. ファイル エクスプローラで training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/ に移動し、streaming_minute_traffic_pipeline.py ファイルを開きます。
  • Apache Beam の IO コネクタを使用して Pub/Sub から読み取りを行うには、beam.io.ReadFromPubSub() クラスを使用する変換をパイプラインに追加します。このクラスには、ソーストピックと timestamp_attribute を指定する属性があります。デフォルトでは、この属性はメッセージのパブリッシュ時刻に設定されています。
注: パブリッシュ時刻は、Pub/Sub サービスがメッセージを初めて受信する時刻です。実際のイベント時刻とパブリッシュ時刻の間に遅延(つまり、遅延データ)が生じる可能性のあるシステムでこの点を考慮したい場合は、メッセージをパブリッシュするクライアント コードで、メッセージのタイムスタンプ メタデータ属性を設定し、実際のイベント タイムスタンプを与える必要があります。Pub/Sub は、ペイロードに埋め込まれたイベント タイムスタンプの抽出方法をネイティブには把握しないためです。メッセージの生成で使用するクライアント コードはこちらで確認できます。

このタスクを完了するには、以下を実行します。

  • input_topic コマンドライン パラメータで指定された Pub/Sub トピックから読み取りを行う変換を追加します。
  • 次に、用意されている関数 parse_jsonbeam.Map とともに使用し、各 JSON 文字列を CommonLog インスタンスに変換します。
  • この変換による結果を、with_output_types() を使用して CommonLog インスタンスの PCollection に収集します。
  1. 最初の #TODO に、以下のコードを追加します。
beam.io.ReadFromPubSub(input_topic)

タスク 2. データをウィンドウ処理する

SQL を使用しない前回のラボでは、固定時間ウィンドウ処理を実装し、イベント時刻ごとにイベントを相互排他的な固定サイズのウィンドウにグループ化しました。ここでは、ストリーミング入力に関して同じことを行います。行き詰まった場合は、前回のラボのコードまたはソリューションを参照してください。

1 分間のウィンドウにウィンドウ処理する

このタスクを完了するには、以下を実行します。

  1. CommonLog データの PCollection を受け取り、要素を window_duration 秒のウィンドウにウィンドウ処理する変換をパイプラインに追加します。ここで window_duration は別のコマンドライン パラメータです。
  2. 以下のコードを使用して、要素を 1 分間のウィンドウにウィンドウ処理する変換をパイプラインに追加します。
"WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60))

タスク 3. データを集計する

前のラボでは、CountCombineFn() combiner を使用してウィンドウごとのイベント数をカウントしました。ここでも同じことを行います。

ウィンドウごとのイベント数をカウントする

このタスクを完了するには、以下を実行します。

  1. ウィンドウ処理された PCollection を入力として、ウィンドウごとのイベント数をカウントする変換に渡します。
  2. その後、用意されている DoFnGetTimestampFnbeam.ParDo とともに使用し、ウィンドウの開始タイムスタンプを含めます。
  3. 以下のコードを使用して、ウィンドウごとのイベント数をカウントする変換をパイプラインに追加します。
"CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()

タスク 4. BigQuery への書き込み

このパイプラインは、BigQuery への書き込みを 2 つの異なるブランチで行います。1 つ目のブランチは集計データを BigQuery に書き込みます。2 つ目のブランチはすでに作成されていて、イベント タイムスタンプや実際の処理タイムスタンプなど、各未加工イベントのメタデータを書き出します。両方ともストリーミング挿入で BigQuery に直接書き込みます。

集計データを BigQuery に書き込む

BigQuery への書き込みについては以前のラボで広範に説明しているため、ここでは基本的なメカニズムを深く掘り下げません。

このタスクを完了するには、以下を実行します。

  • 集計データを格納するためのテーブル用に、aggr_table_name という新しいコマンドライン パラメータを作成します。
  • 以前と同様に、BigQuery に書き込む変換を追加します。
注: ストリーミングのコンテキストでは、テーブルを削除してから再作成する WRITE_TRUNCATEWrite_disposition が、beam.io.WriteToBigQuery() によってサポートされていません。この例では、WRITE_APPEND を使用します。

BigQuery 挿入メソッド

beam.io.WriteToBigQuery はデフォルトで、制限なし PCollections に対するストリーミング挿入か、制限付き PCollections に対するファイルのバッチ読み込みジョブのいずれかになります。ストリーミング挿入は、データの集計値をすぐに表示したい場合に特に便利ですが、追加料金が発生します。数分程度ごとの定期的な一括アップロードで問題がないストリーミング ユースケースでは、この動作を method キーワード引数で指定し、頻度についても triggering_frequency キーワード引数で設定できます。詳細は、apache_beam.io.gcp.bigquery モジュールのドキュメントの「Write data to BigQuery」セクションをご覧ください。

  • 以下のコードを使用して、集計データを BigQuery テーブルに書き込む変換をパイプラインに追加します。
'WriteAggToBQ' >> beam.io.WriteToBigQuery( agg_table_name, schema=agg_table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND )

タスク 5. パイプラインを実行する

  • ターミナルに戻り、以下のコードを実行してパイプラインを実行します。
export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw python3 streaming_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_topic=${PUBSUB_TOPIC} \ --window_duration=${WINDOW_DURATION} \ --agg_table_name=${AGGREGATE_TABLE_NAME} \ --raw_table_name=${RAW_TABLE_NAME} 注: Dataflow パイプラインが失敗し、pipeline.py ファイルを開くことができないというエラーが表示される場合は、パイプラインをもう一度実行すると問題なく実行されます。

Dataflow UI で、エラーが発生せず正常に実行されることを確認します。データの作成もパイプラインへの取り込みもまだ行われていないため、パイプラインは実行されているものの、何も処理されていません。次のステップでデータを投入します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインを実行する

タスク 6. ラグのないストリーミング入力を生成する

ストリーミング パイプラインであるため、ストリーミング ソースをサブスクライブし、入力を待ちます。現在、入力はありません。このセクションでは、ラグのないデータを生成します。実際のデータにはラグがほぼ間違いなく存在します。しかし、ラグのないストリーミング入力を理解することは勉強になります。

このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするスクリプトが含まれています。

  • このタスクを完了してメッセージのパブリッシュを開始するには、現在のターミナルと並べて新しいターミナルを開き、次のスクリプトを実行します。これにより、スクリプトを終了するまでメッセージのパブリッシュが継続されます。training-data-analyst/quests/dataflow_python フォルダ内で作業していることを確認してください。
bash generate_streaming_events.sh

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ラグのないストリーミング入力を生成する

結果を確認

  1. データの入力が開始されるまで数分待ちます。その後、BigQuery に移動し、logs.minute_traffic テーブルに対して次のクエリを実行します。
SELECT timestamp, page_views FROM `logs.windowed_traffic` ORDER BY timestamp ASC

ページビュー数が 1 分間に 100 回前後で推移しているのを確認できるはずです。

  1. また、BigQuery コマンドライン ツールを使用すると、結果が以下のように記述されていることを迅速に確認できます。
bq head logs.raw bq head logs.windowed_traffic
  1. ここで、以下のクエリを入力します。
SELECT UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS event_millis, UNIX_MILLIS(TIMESTAMP(processing_timestamp)) - min_millis.min_event_millis AS processing_millis, user_id, -- すべてのポイントが表示されるように、一意のラベルとして追加 CAST(UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS STRING) AS label FROM `logs.raw` CROSS JOIN ( SELECT MIN(UNIX_MILLIS(TIMESTAMP(event_timestamp))) AS min_event_millis FROM `logs.raw`) min_millis WHERE event_timestamp IS NOT NULL ORDER BY event_millis ASC

このクエリによって、イベント時刻と処理時刻の時間差が示されます。しかし、未加工の表形式データを見るだけでは、全体像の把握は困難なことがあります。軽量のデータ可視化および BI エンジンであるデータポータルを使用します。

  1. データポータルを次の手順で有効にします。
  • https://datastudio.google.com にアクセスします。
  • 左上の [Create] をクリックします。
  • [Report] をクリックします。
  • 利用規約をクリックして確認し、[Done] をクリックします。
  • BigQuery UI に戻ります。
  1. BigQuery UI で、[データを探索] ボタンをクリックし、[データポータルで調べる] を選択します。

新しいウィンドウが開きます。

  1. 開いたウィンドウの右側のパネルで、グラフの種類として散布図を選択します。

  2. 右側のパネルのデータ列で、次の値を設定します。

  • ディメンション: label
  • 階層: disabled
  • 指標 X: event_millis
  • 指標 Y: processing_millis

グラフは散布図に変換され、すべてのポイントが対角線上に位置します。これは、現在生成されているストリーミング データのイベントが生成直後に処理されていて、ラグが発生しないためです。Dataflow ジョブが完全に実行状態になる前に、すぐにデータ生成スクリプトを開始した場合、ホッケー スティック型になることがあります。メッセージが Pub/Sub のキューに入れられ、そのすべてがほぼ同時に処理されたためです。

しかし実際の環境では、パイプラインでラグに対処する必要があります。

すべてのポイントが対角線上に位置する散布図

タスク 7. ストリーミング入力にラグを導入する

ストリーミング イベント スクリプトによるイベント生成では、ラグをシミュレートできます。

これはイベント生成と Pub/Sub へのパブリッシュの間に遅れが生じるシナリオを表しています。たとえば、モバイル クライアントがオフライン モードになりユーザーがサービスを受けられない場合、イベントはデバイス上で収集され、デバイスがオンラインに復帰したときにすべてのイベントが一度にパブリッシュされます。

ラグのあるストリーミング入力を生成する

  1. まず、データポータルのウィンドウを閉じます。

  2. 次に、ラグを有効にするために、ターミナルに戻って Ctrl+C キーを押して実行中のスクリプトを停止します。

  3. 以下のスクリプトを実行します。

bash generate_streaming_events.sh true

結果を確認

  • BigQuery UI に戻って再度クエリを実行してから、以前と同様にデータポータルのビューを再作成します。グラフの右側に表示される新着データはもはや完全ではなく、一部は対角線より上に表示されます。これはイベントが発生したタイミングより後に処理されたことを意味しています。

グラフの種類: 散布図

  • ディメンション: label
  • 階層: disabled
  • 指標 X: event_millis
  • 指標 Y: processing_millis

一部のデータポイントが対角線より上にある散布図

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。