概要
Google Cloud の Apache Spark 向け Serverless は、インフラストラクチャを管理することなく Spark バッチ ワークロードの実行を簡素化するフルマネージド サービスです。このパターンは、ETL(抽出、変換、読み込み)ワークフローの堅牢なアプローチを提供し、高品質のデータのみが分析システムに格納されるようにします。
課題
データレイクに取り込まれた未加工データには、欠損値、形式の誤り、無効なエントリなどの不完全な部分が含まれていることがよくあります。このデータを分析用ウェアハウスに直接読み込むと、レポートが破損し、ビジネス上の意思決定が不適切になる可能性があります。
ソリューション
自動データ品質パイプラインを作成します。このパイプラインは、未加工のデータをインターセプトし、一連の検証ルールを適用してから、データをインテリジェントにルーティングします。クリーンなレコードは本番環境のデータ ウェアハウスに送信され、検証に失敗したレコードは検査と修復のために「デッドレター キュー」(DLQ)に送信されます。
このラボでは、Apache Spark 向け Serverless でカスタム PySpark ジョブを実行して、このソリューションを構築します。ジョブは次のことを行います。
- Cloud Storage バケットから未加工の CSV ファイルを読み取る。
- データ品質ルールを適用して各レコードを検証する。
- クリーンで有効なレコードを BigQuery テーブルに読み込む。
- 無効なレコードを Cloud Storage の別の DLQ バケットに書き込む。
このパターンにより、データ ウェアハウスが常にクリーンな状態に保たれ、データエラーを処理するための明確で監査可能なプロセスが提供されます。
企業ユースケース
-
e コマース: パイプラインは、受信した注文データを検証し、商品 ID が有効であることと、顧客のメールアドレスの形式が正しいことを確認してから、販売分析の BigQuery テーブルに読み込みます。無効な注文は、手動レビューのために DLQ にルーティングされます。
-
医療: システムが患者の記録を処理し、医療コードが存在することと日付が正しい形式であることを検証します。エラーのあるレコードは、コンプライアンスを確保するためにデータ管理者が確認できるよう、安全な DLQ バケットに送信されます。
-
財務: 毎日実行されるパイプラインで株式市場のデータを取り込み、
close_price などの重要なフィールドに null 値がないか確認します。不完全なティッカーデータは DLQ に送信され、時系列分析モデルの破損を防ぎます。
目標
このラボでは、次の方法について学びます。
- Terraform によってプロビジョニングされた事前構成済みのラボ環境を確認する。
- BigQuery データセットを作成して、環境設定を完了する。
- データ品質とルーティング ロジックを含む、コメント付きのカスタム PySpark スクリプトを記述する。
- カスタムの安全な VPC ネットワークで Spark バッチジョブを構成して実行する。
- BigQuery テーブルで、クリーンなデータの出力を確認する。
- Cloud Storage DLQ で無効なレコードを確認する。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
シークレット ウィンドウを使用して Google Skills にログインします。
-
ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
-
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
-
利用規約に同意し、再設定用のリソースページをスキップします。
Cloud Shell をアクティブにする
Cloud Shell は、開発ツールが組み込まれた仮想マシンです。5 GB の永続ホーム ディレクトリを提供し、Google Cloud 上で実行されます。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。gcloud は Google Cloud のコマンドライン ツールで、Cloud Shell にプリインストールされており、Tab キーによる入力補完がサポートされています。
-
Google Cloud Console のナビゲーション パネルで、「Cloud Shell をアクティブにする」アイコン(
)をクリックします。
-
[次へ] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続の際に認証も行われ、プロジェクトは現在のプロジェクト ID に設定されます。次に例を示します。

サンプル コマンド
gcloud auth list
(出力)
Credentialed accounts:
- <myaccount>@<mydomain>.com (active)
(出力例)
Credentialed accounts:
- google1623327_student@qwiklabs.net
gcloud config list project
(出力)
[core]
project = <プロジェクト ID>
(出力例)
[core]
project = qwiklabs-gcp-44776a13dea667a6
ラボ環境
このラボを開始すると、必要なインフラストラクチャとリソースのほとんどを自動的にプロビジョニングする Terraform スクリプトが実行されます。作成済みの構成は以下のとおりです。
-
プロジェクト ID =
-
リージョン =
-
ゾーン =
-
カスタム VPC ネットワーク(spark-network)とサブネット(spark-subnet)が、Apache Spark 向け Serverless に必要なネットワーク アクセスで構成されています。
-
2 つの Cloud Storage バケット:
-
メインバケット(gs://-main-bucket): PySpark スクリプト(
scripts/)、元の入力データ(source/)の保存に使用され、BigQuery コネクタの一時的なステージング領域としても使用されます。
-
DLQ バケット(gs://-dlq-bucket): 無効なレコードを保存します。
-
元データのファイル: Python スクリプトによって、1,000 レコードの CSV ファイル(source/customer_contacts_1000.csv)が自動的に生成され、メインバケットにアップロードされています。これらのレコードの約 20% には、パイプラインをテストするために意図的に不完全なデータ(ID の欠落、無効なメールアドレスなど)が含まれています。
目標は、正常なレコードと異常なレコードを識別して分離し、それぞれ適切な宛先に格納するスクリプトを作成することです。
バケット戦略に関する注意事項:
このラボでは簡略化のため、スクリプト、ソースデータ、BigQuery の一時的なステージングにメインバケットを使用します。本番環境では、3 つの別々のバケットを使用することがベスト プラクティスです。1 つは未加工の入力データ用、1 つはデッドレターキュー用、もう 1 つは BigQuery コネクタなどのコネクタで使用される一時的なステージング データ専用のバケットです。これにより、分離、セキュリティ、ライフサイクル管理が向上します。
タスク 1. 環境を探索してサービスを準備する
まず、ラボのリソースが正しく作成されていることを確認し、使用するソースデータをプレビューします。
Cloud Storage バケットを確認する
- Google Cloud コンソールのナビゲーション メニュー(☰)から、[Cloud Storage] > [バケット] に移動します。
-
-main-bucket で終わるバケットと -dlq-bucket で終わるバケットの 2 つが表示されていることを確認します。
未加工のデータをプレビューして API を有効にする

-
Cloud Shell をアクティブにします。
-
次のコマンドを実行して、メインバケットにある未加工の CSV ファイルのヘッダーと最初の 10 件のレコードを表示します。
gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
-
ジョブを実行する前に、Dataproc API を有効にする必要があります。Cloud Shell で次のコマンドを実行して、これらを有効にします。
gcloud services enable dataproc.googleapis.com
[進行状況を確認] をクリックして、実行したタスクを確認します。
Dataproc API を有効にします。
タスク 2. BigQuery 環境を準備する
Terraform スクリプトでネットワークとストレージが設定されましたが、クリーンなデータを読み込む保存先となる BigQuery データセットはまだ作成する必要があります。
データセットの作成
-
Cloud Shell で次のコマンドを実行して、customer_data_clean という名前の新しい BigQuery データセットを作成します。
bq mk customer_data_clean
-
これで、コンソールでデータセットが正常に作成されたことを検証できます。ナビゲーション メニュー(☰)を使用して、[BigQuery] に移動します。[エクスプローラ] パネルで、プロジェクト ID の横にある矢印をクリックして内容を展開すると、新しい customer_data_clean データセットが表示されます。
[進行状況を確認] をクリックして、実行したタスクを確認します。
BigQuery データセットを作成します。
タスク 3. PySpark データ品質スクリプトを準備する
次に、データを検証するロジックを含むカスタム PySpark スクリプトを作成します。スクリプトのロジックは単純です。
- Cloud Storage からソース CSV ファイルを DataFrame に読みこむ
- 一連の検証ルールを適用して、null の ID と有効なメール形式をチェックする
- DataFrame を、クリーンなレコードと無効なレコードの 2 つに分割する。
- 最後に、クリーンなデータを BigQuery に、無効なデータを Cloud Storage の DLQ バケットに書き込む。
スクリプトを記述してアップロードする
-
Cloud Shell で、customer_dq.py という名前の PySpark スクリプト ファイルを作成します。
nano customer_dq.py
-
次のコメント付き Python コードを nano エディタに貼り付けます。
# 必要なライブラリをインポートする
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# このスクリプトでは、1 つのコマンドライン引数を想定しています。
# 1. 宛先 BigQuery テーブルのパス('dataset.table' 形式)
if len(sys.argv) != 2:
print("Usage: customer_dq.py <bq_dataset_table>")
sys.exit(-1)
# コマンドライン引数を変数に割り当てる
bq_dataset_table = sys.argv[1]
# ラボの実行時に Qwiklabs 変数がここで置き換えられる
bq_project = "{{{project_0.project_id|Project_ID}}}"
gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv"
gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/"
# 新しい SparkSession を初期化する
spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate()
# ステップ 1: ソース CSV データを GCS バケットから読み取る
df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path)
# ステップ 2: データ品質ルールを定義する
# ルール 1: 'id' 列は null にすることはできない。
dq_rule_id = col("id").isNotNull()
# ルール 2: 'email' 列は null であってはならず、有効なメール形式の正規表現と一致する必要があります。
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex))
# ステップ 3: ルールを適用し、DataFrame をクリーンなレコードとエラーレコードに分割する
df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False))
clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed")
error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed")
# ステップ 4: クリーンなレコードを指定された BigQuery テーブルに書き込む
# BigQuery コネクタには、一時的な GCS バケット名が必要です。
temp_gcs_bucket_name = f"{bq_project}-main-bucket"
clean_df.write \
.format("bigquery") \
.option("table", bq_dataset_table) \
.option("temporaryGcsBucket", temp_gcs_bucket_name) \
.option("project", bq_project) \
.mode("overwrite") \
.save()
# ステップ 5: エラーレコードを単一の CSV ファイルとして GCS の DLQ バケットに書き込む
error_df.repartition(1).write \
.option("header", "true") \
.mode("overwrite") \
.csv(gcs_dlq_path)
# Spark セッションを停止する
spark.stop()
重要な注意 スクリプトの最終行が spark.stop() であることを確認してください。</bq_dataset_table> など、その下の行はすべて削除してください。
-
Ctrl+X、Y、Enter の順にキーを押し、保存して nano を終了します。
-
新しい PySpark スクリプトをメインの Cloud Storage バケットにアップロードします。
# 次のコマンドは、スクリプトをメインデータバケットの 'scripts' フォルダにアップロードします
gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/
[進行状況を確認] をクリックして、実行したタスクを確認します。
PySpark データ品質スクリプトを準備する。
タスク 4. バッチ パイプラインを構成して実行する
スクリプトをアップロードしたら、ジョブを構成して Apache Spark 向け Serverless に送信できます。
Spark ジョブを実行する
1. Cloud Shell で次の環境変数を設定します。これらの変数は、Terraform によってプロビジョニングされたリソースへのショートカットを作成します。
# BigQuery の最終テーブルの名前
export BQ_TABLE="valid_customers"
# BigQuery テーブルのパス('dataset.table' 形式)
export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}"
# 1,000 レコードのソース CSV ファイルのパス
export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv"
# エラーレコードが書き込まれる GCS パス
export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/"
# アップロードした PySpark スクリプトの GCS パス
export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py"
# Terraform で作成したカスタム サブネットの完全な URI
export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
-
実行する前に、以下のコマンドを確認してください。スクリプトをバッチジョブとして送信し、環境変数を引数として渡します。
-
--subnet: このフラグは重要です。Terraform によるセキュリティのベスト プラクティスに従って作成された、安全なカスタム spark-subnet 内でジョブを実行するように指示します。
-
--deps-bucket: このフラグはジョブの依存関係をステージングするための GCS バケットを指定します。
-
--: 二重ダッシュは、gcloud コマンドのフラグと、PySpark スクリプトに直接渡される引数を区切ります。
-
コマンドを実行してジョブを送信します。
gcloud dataproc batches submit pyspark $PYSPARK_SCRIPT_PATH \
--version=2.1 \
--batch="customer-dq-job-$(date +%s)" \
--region={{{project_0.default_region |REGION}}} \
--subnet=$SUBNET_URI \
--deps-bucket=gs://{{{project_0.project_id |PROJECT_ID}}}-main-bucket \
-- \
$BQ_DATASET_TABLE
注: ジョブが完了するまで 3~5 分かかります。Google Cloud コンソールで、[Dataproc] > [Serverless] > [バッチ] に移動して、進行状況をモニタリングできます。
[進行状況を確認] をクリックして、実行したタスクを確認します。
バッチ パイプラインを実行する
タスク 5. BigQuery でクリーンなデータを確認する
パイプラインが実行されたので、クリーンなレコードのみが BigQuery に読み込まれたことを確認します。
結果テーブルをクエリする
-
Cloud Shell でクエリを実行して、BigQuery テーブル内のクリーンなレコード数をカウントします。数は約 800 です。
bq query \
--use_legacy_sql=false \
'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
-
クリーンアップされたデータのサンプルを表示するには、次のコマンドを実行します。出力には、有効な ID とメール形式を持つレコードがすべて表示されます。
bq query \
--use_legacy_sql=false \
'SELECT * FROM `customer_data_clean.valid_customers` LIMIT 10;'
[進行状況を確認] をクリックして、実行したタスクを確認します。
BigQuery でデータを確認する。
タスク 6. DLQ の無効なレコードを確認する
最後に、データ品質チェックに失敗したレコードが、後で分析できるように DLQ バケットに正しくルーティングされたことを確認します。
Cloud Shell でエラーファイルを確認する
-
Cloud Shell で、DLQ バケット内の無効なレコードのサンプルを表示します。head -n 11 コマンドは、ヘッダー行と最初の 10 件のエラーレコードを表示します。
gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
-
このコマンドは、検証に失敗した約 200 件のレコードのサンプルを返します。ID が欠落している行や、形式が不正なメールアドレスの行が表示されます。
出力例:
id,first_name,last_name,email
,Isabella,Smith,<REDACTED_EMAIL>
12,Michael,Johnson,
21,Sophia,Williams,sophia.williams@example
(省略可)コンソールでエラーファイルを確認する
エラーファイルは Google Cloud コンソールで直接表示することもできます。
-
ナビゲーション メニュー(☰)で、[Cloud Storage] > [バケット] に移動します。
-
-dlq-bucket で終わるバケットの名前をクリックします。
-
errors/ フォルダに移動します。
-
.csv ファイルの名前をクリックして開き、ブラウザで内容を表示します。
お疲れさまでした
本番環境グレードのバッチデータ品質パイプラインを構築し、テストを行いました。
このラボでは、Cloud Storage からのファイルを検証して処理するカスタム PySpark ジョブを作成し、クリーンな結果を BigQuery テーブルに読み込み、無効なレコードを DLQ バケットにルーティングしました。これらはすべて、事前プロビジョニングされた安全なネットワーク環境内で行われました。このパターンは、最新の信頼性の高いデータ プラットフォームの基盤となるコンポーネントです。
次のステップと詳細情報
Copyright 2026 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。