
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Setup the data environment
/ 20
Run your pipeline
/ 10
Generate lag-less streaming input
/ 10
このラボでは、バッチのコンテキストで紹介した多くのコンセプトをストリーミングのコンテキストに適用し、batch_minute_traffic_pipeline と同様のパイプラインを作成します。ただし、このパイプラインはリアルタイムに機能します。完成したパイプラインは、まず Pub/Sub から JSON メッセージを読み取り、分岐前にそのメッセージを解析します。一方のブランチは BigQuery に元データをいくらか書き込み、イベント時刻と処理時刻を記録します。他方のブランチはデータをウィンドウ処理して集計し、結果を BigQuery に書き込みます。
次のパイプラインを構築します。
この Qwiklabs ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。
ユーザー名をコピーし、[Google Console を開く] をクリックします。 ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。
[アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。
[接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。
しばらくすると、このタブで Cloud コンソールが開きます。
このラボでは、すべてのコマンドをノートブックのターミナルで実行します。
Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [Workbench] をクリックします。
[Notebooks API を有効にする] をクリックします。
[Workbench] ページで [ユーザー管理のノートブック] を選択し、[新規作成] をクリックします。
表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを
[環境] で [Apache Beam] を選択します。
ダイアログ ボックスの下部にある [作成] をクリックします。
このラボで使用するコード リポジトリをダウンロードします。
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
クローン リポジトリ /training-data-analyst/quests/dataflow_python/
に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab
サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
サブフォルダとに分けられています。
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
dataflow.worker
のロールを Compute Engine のデフォルトのサービス アカウントに付与します。Google Cloud コンソールで、[IAM と管理] > [IAM] に移動し、Compute Engine のデフォルトのサービス アカウント
の [プリンシパルを編集します] アイコンをクリックします。
Dataflow ワーカーを別のロールとして追加し、[保存] をクリックします。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
これまでのラボでは、beam.io.ReadFromText
を使用して Google Cloud Storage から読み取りを行いました。このラボでは、Google Cloud Storage ではなく Pub/Sub を使用します。Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、パブリッシャーはメッセージを「トピック」に送信でき、サブスクライバーは「サブスクリプション」を介してこのトピックをサブスクライブできます。
作成するパイプラインは、create_streaming_sinks.sh
スクリプトで作成したばかりのトピック my_topic
をサブスクライブします。本番環境では多くの場合、このトピックはパブリッシュする側のチームが作成します。トピックはコンソールの Pub/Sub セクションで確認できます。
training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/
に移動し、streaming_minute_traffic_pipeline.py
ファイルを開きます。beam.io.ReadFromPubSub()
クラスを使用する変換をパイプラインに追加します。このクラスには、ソーストピックと timestamp_attribute
を指定する属性があります。デフォルトでは、この属性はメッセージのパブリッシュ時刻に設定されています。このタスクを完了するには、以下を実行します。
input_topic
コマンドライン パラメータで指定された Pub/Sub トピックから読み取りを行う変換を追加します。parse_json
を beam.Map
とともに使用し、各 JSON 文字列を CommonLog
インスタンスに変換します。with_output_types()
を使用して CommonLog
インスタンスの PCollection
に収集します。#TODO
に、以下のコードを追加します。SQL を使用しない前回のラボでは、固定時間ウィンドウ処理を実装し、イベント時刻ごとにイベントを相互排他的な固定サイズのウィンドウにグループ化しました。ここでは、ストリーミング入力に関して同じことを行います。行き詰まった場合は、前回のラボのコードまたはソリューションを参照してください。
このタスクを完了するには、以下を実行します。
CommonLog
データの PCollection
を受け取り、要素を window_duration
秒のウィンドウにウィンドウ処理する変換をパイプラインに追加します。ここで window_duration
は別のコマンドライン パラメータです。前のラボでは、CountCombineFn()
combiner を使用してウィンドウごとのイベント数をカウントしました。ここでも同じことを行います。
このタスクを完了するには、以下を実行します。
PCollection
を入力として、ウィンドウごとのイベント数をカウントする変換に渡します。DoFn
、GetTimestampFn
を beam.ParDo
とともに使用し、ウィンドウの開始タイムスタンプを含めます。このパイプラインは、BigQuery への書き込みを 2 つの異なるブランチで行います。1 つ目のブランチは集計データを BigQuery に書き込みます。2 つ目のブランチはすでに作成されていて、イベント タイムスタンプや実際の処理タイムスタンプなど、各未加工イベントのメタデータを書き出します。両方ともストリーミング挿入で BigQuery に直接書き込みます。
BigQuery への書き込みについては以前のラボで広範に説明しているため、ここでは基本的なメカニズムを深く掘り下げません。
このタスクを完了するには、以下を実行します。
aggr_table_name
という新しいコマンドライン パラメータを作成します。WRITE_TRUNCATE
の Write_disposition
が、beam.io.WriteToBigQuery()
によってサポートされていません。この例では、WRITE_APPEND
を使用します。
beam.io.WriteToBigQuery
はデフォルトで、制限なし PCollections に対するストリーミング挿入か、制限付き PCollections に対するファイルのバッチ読み込みジョブのいずれかになります。ストリーミング挿入は、データの集計値をすぐに表示したい場合に特に便利ですが、追加料金が発生します。数分程度ごとの定期的な一括アップロードで問題がないストリーミング ユースケースでは、この動作を method
キーワード引数で指定し、頻度についても triggering_frequency
キーワード引数で設定できます。詳細は、apache_beam.io.gcp.bigquery モジュールのドキュメントの「Write data to BigQuery」セクションをご覧ください。
pipeline.py
ファイルを開くことができないというエラーが表示される場合は、パイプラインをもう一度実行すると問題なく実行されます。
Dataflow UI で、エラーが発生せず正常に実行されることを確認します。データの作成もパイプラインへの取り込みもまだ行われていないため、パイプラインは実行されているものの、何も処理されていません。次のステップでデータを投入します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ストリーミング パイプラインであるため、ストリーミング ソースをサブスクライブし、入力を待ちます。現在、入力はありません。このセクションでは、ラグのないデータを生成します。実際のデータにはラグがほぼ間違いなく存在します。しかし、ラグのないストリーミング入力を理解することは勉強になります。
このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするスクリプトが含まれています。
training-data-analyst/quests/dataflow_python
フォルダ内で作業していることを確認してください。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
logs.minute_traffic
テーブルに対して次のクエリを実行します。ページビュー数が 1 分間に 100 回前後で推移しているのを確認できるはずです。
このクエリによって、イベント時刻と処理時刻の時間差が示されます。しかし、未加工の表形式データを見るだけでは、全体像の把握は困難なことがあります。軽量のデータ可視化および BI エンジンであるデータポータルを使用します。
新しいウィンドウが開きます。
開いたウィンドウの右側のパネルで、グラフの種類として散布図を選択します。
右側のパネルのデータ列で、次の値を設定します。
グラフは散布図に変換され、すべてのポイントが対角線上に位置します。これは、現在生成されているストリーミング データのイベントが生成直後に処理されていて、ラグが発生しないためです。Dataflow ジョブが完全に実行状態になる前に、すぐにデータ生成スクリプトを開始した場合、ホッケー スティック型になることがあります。メッセージが Pub/Sub のキューに入れられ、そのすべてがほぼ同時に処理されたためです。
しかし実際の環境では、パイプラインでラグに対処する必要があります。
ストリーミング イベント スクリプトによるイベント生成では、ラグをシミュレートできます。
これはイベント生成と Pub/Sub へのパブリッシュの間に遅れが生じるシナリオを表しています。たとえば、モバイル クライアントがオフライン モードになりユーザーがサービスを受けられない場合、イベントはデバイス上で収集され、デバイスがオンラインに復帰したときにすべてのイベントが一度にパブリッシュされます。
まず、データポータルのウィンドウを閉じます。
次に、ラグを有効にするために、ターミナルに戻って Ctrl+C
キーを押して実行中のスクリプトを停止します。
以下のスクリプトを実行します。
グラフの種類: 散布図
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください