ラボの設定手順と要件
アカウントと進行状況を保護します。このラボを実行するには、常にシークレット ブラウジング ウィンドウとラボの認証情報を使用してください。

Cloud Data Fusion でリアルタイム パイプラインを構築する

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 入門
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
このコンテンツはまだモバイル デバイス向けに最適化されていません。
快適にご利用いただくには、メールで送信されたリンクを使用して、デスクトップ パソコンでアクセスしてください。

GSP808

Google Cloud セルフペース ラボのロゴ

概要

Data Fusion では、バッチ パイプラインに加えて、生成されたイベントをリアルタイムで処理するパイプラインも作成できます。現在、リアルタイム パイプラインは、Cloud Dataproc クラスタで Apache Spark Streaming を使用して実行されます。このラボでは、Data Fusion を使用してストリーミング パイプラインを構築する方法を学びます。

Cloud Pub/Sub のトピックからデータを読み取って、イベントを処理、変換し、その結果を BigQuery に書き込むパイプラインを作成します。

目標

  1. リアルタイム パイプラインの作成方法を学ぶ
  2. Data Fusion で Pub/Sub ソース プラグインを設定する方法を学ぶ
  3. Wrangler を使用して、サポートされていない接続にあるデータの変換を定義する方法を学ぶ

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. シークレット ウィンドウを使用して Google Skills にログインします。

  2. ラボのアクセス時間(例: 02:00:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

    注: [ラボを開始] をクリックしてから、ラボが必要なリソースをプロビジョニングして Data Fusion インスタンスを作成するまで 15~20 分ほどかかります。 その間、ラボの目標を理解するために以下のステップをご確認ください。

    左側のパネルにラボの認証情報(ユーザー名パスワード)が表示されたら、インスタンスの作成が完了したため、コンソールへのログインに進めるようになります。
  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud コンソールにログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

注: ラボを完了した場合と最初からやり直す場合以外は、[ラボを終了] をクリックしないでください。クリックすると、作業内容とプロジェクトが削除されます。

Google Cloud コンソールにログインする

  1. このラボ セッションで使用しているブラウザタブまたはウィンドウで、[接続の詳細] パネルからユーザー名をコピーし、[Google Console を開く] ボタンをクリックします。
注: アカウントの選択を求められたら、[別のアカウントを使用] をクリックします。
  1. ユーザー名を貼り付け、プロンプトが表示されたらパスワードを入力します。
  2. [次へ] をクリックします。
  3. 利用規約に同意します。

これは、このラボの間だけ有効な一時的なアカウントです。以下の点に注意してください。

  • 復元オプションを追加しないでください。
  • 無料トライアルに登録しないでください。
  1. コンソールが開いたら、左上のナビゲーション メニューナビゲーション メニュー アイコン)をクリックしてサービスのリストを確認します。

ナビゲーション メニュー

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールが組み込まれた仮想マシンです。5 GB の永続ホーム ディレクトリを提供し、Google Cloud 上で実行されます。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。gcloud は Google Cloud のコマンドライン ツールで、Cloud Shell にプリインストールされており、Tab キーによる入力補完がサポートされています。

  1. Google Cloud Console のナビゲーション パネルで、「Cloud Shell をアクティブにする」アイコン(Cloud Shell アイコン)をクリックします。

  2. [次へ] をクリックします。
    環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続の際に認証も行われ、プロジェクトは現在のプロジェクト ID に設定されます。次に例を示します。

Cloud Shell ターミナル

サンプル コマンド

  • 有効なアカウント名前を一覧表示する:

gcloud auth list

(出力)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(出力例)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • プロジェクト ID を一覧表示する:

gcloud config list project

(出力)

[core] project = <プロジェクト ID>

(出力例)

[core] project = qwiklabs-gcp-44776a13dea667a6

タスク 1. プロジェクトの権限

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] をクリックします。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] から確認できます。

デフォルトのコンピューティング サービス アカウント

アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。

  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] をクリックします。

  2. [プロジェクト情報] カードからプロジェクト番号をコピーします。

  3. ナビゲーション メニューで、[IAM と管理] > [IAM] をクリックします。

  4. IAM ページの上部にある [追加] をクリックします。

  5. 新しいプリンシパルの場合は、次のように入力します。

{project-number}-compute@developer.gserviceaccount.com

{project-number} はプロジェクト番号に置き換えてください。

  1. [ロールを選択] で、[基本](または [Project])> [編集者] を選択します。

  2. [保存] をクリックします。

タスク 2. Dataflow API が有効になっていることを確認する

必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。

  1. Cloud コンソールの上部の検索バーに「Dataflow API」と入力します。検索結果の「Dataflow API」をクリックします。

  2. [管理] をクリックします。

  3. [API を無効にする] をクリックします。

確認を求められたら、[無効にする] をクリックします。

  1. [有効にする] をクリックします。

タスク 3. データを読み込む

  1. まず、サンプルツイートをパソコンにダウンロードする必要があります。後程、Wrangler を使用して変換ステップを作成する際に、このデータをアップロードします。

また、同じサンプル ツイート ファイルを Cloud Storage バケットにステージングする必要があります。このラボの最後では、バケットから Pub/Sub トピックにデータをストリーミングします。

  1. Cloud Shell で、次のコマンドを実行して新しいバケットを作成します。
export BUCKET=$GOOGLE_CLOUD_PROJECT gsutil mb gs://$BUCKET

作成されたバケットの名前は、現在のプロジェクト ID です。

  1. 次のコマンドを実行して、ツイート ファイルをバケットにコピーします。
gsutil cp gs://cloud-training/OCBL164/pubnub_tweets_2019-06-09-05-50_part-r-00000 gs://$BUCKET
  1. ファイルが Cloud Storage バケットにコピーされたことを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データを読み込む

タスク 4. Pub/Sub トピックを設定する

Pub/Sub を使用するには、データを保持するトピックを作成し、そのトピックにパブリッシュされたデータにアクセスするためのサブスクリプションを作成します。

  1. Cloud コンソールで、ナビゲーション メニューから [すべてのプロダクトを表示] をクリックし、[分析] セクションで [Pub/Sub] をクリックして、[トピック] を選択します。

  2. [トピックを作成] をクリックします。

[トピックを作成] ボタン

  1. トピックには一意の名前を付ける必要があります。このラボでは、トピックに cdf_lab_topic という名前を付け、[作成] をクリックします。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Pub/Sub トピックを設定する

タスク 5. Pub/Sub サブスクリプションを追加する

引き続きトピック ページで作業し、トピックにアクセスするためのサブスクリプションを作成します。

  1. [サブスクリプションを作成] をクリックします。

[サブスクリプションを作成] リンク

  1. サブスクリプションの名前(cdf_lab_subscription など)を入力し、[配信タイプ] を [Pull] に設定して、[作成] をクリックします。

[サブスクリプションをトピックに追加] ページ

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Pub/Sub サブスクリプションを追加する

タスク 6. Cloud Data Fusion インスタンスに必要な権限を追加する

  1. Google Cloud コンソールのナビゲーション メニューで、[すべてのプロダクトを表示] をクリックし、[分析] セクションで [Data Fusion] > [インスタンス] をクリックします。
: インスタンスの作成には 20 分ほどかかります。完了するまでお待ちください。

次に、以下の手順に沿って、インスタンスに関連付けられているサービス アカウントに権限を付与します。

  1. Google Cloud コンソールで、[IAM と管理] > [IAM] に移動します。

  2. Compute Engine のデフォルトのサービス アカウント {プロジェクト番号}-compute@developer.gserviceaccount.com が表示されていることを確認し、サービス アカウントをクリップボードにコピーします。

  3. [IAM 権限] ページで、[+アクセスを許可] をクリックします。

  4. [新しいプリンシパル] フィールドに、サービス アカウントを貼り付けます。

  5. [ロールを選択] フィールドをクリックし、「Cloud Data Fusion API サービス エージェント」と入力します。最初の数文字を入力すると [Cloud Data Fusion API サービス エージェント] が表示されるので、それを選択します。

  6. [別のロールを追加] をクリックします。

  7. [Dataproc 管理者] ロールを追加します。

  8. [保存] をクリックします。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Cloud Data Fusion API サービス エージェントのロールをサービス アカウントに追加する

サービス アカウントにユーザー権限を付与する

  1. コンソールのナビゲーション メニューで、[IAM と管理] > [IAM] をクリックします。

  2. [Google 提供のロール付与を含める] チェックボックスをオンにします。

  3. リストを下にスクロールして、Google が管理する service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com という表示形式の Cloud Data Fusion サービス アカウントを探し、サービス アカウント名をクリップボードにコピーします。

Google が管理する Cloud Data Fusion サービス アカウントのリスト

  1. 次に、[IAM と管理] > [サービス アカウント] に移動します。

  2. {project-number}-compute@developer.gserviceaccount.com という表示形式のデフォルトの Compute Engine アカウントをクリックし、上部のナビゲーション メニューの [アクセス権を持つプリンシパル] タブを選択します。

  3. [アクセスを許可] ボタンをクリックします。

  4. [新しいプリンシパル] フィールドに、前の手順でコピーしたサービスアカウントを貼り付けます。

  5. [ロール] プルダウン メニューで、[サービス アカウント ユーザー] を選択します。

  6. [保存] をクリックします。

タスク 7. Cloud Data Fusion UI を操作する

Cloud Data Fusion を使用する際は、Cloud Console と個別の Cloud Data Fusion UI の両方を使用します。Cloud Console では、Cloud Console プロジェクトの作成や Cloud Data Fusion インスタンスの作成、削除を行えます。Cloud Data Fusion UI では、[Pipeline Studio] や [Wrangler] などのさまざまなページで Cloud Data Fusion の機能を使用できます。

Cloud Data Fusion UI を操作するには、次の手順に従います。

  1. Cloud Console で [Data Fusion] に戻り、Data Fusion インスタンスの横にある [インスタンスを表示] リンクをクリックします。ラボの認証情報を選択してログインします。サービスのガイドに進むダイアログが表示された場合は [No, Thanks] をクリックします。これで Cloud Data Fusion UI が表示されるようになります。

[インスタンスを表示] リンク

  1. Cloud Data Fusion のコントロール センターで、ナビゲーション メニューを使用して左側のメニューを表示し、[パイプライン] > [Studio] を選択します。

  2. 左上のプルダウン メニューを使用して、[データ パイプライン - リアルタイム] を選択します。

タスク 8. リアルタイム パイプラインを構築する

データを処理するときに元データがどのようなものであるかを確認できると、元データを変換の出発点として使用できて便利です。この目的のために、Wrangler を使用してデータの準備とクリーニングを行います。このデータファーストのアプローチにより、変換内容をすばやく可視化でき、リアルタイムのフィードバックによって正しく進めているかを確認できます。

  1. プラグイン パレットの [変換] セクションで、[Wrangler] を選択します。Wrangler ノードがキャンバスに表示されます。[プロパティ] ボタンをクリックして開きます。

  2. [ディレクティブ] セクションにある [WRANGLE] ボタンをクリックします。

  3. 読み込まれたら、左側のサイドメニューで [Upload] をクリックします。次に、アップロード アイコンをクリックして、先ほどパソコンにダウンロードしたサンプル ツイート ファイルをアップロードします。

Wrangler の [Upload data from your computer] ページ

  1. データが行と列の形式で Wrangler の画面に読み込まれます。これには数分かかる場合があります。
注: このデータは、最終的に Pub/Sub で受信するイベントのサンプルとして扱います。実際の開発では、本番環境データにアクセスできないケースが一般的であり、そのような現実的な状況を想定しています。

ただし、管理者から少量のサンプルデータへのアクセスが提供される場合や、API の仕様に準拠したモックデータを使用する場合もあります。このセクションでは、各ステップでフィードバックを得ながら、このサンプルデータに対して段階的に変換を適用します。その後、実際のデータに対して同じ変換を再適用する方法を学びます。
  1. 最初の操作として、JSON データを解析し、行と列に分かれた表形式に変換します。これを行うには、最初の列(body)の見出しにあるプルダウン アイコンを選択し、[Parse] メニュー項目のサブメニューから [JSON] を選択します。ポップアップで [Depth] を [1] に設定し、[Apply] をクリックします。

    [JSON] オプションへのナビゲーション パス。

  2. 続けて、より意味のあるデータ構造にするため、同じ操作をもう一度行います。[body] 列のプルダウン アイコンをクリックし、[Parse] > [JSON] を選択して、[Depth] を 1 に設定し、[Apply] をクリックします。

    body_payload データ

    UI を使う方法に加えて、Wrangler のディレクティブ コマンドライン ボックスに変換ステップを記述することもできます。このボックスは Wrangler UI の下部にあり、緑色の $ プロンプトが表示されているコマンドコンソールです。 次のステップでは、このコマンド コンソールに変換ステップを貼り付けて使用します。

  3. 以下の変換ステップをすべてコピーし、Wrangler のディレクティブ コマンドラインに貼り付けてください。

columns-replace s/^body_payload_//g drop id_str parse-as-simple-date :created_at EEE MMM dd HH:mm:ss Z yyyy drop display_text_range drop truncated drop in_reply_to_status_id_str drop in_reply_to_user_id_str parse-as-json :user 1 drop coordinates set-type :place string drop geo,place,contributors,is_quote_status,favorited,retweeted,filter_level,user_id_str,user_url,user_description,user_translator_type,user_protected,user_verified,user_followers_count,user_friends_count,user_statuses_count,user_favourites_count,user_listed_count,user_is_translator,user_contributors_enabled,user_lang,user_geo_enabled,user_time_zone,user_utc_offset,user_created_at,user_profile_background_color,user_profile_background_image_url,user_profile_background_image_url_https,user_profile_background_tile,user_profile_link_color,user_profile_sidebar_border_color,user_profile_sidebar_fill_color,user_profile_text_color,user_profile_use_background_image drop user_following,user_default_profile_image,user_follow_request_sent,user_notifications,extended_tweet,quoted_status_id,quoted_status_id_str,quoted_status,quoted_status_permalink drop user_profile_image_url,user_profile_image_url_https,user_profile_banner_url,user_default_profile,extended_entities fill-null-or-empty :possibly_sensitive 'false' set-type :possibly_sensitive boolean drop :entities drop :user_location 注: 「No data. Try removing some transformation steps.」と表示された場合は、いずれかの変換ステップの X をクリックして削除してください。データが再表示されたら、そのまま次の手順に進めます。
  1. 右上の [Apply] ボタンをクリックします。次に、右上の [X] をクリックして、プロパティ ボックスを閉じます。

ご覧のとおり、Pipeline Studio に戻ると、Wrangler で定義した変換内容を表すノードが 1 つキャンバス上に配置されています。ただし、この時点ではパイプラインにソースは接続されていません。これは前述のとおり、実際の本番環境のデータではなく、ノートパソコン上のサンプルデータに対して変換を適用したためです。

次のステップでは、実際のデータの取得元を指定します。

  1. プラグイン パレットの [ソース] セクションで [PubSub] を選択します。PubSub ソースノードがキャンバスに表示されます。[プロパティ] ボタンをクリックして開きます。

  2. PubSub ソースのさまざまなプロパティを以下のように指定します。

a. [リファレンス名] に「Twitter_Input_Stream」と入力します。

b. [サブスクリプション] に「cdf_lab_subscription」(先ほど作成した Pub/Sub サブスクリプションの名前)と入力します。

注: PubSub ソースでは、サブスクリプションの完全修飾名は使用できません。.../subscriptions/ 以降の名前のみを指定してください。

c. [検証] をクリックして、エラーがないことを確認します。

Pub/Sub のプロパティ ページ

d. 右上の [X] をクリックして、プロパティ ボックスを閉じます。

  1. 次に、PubSub ソースノードを、先ほど追加した Wrangler ノードに接続します。

パイプライン: Pub/Sub から Wrangler

Wrangler ではサンプルデータを使用していたため、ソース列は body として表示されていました。一方、PubSub ソースではこのデータは message というフィールド名で出力されます。次のステップで、この違いを調整します。

  1. Wrangler ノードのプロパティを開き、既存の変換ステップの先頭に次のディレクティブを追加します。
keep :message set-charset :message 'utf-8' rename :message :body

Wrangler のプロパティ ページ

右上の [X] をクリックして、プロパティ ボックスを閉じます。

  1. ソースと変換をパイプラインに接続したので、シンクを追加してパイプラインを完成させます。左側のサイドパネルの [シンク] セクションで、[BigQuery] を選択します。BigQuery シンクノードがキャンバスに表示されます。

  2. Wrangler ノードの矢印を BigQuery ノードまでドラッグして、Wrangler ノードを BigQuery ノードに接続します。次に、BigQuery ノードのプロパティを設定します。

    パイプライン

  3. BigQuery ノードにカーソルを合わせ、[プロパティ] をクリックします。

    a. [リファレンス名] に「realtime_pipeline」と入力します。

    b. [データセット] に「realtime」と入力します。

    c. [テーブル] に「tweets」と入力します。

    d. [検証] をクリックして、エラーがないことを確認します。

  4. 右上の [X] をクリックして、プロパティ ボックスを閉じます。

BigQuery のプロパティ ウィンドウ

  1. [パイプラインに名前を付ける] をクリックし、名前として「Realtime_Pipeline」を追加して、[保存] をクリックします。

  2. [デプロイ] アイコンをクリックして、パイプラインを開始します。

  3. デプロイが完了したら、[実行] をクリックします。パイプラインの [ステータス] が [実行中] に変わるまで待ちます。これには数分かかる場合があります。

タスク 9. Cloud Pub/Sub にメッセージを送信する

Dataflow テンプレートを使用して、イベントをまとめてサブスクリプションに送信します。

ここでは、テンプレートに基づいて Dataflow ジョブを作成し、ツイート ファイルから複数のメッセージを処理して、先ほど作成した Pub/Sub トピックに公開します。Dataflow のジョブ作成ページから、[Process Data Continuously (Stream)] の [Text Files on Cloud Storage to Pub/Sub] テンプレートを使用します。

  1. Cloud コンソールに戻り、ナビゲーション メニューに移動して、[すべてのプロダクトを表示] をクリックし、[分析] セクションで [Dataflow] をクリックします。

  2. 上部のメニューバーの [テンプレートからジョブを作成] をクリックします。

  3. Cloud Dataflow ジョブのジョブ名として「streaming-pipeline」と入力します。

  4. [Cloud Dataflow テンプレート] で、[Text Files on Cloud Storage to Pub/Sub] テンプレートを選択します。

  5. [Input Cloud Storage File(s)] に「gs://<YOUR-BUCKET-NAME>/<FILE-NAME>」と入力します。 <YOUR-BUCKET-NAME> はバケット名に、<FILE-NAME> は先ほどパソコンにダウンロードしたファイルの名前に置き換えてください。

例: gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000

  1. [Output Pub/Sub Topic] に「projects/<PROJECT-ID>/topics/cdf_lab_topic」と入力します。

PROJECT-ID は実際のプロジェクト ID に置き換えてください。

  1. [一時的な場所] に「<YOUR-BUCKET-NAME>/tmp/」と入力します。

<YOUR-BUCKET-NAME> はお使いのバケットの名前に置き換えてください。

  1. [ジョブを実行] ボタンをクリックします。

  2. Dataflow ジョブを実行し、数分待ちます。Pub/Sub サブスクリプションでメッセージを確認し、その後リアルタイムの Cloud Data Fusion パイプラインで処理される様子を確認できます。

    [テンプレートからジョブを作成] ダイアログ

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ランタイム パイプラインを構築して実行する

タスク 10. パイプライン指標を確認する

イベントが Pub/Sub トピックに読み込まれるとすぐに、パイプラインによるイベントの消費が開始されます。各ノードの指標が更新されていく様子を確認してください。

  • Data Fusion コンソールで、パイプラインの指標が変化するまで待ちます。

    パイプラインの指標

お疲れさまでした

このラボでは、Cloud Pub/Sub からストリーミングで受信したメッセージを読み取り、データを処理し、その結果を BigQuery に書き込むリアルタイム パイプラインを Data Fusion で設定する方法を学びました。

マニュアルの最終更新日: 2025 年 2 月 6 日

ラボの最終テスト日: 2025 年 2 月 6 日

Copyright 2026 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボを実行するには、シークレット モードまたはシークレット ブラウジング ウィンドウを使用することをおすすめします。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。