ラボの設定手順と要件
アカウントと進行状況を保護します。このラボを実行するには、常にシークレット ブラウジング ウィンドウとラボの認証情報を使用してください。

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用したバッチ分析パイプライン(Java)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
このコンテンツはまだモバイル デバイス向けに最適化されていません。
快適にご利用いただくには、メールで送信されたリンクを使用して、デスクトップ パソコンでアクセスしてください。

概要

このラボの内容:

  • ユーザー別にサイト トラフィックを集計するパイプラインを作成する。
  • 分単位でサイト トラフィックを集計するパイプラインを作成する。
  • 時系列データのウィンドウ処理を実装する。

前提条件

Java に関する基本的な知識。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

IDE の設定

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

  1. Theia IDE にアクセスするには、Google Cloud Skills Boost に表示されたリンクをコピーして新しいタブに貼り付けます。
注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url が表示されている認証情報ペイン

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。

  1. ファイル エクスプローラ ボタンをクリックして確認します。

展開されたファイル エクスプローラ メニューで、labs フォルダがハイライト表示されている

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

[ターミナル] メニューでハイライト表示されている [新しいターミナル] オプション

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud auth list コマンドが表示されたターミナル

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

[VM インスタンス] ページで、リセットボタンと VM インスタンス名の両方がハイライト表示されている

パート 1: ユーザー別にサイト トラフィックを集計する

ラボのこのパートでは、次のようなパイプラインを作成します。

  1. Cloud Storage 内のファイルからその日のトラフィックを読み取る。
  2. 各イベントを CommonLog オブジェクトに変換する。
  3. 個々のオブジェクトをユーザー ID 別にグループ化し、値を足し合わせてその特定ユーザーのヒットの合計数を取得して、ユニーク ユーザーごとの総ヒット数を合計する。
  4. 追加の集計を各ユーザーに対して行う。
  5. 得られたデータを BigQuery に書き込む。

タスク 1. 合成データを生成する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

適切なラボを開く

  • IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ラボのディレクトリに移動します cd 3_Batch_Analytics/labs # 依存関係をダウンロードします mvn clean dependency:resolve export BASE_DIR=$(pwd)

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

このスクリプトにより、次のような行が含まれている events.json というファイルが作成されます。

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

このファイルは gs://my-project-id/events.json の Google Cloud Storage バケットに自動的にコピーされます。

  • Google Cloud Storage に移動し、ストレージ バケットに events.json というファイルが含まれていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 合成データを生成する

タスク 2. ユーザー別にページビューを合計する

  1. IDE で BatchUserTrafficPipeline.java を開きます。これは 3_Batch_Analytics/labs/src/main/java/com/mypackage/pipeline にあります。

このパイプラインには、入力パスと出力テーブル名のコマンドライン オプションを受け入れるために必要なコードと、Google Cloud Storage からイベントを読み取り、それらのイベントを解析して、結果を BigQuery に書き込むコードがすでに含まれています。ただし、いくつかの重要な部分が欠けています。

  1. パイプラインの次のステップでは、一意の user_id 別にイベントを集計し、それぞれのページビューをカウントします。Beam スキーマを使用して Row やオブジェクトに対してこの操作を行う簡単な方法は、Group.byFieldNames() 変換を使用し、結果のグループに対していくつかの集計を実行することです。次に例を示します。
purchases.apply(Group.<MyObject>byFieldNames("userId", "address"));

これは「key」と「value」の 2 つのフィールドを含む行の PCollection を返します。「key」フィールド自体は、userIDaddress の一意の組み合わせをすべて表すスキーマ <userID:STRING, address:STRING> のある Row です。「values」フィールドは、ITERABLE[ROW[MyObject]] 型で、その一意のグループ内のすべてのオブジェクトが含まれます。

FieldName FieldType
key ROW{userId:STRING, streetAddress:STRING}
values ITERABLE[ROW[Purchase]]
  1. これは、このグループ化に対して集計計算を実行し、その結果のフィールドに次のような名前を付ける場合に最も役立ちます。
purchases.apply(Group.byFieldNames("userId") .aggregateField("itemId", Count.combineFn(), "numPurchases") .aggregateField("costCents", Sum.ofLongs(), "totalSpendCents") .aggregateField("costCents", Max.ofLongs(), "largestPurchases"));

これは複数形の「values」ではなく、「key」「value」の Row を返します。

FieldName FieldType
key ROW{userId:STRING}
value ROW{numPurchases: INT64, totalSpendCents: INT64, largestPurchases: INT64}

このような用途には Sum 変換と Count 変換が最適です。Sum と Count は、データのグループに作用する Combine 変換の例です。

注: この例では、任意の Count.combineFn() フィールド、またはワイルドカード フィールド * を集計できます。これは、この変換が単にグループ全体に含まれる要素の数を数えているだけだからです。
  1. パイプラインの次のステップでは、user_id 別にイベントを集計して、ページビューを合計し、さらに num_bytes に関する追加の集計(ユーザーの合計バイト数など)を計算します。

このタスクを完了するには、イベントを user_id でグループ化し、関連する集計を実行する別の変換をパイプラインに追加します。入力、使用する CombineFns、出力フィールドの命名方法にご注意ください。

タスク 3. スキーマをフラット化する

この時点で、すでに述べたように、新しい変換はスキーマ <Key,Value> のある PCollection を返します。パイプラインをそのまま実行すると、本来それぞれ 1 行の値しかない場合でも、2 つのネストされた RECORDS として BigQuery に書き込まれます。

  1. これは、次のような Select 変換を追加することで回避できます。
purchases.apply(Group.byFieldNames("userId") .aggregateField("itemId", Count.combineFn(), "numPurchases") .aggregateField("costCents", Sum.ofLongs(), "totalSpendCents") .aggregateField("costCents", Max.ofLongs(), "largestPurchases")) .apply(Select.fieldNames("key.userId", "value.numPurchases", "value.totalSpendCents", "value.largestPurchases")

これにより、新しいフラット化されたスキーマで関連するフィールド名が保持され、「key」と「value」が削除されます。

  1. このタスクを完了するには、Select 変換を追加して、新しい行のスキーマをフラット化します。

注: BigQueryIO.<CommonLog>write() のオブジェクト ヒントを <Row> に変更していない場合は、忘れずに変更してください。

タスク 4. パイプラインを実行する

  • Cloud Shell に戻り、次のコマンドで、Dataflow サービスを使用したパイプラインを実行します。問題が発生した場合は DirectRunner で実行するか、ソリューションを参照してください。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.BatchUserTrafficPipeline export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --tableName=${TABLE_NAME}"

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ユーザー別にサイト トラフィックを集計し、パイプラインを実行する

タスク 5. BigQuery で結果を確認する

  1. このタスクを完了するには、パイプラインが完了するまで数分待ってから、BigQuery に移動して user_traffic テーブルに対してクエリを実行します。

  2. 興味がある場合は、Select 変換ステップをコメントアウトしてパイプラインを再実行し、結果の BigQuery スキーマを確認してください。

パート 2: 分単位でサイト トラフィックを集計する

ラボのこのパートでは、BatchMinuteTraffic という新しいパイプラインを作成します。BatchMinuteTraffic は、BatchUserTraffic で使用されている基本的なバッチ分析原則を拡張し、バッチ全体でユーザー別に集計するのではなく、イベントが発生した時間ごとに集計します。

  • IDE で、3_Batch_Analytics/labs/src/main/java/com/mypackage/pipeline 内のファイル BatchMinuteTrafficPipeline.java を開きます。

タスク 1. 各要素にタイムスタンプを追加する

制限なしソースから各要素のタイムスタンプが提供されます。制限なしソースによっては、元データ ストリームからタイムスタンプを抽出する方法を構成する必要があるかもしれません。

ただし、有限ソース(このパイプラインで使用される TextIO からのファイルなど)はタイムスタンプを提供しません。

  1. 各レコードのタイムスタンプ フィールドを解析し、WithTimestamps 変換を使用して、PCollection の各要素にタイムスタンプを付けることができます。
PCollection<MyClass> unstamped = ...; PCollection<MyClass> stamped = unstampedLogs.apply(WithTimestamps.of((MyClass m) -> org.joda.time.Instant.parse(m.getTimestamp())));
  1. このタスクを完了するには、パイプラインの各要素にタイムスタンプを追加する変換をパイプラインに追加します。

タスク 2. 1 分間のウィンドウにウィンドウ処理する

ウィンドウ処理では、PCollection を個々の要素のタイムスタンプに基づいて細分化します。複数の要素を集計する変換である GroupByKey や Combine などは、暗黙的にウィンドウ単位で動作します。各 PCollection は、全体のサイズが無限のこともありますが、複数の有限ウィンドウが連続的につながったものとして処理されます。

PCollection の要素を分割するために、さまざまな種類のウィンドウを定義できます。Beam では、次のようないくつかのウィンドウ処理関数が用意されています。

  • 固定時間ウィンドウ
  • スライド時間ウィンドウ
  • セッション単位ウィンドウ
  • 単一グローバル ウィンドウ
  • カレンダーベース ウィンドウ(Beam SDK for Python ではサポートされていません)

このラボでは、固定時間ウィンドウを使用します。固定時間ウィンドウは、データ ストリームの間隔に重複がない一貫した継続期間を表します。5 分間のウィンドウについて考えてみます。制限なし PCollection で、タイムスタンプ値が 0:00:00 から 0:05:00 まで(0:05:00 は含まれない)のすべての要素が最初のウィンドウに属し、タイムスタンプ値が 0:05:00 から 0:10:00 まで(0:10:00 は含まれない)の要素は 2 番目のウィンドウに属します。その後に関しても同様です。

  1. 1 秒間の固定時間ウィンドウを次のように実装します。
PCollection<String> pColl= ...; PCollection<String> windowedPCollection = pColl.apply( Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))));
  1. このタスクを完了するには、要素を 1 分間のウィンドウにウィンドウ処理する変換をパイプラインに追加します。

他の種類のウィンドウ処理について詳しくは、Apache Beam ドキュメントのウィンドウ関数に関するセクションをご覧ください。

タスク 3. ウィンドウごとのイベント数をカウントする

次に、パイプラインで各ウィンドウ内に発生したイベントの数を計算する必要があります。BatchUserTraffic パイプラインでは、キーごとの合計に Sum 変換を使用しました。しかし、今回はそのパイプラインとは違い、要素がウィンドウ処理されており、目的の計算でウィンドウ境界を考慮する必要があります。

この新たな制約にもかかわらず、やはり Combine 変換が適切です。これは、Combine 変換がウィンドウ境界を自動的に考慮するためです。

  1. ウィンドウごとの要素数をカウントする新しい変換を追加する方法については、Count のドキュメントをご覧ください。

Beam 2.22 の時点では、ウィンドウ処理中に行の要素をカウントする最適な方法は、Combine.globally(Count.<T>combineFn()).withoutDefaults() を使用することです(つまり、全面的に SQL を使用するわけではありません。これについては次のラボで詳しく説明します)。この変換は PCollection<Long> 型を出力しますが、これには Beam スキーマが使用されていないことがわかります。

  1. このタスクを完了するには、各ウィンドウ内のすべての要素をカウントする変換を追加します。行き詰まった場合は、ソリューションを参照してください。

タスク 4. 行に戻してタイムスタンプを追加する

BigQuery に書き込むには、各要素を「pageviews」というフィールドと「minute」という追加フィールドを含む Row オブジェクトに変換し直す必要があります。これは、各ウィンドウの境界を 1 つのフィールドとして、ページビューの合計数をもう 1 つのフィールドとして使用するという考え方です。

これまでのところ、要素は JSON String から CommonLog オブジェクトに変換されると常に Beam スキーマに準拠しており、場合によっては Row オブジェクトに戻っています。元のスキーマは @DefaultSchema(JavaFieldSchema.class) アノテーションを介して CommonLog POJO から推論され、その後追加 / 削除されたフィールドはパイプライン変換で指定されました。ただし、パイプラインのこの時点では、Count 変換の出力のとおり、すべての要素は Long 型になります。したがって、新しい Row オブジェクトをゼロから作成する必要があります。

  1. スキーマは次のように手動で作成および登録できます。このコードは、CommonLog オブジェクト定義と同様に、main() メソッドの外側に追加されます。
// レコードのスキーマを定義します Schema appSchema = Schema .builder() .addInt32Field("appId") .addStringField("description") .addDateTimeField("rowtime") .build();
  1. このスキーマの後続の Row オブジェクトは、次のような Long などの入力に基づいて、PTransform で作成できます。
Row row = Row .withSchema(appSchema) .addValues(1, "Some cool app", new Date()) .build();
  1. 通常、変換により前の行を変更するのではなく新しい行を作成する場合、Beam は PTransform で新しいスキーマを示す必要があります。
.apply().setRowSchema(appSchema)

スキーマの作成と推論の詳細については、Apache の Beam SQL チュートリアルをご覧ください。

もう一つの問題は、現時点では、Count 変換がタイムスタンプ情報を持たない Long 型の要素しか提供しないということです。

ただし、実際には、それほど明白な方法によってではありませんが、それらの要素はタイムスタンプ情報を持ちます。Apache Beam ランナーは、イベント タイムスタンプ、ウィンドウ、パイプライン オプションなど、多くの追加パラメータの値を提供する方法をデフォルトで認識しています。完全なリストについては、Apache の DoFn パラメータのドキュメントをご覧ください。

  1. このタスクを完了するには、Long 型の要素を受け取り、スキーマ型 pageViewsSchema を使用して Row 型の要素を出力する ParDo 関数を作成します。この関数には、IntervalWindow 型の追加の入力パラメータが含まれています。この追加パラメータを使用して Instant のインスタンスを作成し、これを使用して分フィールド「:」の文字列表現を導き出します。
@ProcessElement public void processElement(@Element T l, OutputReceiver<T> r, IntervalWindow window) { Instant i = Instant.ofEpochMilli(window.start().getMillis()); ... r.output(...); }

タスク 5. パイプラインを実行する

  • コーディングが完了したら、以下のコマンドを使用してパイプラインを実行します。コードをテストするときは、RUNNER 環境変数を DirectRunner に変更すると、パイプラインがローカルで実行されるため、はるかに高速になります。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.BatchMinuteTrafficPipeline export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --tableName=${TABLE_NAME}"

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 サイト トラフィックを分単位で集計し、パイプラインを実行する

タスク 6. 結果を確認します

  • このタスクを完了するには、パイプラインが実行されるまで数分待ってから BigQuery に移動し、minute_traffic テーブルに対してクエリを実行します。

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。