arrow_back

Dataflow を使用したサーバーレスのデータ処理 - ストリーミング分析に Dataflow を使用する(Java)

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow を使用したサーバーレスのデータ処理 - ストリーミング分析に Dataflow を使用する(Java)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボでは、バッチのコンテキストで紹介した多くのコンセプトをストリーミングのコンテキストに適用し、BatchMinuteTrafficPipeline に似た、ただしリアルタイムに運用されるパイプラインを作成します。完成したパイプラインは、まず Pub/Sub から JSON メッセージを読み取り、ブランチする前にそのメッセージを解析します。一方のブランチは元データを BigQuery に書き込み、イベント時刻と処理時刻を記録します。他方のブランチはデータをウィンドウ処理して集計し、その結果を BigQuery に書き込みます。

目標

  • ストリーミング ソースからデータを読み取る。
  • ストリーミング シンクにデータを書き込む。
  • ストリーミングのコンテキストでデータをウィンドウ処理する。
  • ラグの影響を実験的に検証する。

次のパイプラインを構築します。

ReadMessage pub/sub からブランチしたパイプラインの図。

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

IDE の設定

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。

注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。ファイル エクスプローラ ボタンをクリックして確認します。

file_explorer

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

new_terminal

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud_auth

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

gce_reset

該当するラボを開く

  • IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリをラボに変更する cd 5_Streaming_Analytics/labs # 依存関係をダウンロードする mvn clean dependency:resolve export BASE_DIR=$(pwd)

データ環境を設定する

# GCS バケット、BQ データセット、Pub/Sub トピックを作成する cd $BASE_DIR/../.. source create_streaming_sinks.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

タスク 1. ストリーミング ソースから読み取る

以前のラボでは、TextIO.read() を使用して Google Cloud Storage から読み取りました。このラボでは、Google Cloud Storage ではなく Pub/Sub を使用します。Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、パブリッシャーはメッセージをトピックに送信でき、サブスクライバーはサブスクリプションを介してこのトピックをサブスクライブできます。

このメッセージング サービスを表す図で Cloud Pub/Sub セクションがハイライト表示されている。

作成するパイプラインは、create_streaming_sinks.sh スクリプトで作成したばかりのトピックである my_topic をサブスクライブします。本番環境では多くの場合、このトピックは公開チームが作成します。コンソールの Pub/Sub セクションでこのトピックを確認できます。

Apache Beam の IO コネクタを使用して Pub/Sub から読み取るには、StreamingMinuteTrafficPipeline.java ファイルを開き、PubsubIO.readStrings() 関数を使用する変換をパイプラインに追加します。この関数は、ソーストピックとタイムスタンプ属性を指定するメソッドを持つ PubsubIO.Read のインスタンスを返します。Pub/Sub トピック名のコマンドライン オプションがすでにあるので注意してください。タイムスタンプ属性を timestamp に設定します。これは各 Pub/Sub メッセージに追加される属性に対応しています。メッセージのパブリッシュ時刻で十分な場合、このステップは不要です。

注: パブリッシュ時刻は、Pub/Sub サービスがメッセージを初めて受信する時刻です。実際のイベント時刻とパブリッシュ時刻の間に遅延が生じる(つまり、遅延データ)可能性のあるシステムでこの点を考慮したい場合、メッセージをパブリッシュするクライアント コードで、メッセージのタイムスタンプ メタデータ属性を設定し、実際のイベント タイムスタンプを与える必要があります。Pub/Sub は、ペイロードに埋め込まれたイベント タイムスタンプの抽出方法をネイティブには把握しないためです。

メッセージの生成で使用するクライアント コードは GitHub で確認できます。

このタスクを完了するには、inputTopic コマンドライン パラメータで指定される Pub/Sub トピックから読み取る変換を追加します。次に、与えられている DoFn である JsonToCommonLog を使用し、各 JSON 文字列を CommonLog インスタンスに変換します。この変換による結果を CommonLog インスタンスの PCollection に収集します。

タスク 2. データをウィンドウ処理する

前回の非 SQL ラボでは、固定時間ウィンドウ処理を実装し、イベント時刻ごとにイベントを相互に排他的な固定サイズのウィンドウにグループ化しました。ここでは、ストリーミング入力に関して同じことを行います。行き詰まった場合は、前回のラボのコードまたはソリューションを参照してください。

1 分間のウィンドウにウィンドウ処理する

  1. 1 分間の固定ウィンドウを次のように実装します。
PCollection<String> pColl= ...; PCollection<String> windowedPCollection = pColl.apply( Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))));
  1. このタスクを完了するには、CommonLogPCollection のデータを受け取り、要素を windowDuration 秒のウィンドウにウィンドウ処理する変換をパイプラインに追加します。ここで windowDuration は別のコマンドライン パラメータです。

タスク 3. データを集計する

前回のラボでは、Count 変換を使用してウィンドウごとのイベント数をカウントしました。ここでも同じことを行います。

ウィンドウごとのイベント数をカウントする

  1. グローバルではないウィンドウ内のイベント数をカウントするために、次のようなコードを書くことができます。
PCollection<Long> counts = pColl.apply("CountPerWindow", Combine.globally(Count.<MyClass>combineFn()).withoutDefaults());
  1. 以前と同様に、次のように <Long> 値から <Row> 値に戻し、タイムスタンプを抽出する必要があります。
@ProcessElement public void processElement(@Element T l, OutputReceiver<T> r, IntervalWindow window) { Instant i = Instant.ofEpochMilli(window.end().getMillis()); Row row = Row.withSchema(appSchema) .addValues(.......) .build() ... r.output(...); }
  1. シリアル化のために、次のようにスキーマを指定することを忘れないでください。
apply().setRowSchema(appSchema)
  1. このタスクを完了するには、ウィンドウ処理された PCollection を入力として、ウィンドウごとのイベント数をカウントする変換に渡します。

  2. その結果を、すでに与えられている pageviewsSchema スキーマを使用して Row の PCollection に戻す変換を追加します。

タスク 4. BigQuery に書き込む

このパイプラインは、BigQuery への書き込みを 2 つのブランチで行います。1 つ目のブランチは集計データを BigQuery に書き込みます。2 つ目のブランチはすでに作成されていて、各イベントの元データとしてのメタデータ、すなわちイベント タイムスタンプや実際の処理タイムスタンプなどを書き込みます。両方ともストリーミング挿入で BigQuery に直接書き込みます。

集計データを BigQuery に書き込む

BigQuery への書き込みについては以前のラボで広範に説明しているため、ここでは基本的なメカニズムを深く掘り下げません。

このタスクでは、すでに与えられているコードを使用します。

このタスクを完了するには、次のことを行います。

  1. 集計データを格納するためのテーブル用に、aggregateTableName という新しいコマンドライン パラメータを作成します。

  2. 以前と同様に、.useBeamSchema() を使用して BigQuery に書き込む変換を追加します。

注: ストリーミングのコンテキストでは、BigQueryIO.write() は、テーブルを削除してから再作成する、WRITE_TRUNCATEWriteDisposition をサポートしていません。この例では、WRITE_APPEND を使用します。

BigQuery 挿入メソッド

BigQueryIO.Write はデフォルトで、制限なし PCollections に対するストリーミング挿入か、制限付き PCollections に対するファイルのバッチ読み込みジョブのいずれかになります。ストリーミング挿入は、データの集計値をすぐに表示したい場合に特に有用ですが、追加料金が発生します。

数分程度ごとの定期的な一括アップロードで問題がないストリーミング ユースケースでは、この動作を .withMethod() で指定し、頻度についても .withTriggeringFrequency(org.joda.time.Duration) で設定できます。

詳細については、BigQueryIO.Write.Method のドキュメントを参照してください。

rowsPCollection.apply("WriteToBQ", BigQueryIO.<Row>write().to(myTableName).useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

元データを BigQuery に書き込む

このタスクを完了するには、次のことを行います。

  1. 元データを格納するためのテーブルの名前に関するコマンドライン パラメータを見つけます。

  2. すでに作成されているパイプライン ブランチを調べます。このブランチはイベント時刻と処理時刻を取得しています。

タスク 5. パイプラインを実行する

  1. パイプラインを実行するには、以下の例に類似したコマンドを作成します。
注: 記述に含めたコマンドライン オプションの名前を反映するように修正する必要があります。 export PROJECT_ID=$(gcloud config get-value project) export REGION=us-central1 export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --aggregateTableName=${AGGREGATE_TABLE_NAME} \ --rawTableName=${RAW_TABLE_NAME}"
  1. Dataflow UI で、エラーなしに正常に実行されることを確認します。
注: データの作成もパイプラインへの取り込みもまだ行われていないため、パイプラインは実行されていますが、何も処理していません。次のステップでデータを投入します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインを実行する

タスク 6. ラグのないストリーミング入力を生成する

ストリーミング パイプラインであるため、ストリーミング ソースをサブスクライブし、入力を待ちます。現在、入力はありません。このセクションでは、ラグのないデータを生成します。実際のデータでは、ラグはほぼ間違いなく存在します。しかし、ラグのないストリーミング入力を理解するのは教育面で有益です。

このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするスクリプトが含まれています。

  • このタスクを完了してメッセージのパブリッシュを開始するには、現在のターミナルと並べて新しいターミナルを開き、次のスクリプトを実行します。このスクリプトは、強制終了するまで、メッセージをパブリッシュし続けます。training-data-analyst/quests/dataflow フォルダが作業ディレクトリになっていることを確認します。
bash generate_streaming_events.sh

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ラグのないストリーミング入力を生成する

結果を確認

  1. 数分待ってデータが入力されるようになったら、BigQuery に移動し、logs.minute_traffic テーブルに対して次のようなクエリを実行します。
SELECT minute, pageviews FROM `logs.windowed_traffic` ORDER BY minute ASC

ページビュー数が 1 分間に 100 回前後で推移しているのを確認できるはずです。

また、BigQuery コマンドライン ツールを使用すると、結果が次のように記述されていることを迅速に確認できます。

bq head logs.raw bq head logs.windowed_traffic
  1. ここで、次のクエリを入力します。
SELECT UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS event_millis, UNIX_MILLIS(processing_timestamp) - min_millis.min_event_millis AS processing_millis, user_id, -- すべてのポイントが表示されるようにユニークラベルとして追加 CAST(UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS STRING) AS label FROM `logs.raw` CROSS JOIN ( SELECT MIN(UNIX_MILLIS(event_timestamp)) AS min_event_millis FROM `logs.raw`) min_millis WHERE event_timestamp IS NOT NULL ORDER BY event_millis ASC

このクエリによって、イベント時刻と処理時刻の時間差が示されます。しかし、未加工の表形式データを見るだけでは、全体像の把握は困難なことがあります。軽量のデータ可視化および BI エンジンであるデータポータルを使用します。

  1. データポータルを次の手順で有効にします。
  • データポータルにアクセスします。
  • 左上の [Create] をクリックします。
  • [Report] をクリックします。
  • 利用規約をクリックして確認し、[Done] をクリックします。
  • BigQuery UI に戻ります。
  1. BigQuery UI で、[データを探索] ボタンをクリックし、[データポータルで調べる] を選択します。

新しいウィンドウが開きます。

  1. ウィンドウの右側のパネルで、グラフの種類として散布図を選択します。

  2. 右側のパネルのデータ列で、次の値を設定します。

  • ディメンション: label
  • 指標 X: event_millis
  • 指標 Y: processing_millis

グラフは散布図に変換され、すべてのポイントが対角線上に位置します。現在生成されているストリーミング データのイベントは生成直後に処理され、ラグが発生しないためです。Dataflow ジョブが完全に実行状態になる前に、すぐにデータ生成スクリプトを開始した場合、ホッケー スティック型になることがあります。メッセージが Pub/Sub のキューに入れられ、そのすべてがほぼ同時に処理されたためです。

しかし実際の環境では、ラグへの対処がパイプラインに必要です。

新たに変換された散布図。

タスク 7. ストリーミング入力にラグを導入する

ストリーミング イベント スクリプトによるイベント生成では、ラグをシミュレートできます。

これはイベント生成と Pub/Sub へのパブリッシュの間に遅れが生じるシナリオを表しています。たとえば、モバイル クライアントがオフライン モードになりユーザーがサービスを受けられない場合、イベントはデバイス上で収集され、デバイスがオンラインに復帰したときにすべてのイベントが一度にパブリッシュされます。

ラグのあるストリーミング入力を生成する

  1. まず、データポータルのウィンドウを閉じます。

  2. ラグを有効にするために、IDE ターミナルが含まれるウィンドウに戻ります。

  3. ターミナルで Ctrl+C キーを使用して、実行中のスクリプトを停止します。

  4. 次のスクリプトを実行します。

bash generate_streaming_events.sh true

結果を確認

  • BigQuery UI に戻って再度クエリを実行し、次に以前と同様に、データポータルのビューを作成します。

グラフの右側に表示される新着データはもはや完全ではなく、一部は対角線より上に表示されます。これはイベントが発生したタイミングより後に処理されたことを意味しています。

グラフの種類: 散布図

  • ディメンション: label
  • 指標 X: event_millis
  • 指標 Y: processing_millis

更新された散布図。

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。