
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Setup the data environment
/ 10
Run your pipeline
/ 10
Generate lag-less streaming input
/ 10
このラボでは、バッチのコンテキストで紹介した多くのコンセプトをストリーミングのコンテキストに適用し、BatchMinuteTrafficPipeline に似た、ただしリアルタイムに運用されるパイプラインを作成します。完成したパイプラインは、まず Pub/Sub から JSON メッセージを読み取り、ブランチする前にそのメッセージを解析します。一方のブランチは元データを BigQuery に書き込み、イベント時刻と処理時刻を記録します。他方のブランチはデータをウィンドウ処理して集計し、その結果を BigQuery に書き込みます。
次のパイプラインを構築します。
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。
Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。
[続行] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。
gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
出力:
出力例:
出力:
出力例:
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。729328892908
)をコピーします。{project-number}
はプロジェクト番号に置き換えてください。このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
以前のラボでは、TextIO.read() を使用して Google Cloud Storage から読み取りました。このラボでは、Google Cloud Storage ではなく Pub/Sub を使用します。Pub/Sub はフルマネージドのリアルタイム メッセージング サービスで、パブリッシャーはメッセージをトピックに送信でき、サブスクライバーはサブスクリプションを介してこのトピックをサブスクライブできます。
作成するパイプラインは、create_streaming_sinks.sh
スクリプトで作成したばかりのトピックである my_topic
をサブスクライブします。本番環境では多くの場合、このトピックは公開チームが作成します。コンソールの Pub/Sub セクションでこのトピックを確認できます。
Apache Beam の IO コネクタを使用して Pub/Sub から読み取るには、StreamingMinuteTrafficPipeline.java
ファイルを開き、PubsubIO.readStrings() 関数を使用する変換をパイプラインに追加します。この関数は、ソーストピックとタイムスタンプ属性を指定するメソッドを持つ PubsubIO.Read のインスタンスを返します。Pub/Sub トピック名のコマンドライン オプションがすでにあるので注意してください。タイムスタンプ属性を timestamp に設定します。これは各 Pub/Sub メッセージに追加される属性に対応しています。メッセージのパブリッシュ時刻で十分な場合、このステップは不要です。
このタスクを完了するには、inputTopic
コマンドライン パラメータで指定される Pub/Sub トピックから読み取る変換を追加します。次に、与えられている DoFn
である JsonToCommonLog
を使用し、各 JSON 文字列を CommonLog
インスタンスに変換します。この変換による結果を CommonLog
インスタンスの PCollection
に収集します。
前回の非 SQL ラボでは、固定時間ウィンドウ処理を実装し、イベント時刻ごとにイベントを相互に排他的な固定サイズのウィンドウにグループ化しました。ここでは、ストリーミング入力に関して同じことを行います。行き詰まった場合は、前回のラボのコードまたはソリューションを参照してください。
CommonLog
の PCollection
のデータを受け取り、要素を windowDuration
秒のウィンドウにウィンドウ処理する変換をパイプラインに追加します。ここで windowDuration
は別のコマンドライン パラメータです。前回のラボでは、Count 変換を使用してウィンドウごとのイベント数をカウントしました。ここでも同じことを行います。
<Long>
値から <Row>
値に戻し、タイムスタンプを抽出する必要があります。このタスクを完了するには、ウィンドウ処理された PCollection
を入力として、ウィンドウごとのイベント数をカウントする変換に渡します。
その結果を、すでに与えられている pageviewsSchema
スキーマを使用して Row
の PCollection に戻す変換を追加します。
このパイプラインは、BigQuery への書き込みを 2 つのブランチで行います。1 つ目のブランチは集計データを BigQuery に書き込みます。2 つ目のブランチはすでに作成されていて、各イベントの元データとしてのメタデータ、すなわちイベント タイムスタンプや実際の処理タイムスタンプなどを書き込みます。両方ともストリーミング挿入で BigQuery に直接書き込みます。
BigQuery への書き込みについては以前のラボで広範に説明しているため、ここでは基本的なメカニズムを深く掘り下げません。
このタスクでは、すでに与えられているコードを使用します。
このタスクを完了するには、次のことを行います。
集計データを格納するためのテーブル用に、aggregateTableName
という新しいコマンドライン パラメータを作成します。
以前と同様に、.useBeamSchema()
を使用して BigQuery に書き込む変換を追加します。
BigQueryIO.write()
は、テーブルを削除してから再作成する、WRITE_TRUNCATE
の WriteDisposition
をサポートしていません。この例では、WRITE_APPEND
を使用します。BigQueryIO.Write
はデフォルトで、制限なし PCollections に対するストリーミング挿入か、制限付き PCollections に対するファイルのバッチ読み込みジョブのいずれかになります。ストリーミング挿入は、データの集計値をすぐに表示したい場合に特に有用ですが、追加料金が発生します。
数分程度ごとの定期的な一括アップロードで問題がないストリーミング ユースケースでは、この動作を .withMethod()
で指定し、頻度についても .withTriggeringFrequency(org.joda.time.Duration)
で設定できます。
詳細については、BigQueryIO.Write.Method のドキュメントを参照してください。
このタスクを完了するには、次のことを行います。
元データを格納するためのテーブルの名前に関するコマンドライン パラメータを見つけます。
すでに作成されているパイプライン ブランチを調べます。このブランチはイベント時刻と処理時刻を取得しています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ストリーミング パイプラインであるため、ストリーミング ソースをサブスクライブし、入力を待ちます。現在、入力はありません。このセクションでは、ラグのないデータを生成します。実際のデータでは、ラグはほぼ間違いなく存在します。しかし、ラグのないストリーミング入力を理解するのは教育面で有益です。
このクエストのコードには、Pub/Sub を使用して JSON イベントをパブリッシュするスクリプトが含まれています。
training-data-analyst/quests/dataflow
フォルダが作業ディレクトリになっていることを確認します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ページビュー数が 1 分間に 100 回前後で推移しているのを確認できるはずです。
また、BigQuery コマンドライン ツールを使用すると、結果が次のように記述されていることを迅速に確認できます。
このクエリによって、イベント時刻と処理時刻の時間差が示されます。しかし、未加工の表形式データを見るだけでは、全体像の把握は困難なことがあります。軽量のデータ可視化および BI エンジンであるデータポータルを使用します。
新しいウィンドウが開きます。
ウィンドウの右側のパネルで、グラフの種類として散布図を選択します。
右側のパネルのデータ列で、次の値を設定します。
グラフは散布図に変換され、すべてのポイントが対角線上に位置します。現在生成されているストリーミング データのイベントは生成直後に処理され、ラグが発生しないためです。Dataflow ジョブが完全に実行状態になる前に、すぐにデータ生成スクリプトを開始した場合、ホッケー スティック型になることがあります。メッセージが Pub/Sub のキューに入れられ、そのすべてがほぼ同時に処理されたためです。
しかし実際の環境では、ラグへの対処がパイプラインに必要です。
ストリーミング イベント スクリプトによるイベント生成では、ラグをシミュレートできます。
これはイベント生成と Pub/Sub へのパブリッシュの間に遅れが生じるシナリオを表しています。たとえば、モバイル クライアントがオフライン モードになりユーザーがサービスを受けられない場合、イベントはデバイス上で収集され、デバイスがオンラインに復帰したときにすべてのイベントが一度にパブリッシュされます。
まず、データポータルのウィンドウを閉じます。
ラグを有効にするために、IDE ターミナルが含まれるウィンドウに戻ります。
ターミナルで Ctrl+C
キーを使用して、実行中のスクリプトを停止します。
次のスクリプトを実行します。
グラフの右側に表示される新着データはもはや完全ではなく、一部は対角線より上に表示されます。これはイベントが発生したタイミングより後に処理されたことを意味しています。
グラフの種類: 散布図
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください