실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Apache Spark용 서버리스를 사용하여 일괄 데이터 파이프라인의 데이터 품질 검증하기

실습 1시간 universal_currency_alt 크레딧 5개 show_chart 중급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

Google Cloud의 Apache Spark용 서버리스는 인프라를 관리하지 않고도 Spark 일괄 워크로드 실행을 간소화하는 완전 관리형 서비스입니다. 이 패턴은 ETL(추출, 변환, 로드) 워크플로에 강력한 접근방식을 제공하여 분석 시스템에 고품질 데이터만 저장되도록 보장합니다.

과제

데이터 레이크에 수집된 원시 데이터에는 누락된 값, 잘못된 형식, 유효하지 않은 항목과 같은 불완전한 정보가 포함되어 있는 경우가 많습니다. 이 데이터를 분석 웨어하우스에 직접 로드하면 보고서가 손상되고 잘못된 비즈니스 의사 결정으로 이어질 수 있습니다.

솔루션

자동화된 데이터 품질 파이프라인을 만듭니다. 이 파이프라인은 원시 데이터를 가로채고 일련의 검증 규칙을 적용한 다음 데이터를 지능적으로 라우팅합니다. 정리된 레코드는 프로덕션 데이터 웨어하우스로 전송되고, 검증에 실패한 레코드는 검사 및 해결을 위해 '데드 레터 큐(DLQ)'로 전송됩니다.

이 실습에서는 Apache Spark용 서버리스에서 커스텀 PySpark 작업을 실행하여 이 솔루션을 빌드합니다. 작업은 다음과 같습니다.

  1. Cloud Storage 버킷에서 원시 CSV 파일을 읽습니다.
  2. 데이터 품질 규칙을 적용하여 각 레코드를 검증합니다.
  3. 정리된 유효한 레코드를 BigQuery 테이블에 로드합니다.
  4. 유효하지 않은 레코드를 Cloud Storage의 별도 DLQ 버킷에 씁니다.

이 패턴을 사용하면 데이터 웨어하우스를 깨끗하게 유지하고 데이터 오류 처리를 위한 명확하고 감사 가능한 프로세스를 제공할 수 있습니다.

엔터프라이즈 사용 사례

  • 전자상거래: 파이프라인은 수신되는 주문 데이터를 검증하여 제품 ID가 유효하고 고객 이메일이 올바른 형식인지 확인한 후 판매 분석 BigQuery 테이블에 로드합니다. 유효하지 않은 주문은 수동 검토를 위해 DLQ로 라우팅됩니다.
  • 의료: 시스템이 환자 기록을 처리하여 의료 코드가 존재하고 날짜가 올바른 형식인지 확인합니다. 오류가 있는 레코드는 데이터 관리 책임 검토를 위해 보안 DLQ 버킷으로 전송되어 규정 준수를 보장합니다.
  • 재무: 일일 파이프라인은 주식 시장 데이터를 수집하여 close_price와 같은 중요 필드에 null 값이 있는지 확인합니다. 불완전한 티커 데이터는 DLQ로 전송되어 시계열 분석 모델의 손상을 방지합니다.

목표

이 실습에서는 다음 작업을 수행하는 방법을 배웁니다.

  • Terraform에서 프로비저닝한 사전 구성된 실습 환경을 살펴봅니다.
  • BigQuery 데이터 세트를 만들어 환경 설정을 완료합니다.
  • 데이터 품질 및 라우팅 로직을 포함하는 주석이 달린 커스텀 PySpark 스크립트를 작성합니다.
  • 커스텀 보안 VPC 네트워크에서 Spark 일괄 작업을 구성하고 실행합니다.
  • BigQuery 테이블에서 정리된 데이터 출력을 확인합니다.
  • Cloud Storage DLQ에서 유효하지 않은 레코드를 검토합니다.

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Google Skills에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

Cloud Shell 활성화

Cloud Shell은 여러 개발 도구가 포함된 가상 머신입니다. 5GB의 영구적인 홈 디렉터리를 제공하며 Google Cloud에서 실행됩니다. Cloud Shell을 사용하면 명령줄을 통해 Google Cloud 리소스에 액세스할 수 있습니다. gcloud는 Google Cloud의 명령줄 도구입니다. Cloud Shell에 사전 설치되어 있으며 탭 자동 완성을 지원합니다.

  1. Google Cloud Console의 탐색창에서 Cloud Shell 활성화(Cloud Shell 아이콘)를 클릭합니다.

  2. 계속을 클릭합니다.
    환경을 프로비저닝하고 연결하는 데는 몇 분 정도 소요됩니다. 연결되면 사용자 인증도 처리되어 프로젝트가 PROJECT_ID로 설정됩니다. 예를 들면 다음과 같습니다.

Cloud Shell 터미널

샘플 명령어

  • 활성 계정 이름을 나열합니다.

gcloud auth list

(출력)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(출력 예시)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 프로젝트 ID를 나열합니다.

gcloud config list project

(출력)

[core] project = <project_ID>

(출력 예시)

[core] project = qwiklabs-gcp-44776a13dea667a6

실습 환경

이 실습을 시작하면 필요한 인프라와 리소스 대부분을 자동으로 프로비저닝하는 Terraform 스크립트가 실행됩니다. 다음 항목이 생성되어 있습니다.

  • 프로젝트 ID =

  • 리전 =

  • 영역 =

  • Apache Spark용 서버리스에 필요한 네트워크 액세스로 구성된 커스텀 VPC 네트워크(spark-network) 및 서브넷(spark-subnet)

  • Cloud Storage 버킷 2개:

    1. PySpark 스크립트(scripts/), 원시 입력 데이터(source/)를 저장하고 BigQuery 커넥터의 임시 스테이징 영역으로 사용되는 기본 버킷(gs://-main-bucket)
    2. 유효하지 않은 레코드를 저장하는 데 사용되는 DLQ 버킷(gs://-dlq-bucket)
  • 원시 데이터 파일: Python 스크립트가 1,000개의 레코드가 있는 CSV 파일(source/customer_contacts_1000.csv)을 자동으로 생성하여 기본 버킷에 업로드했습니다. 이러한 레코드의 약 20%에는 파이프라인을 테스트하기 위해 의도적으로 불완전한 정보(예: ID 누락, 유효하지 않은 이메일)가 포함되어 있습니다.

목표는 양호한 레코드와 불량한 레코드를 식별하고 분리한 다음 올바른 대상으로 로드할 수 있는 스크립트를 작성하는 것입니다.

버킷 전략 관련 참고사항:

이 실습에서는 편의상 스크립트, 소스 데이터, BigQuery 임시 스테이징에 기본 버킷을 사용합니다. 프로덕션 환경에서는 원시/입력 데이터용 버킷, 데드 레터 큐용 버킷, BigQuery 커넥터와 같은 커넥터에서 사용하는 임시 스테이징 데이터용 전용 버킷 등 3개의 별도 버킷을 사용하는 것이 좋습니다. 이를 통해 격리, 보안, 수명 주기 관리가 개선됩니다.

작업 1. 환경 탐색 및 서비스 준비

먼저 실습 리소스가 올바르게 생성되었는지 확인하고 작업할 소스 데이터를 미리 봅니다.

Cloud Storage 버킷 확인

  1. Google Cloud 콘솔에서 탐색 메뉴(☰)를 사용하여 Cloud Storage > 버킷으로 이동합니다.
  2. -main-bucket로 끝나는 버킷과 -dlq-bucket로 끝나는 버킷이 나열되어 있는지 확인합니다.

원시 데이터 미리보기 및 API 사용 설정

Cloud Shell 아이콘 클릭

  1. Cloud Shell을 활성화합니다.

  2. 다음 명령어를 실행하여 기본 버킷에 있는 원시 CSV 파일의 헤더와 처음 10개 레코드를 확인합니다.

    gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
  3. 작업을 실행하려면 먼저 Dataproc API를 사용 설정해야 합니다. Cloud Shell에서 다음 명령어를 실행하여 사용 설정합니다.

    gcloud services enable dataproc.googleapis.com

내 진행 상황 확인하기를 클릭하여 실행한 작업을 확인합니다.

Dataproc API 사용 설정

작업 2. BigQuery 환경 준비

Terraform 스크립트가 네트워크와 스토리지를 설정했지만 여전히 정리된 데이터가 로드될 대상 BigQuery 데이터 세트를 만들어야 합니다.

데이터 세트 생성

  1. Cloud Shell에서 다음 명령어를 실행하여 customer_data_clean이라는 새 BigQuery 데이터 세트를 만듭니다.

    bq mk customer_data_clean
  2. 이제 콘솔에서 데이터 세트가 생성되었는지 확인할 수 있습니다. 탐색 메뉴(☰)를 사용하여 BigQuery로 이동합니다. 탐색기 패널에서 프로젝트 ID 옆에 있는 화살표를 클릭하여 콘텐츠를 펼치면 새로운 customer_data_clean 데이터 세트가 표시됩니다.

내 진행 상황 확인하기를 클릭하여 실행한 작업을 확인합니다.

BigQuery 데이터 세트 만들기

작업 3. PySpark 데이터 품질 스크립트 준비

이제 데이터 검증 로직이 포함된 커스텀 PySpark 스크립트를 만듭니다. 스크립트의 로직은 간단합니다.

  • Cloud Storage에서 소스 CSV 파일을 DataFrame으로 읽고,
  • 일련의 검증 규칙을 적용하여 null ID와 유효한 이메일 형식을 확인합니다.
  • 이어서 DataFrame을 두 개로 분할합니다. 하나는 정리된 레코드가 포함된 DataFrame이고 다른 하나는 유효하지 않은 레코드가 포함된 DataFrame입니다. 마지막으로,
  • 정리된 데이터를 BigQuery에 쓰고, 유효하지 않은 데이터를 Cloud Storage의 DLQ 버킷에 씁니다.

스크립트 작성 및 업로드

  1. Cloud Shell에서 customer_dq.py라는 PySpark 스크립트 파일을 만듭니다.

    nano customer_dq.py
  2. 다음 주석 처리된 Python 코드를 nano 편집기에 붙여넣습니다.

    # Import necessary libraries import sys from pyspark.sql import SparkSession from pyspark.sql.functions import col, when # This script expects 1 command-line argument: # 1. The destination BigQuery table path in format 'dataset.table' if len(sys.argv) != 2: print("Usage: customer_dq.py <bq_dataset_table>") sys.exit(-1) # Assign command-line argument to variable bq_dataset_table = sys.argv[1] # Qwiklabs variables are substituted here when the lab runs bq_project = "{{{project_0.project_id|Project_ID}}}" gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv" gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/" # Initialize a new Spark Session spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate() # Step 1: Read the source CSV data from the GCS bucket df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path) # Step 2: Define the Data Quality rules # Rule 1: The 'id' column must not be null. dq_rule_id = col("id").isNotNull() # Rule 2: The 'email' column must not be null and must match a valid email format regex. email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex)) # Step 3: Apply rules and split the DataFrame into clean and error records df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False)) clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed") error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed") # Step 4: Write the clean records to the specified BigQuery table # The BigQuery connector requires a temporary GCS bucket NAME. temp_gcs_bucket_name = f"{bq_project}-main-bucket" clean_df.write \ .format("bigquery") \ .option("table", bq_dataset_table) \ .option("temporaryGcsBucket", temp_gcs_bucket_name) \ .option("project", bq_project) \ .mode("overwrite") \ .save() # Step 5: Write the error records to the DLQ bucket in GCS as a single CSV file error_df.repartition(1).write \ .option("header", "true") \ .mode("overwrite") \ .csv(gcs_dlq_path) # Stop the Spark session spark.stop()

중요 참고사항 스크립트의 마지막 줄이 spark.stop()인지 확인합니다. </bq_dataset_table>과 같이 그 아래에 있는 모든 항목을 삭제합니다.

  1. Ctrl+X, Y, Enter를 차례로 눌러 저장하고 nano를 종료합니다.

  2. 새로운 PySpark 스크립트를 기본 Cloud Storage 버킷에 업로드합니다.

    # The command below uploads the script to a 'scripts' folder in the main data bucket gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/

내 진행 상황 확인하기를 클릭하여 실행한 작업을 확인합니다.

PySpark 데이터 품질 스크립트 준비

작업 4. 일괄 파이프라인 구성 및 실행

스크립트가 업로드되었으므로 이제 작업을 구성하고 Apache Spark용 서버리스에 제출할 수 있습니다.

Spark 작업 실행

1.  Cloud Shell에서 다음 환경 변수를 설정합니다. 이러한 변수는 Terraform에서 프로비저닝한 리소스에 대한 바로가기를 만듭니다.

# The name for the final table in BigQuery export BQ_TABLE="valid_customers" # The BigQuery table path in 'dataset.table' format export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}" # The path to the 1000-record source CSV file export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv" # The GCS path where error records will be written export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/" # The GCS path to the PySpark script you just uploaded export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py" # The full URI of the custom subnet created by Terraform export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
  1. 아래 명령어를 실행하기 전에 검토합니다. 스크립트를 일괄 작업으로 제출하고 환경 변수를 인수로 전달합니다.

    • --subnet: 이 플래그는 매우 중요합니다. Terraform이 생성한 안전한 커스텀 spark-subnet 내에서 작업을 실행하도록 지시하며, 이는 보안 권장사항입니다.
    • --deps-bucket: 이 플래그는 작업 종속 항목을 스테이징하기 위한 GCS 버킷을 지정합니다.
    • --: 이 이중 대시는 gcloud 명령어의 플래그를 PySpark 스크립트에 직접 전달되는 인수와 구분합니다.
  2. 다음 명령어를 실행하여 작업을 제출합니다.

    gcloud dataproc batches submit pyspark $PYSPARK_SCRIPT_PATH \ --version=2.1 \ --batch="customer-dq-job-$(date +%s)" \ --region={{{project_0.default_region |REGION}}} \ --subnet=$SUBNET_URI \ --deps-bucket=gs://{{{project_0.project_id |PROJECT_ID}}}-main-bucket \ -- \ $BQ_DATASET_TABLE
참고: 작업이 완료되는 데 3~5분 정도 걸립니다. Google Cloud 콘솔에서 Dataproc > 서버리스 > 일괄로 이동하여 진행 상황을 모니터링할 수 있습니다.

내 진행 상황 확인하기를 클릭하여 실행한 작업을 확인합니다.

일괄 파이프라인 실행

작업 5. BigQuery에서 정리된 데이터 확인

파이프라인이 실행되었으므로 정리된 레코드만 BigQuery에 로드되었는지 확인합니다.

결과 테이블 쿼리

  1. Cloud Shell에서 쿼리를 실행하여 BigQuery 테이블의 정리된 레코드 수를 계산합니다. 개수는 약 800개여야 합니다.

    bq query \ --use_legacy_sql=false \ 'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
  2. 정리된 데이터 샘플을 보려면 다음 명령어를 실행합니다. 출력에는 유효한 ID와 이메일 형식을 모두 갖춘 레코드가 표시됩니다.

    bq query \ --use_legacy_sql=false \ 'SELECT * FROM `customer_data_clean.valid_customers` LIMIT 10;'

내 진행 상황 확인하기를 클릭하여 실행한 작업을 확인합니다.

BigQuery에서 데이터 확인

작업 6. DLQ에서 유효하지 않은 레코드 검토

마지막으로 데이터 품질 검사를 통과하지 못한 레코드가 나중에 분석할 수 있도록 DLQ 버킷으로 올바르게 라우팅되었는지 확인합니다.

Cloud Shell을 통해 오류 파일 검사

  1. Cloud Shell에서 DLQ 버킷에 있는 유효하지 않은 레코드의 샘플을 봅니다. head -n 11 명령어는 헤더 행과 처음 10개의 오류 레코드를 표시합니다.

    gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
  2. 이 명령어는 검증에 실패한 약 200개의 레코드 샘플을 반환합니다. ID가 누락되었거나 이메일 형식이 잘못된 행이 표시됩니다.

    출력 예시:

    id,first_name,last_name,email ,Isabella,Smith,<REDACTED_EMAIL> 12,Michael,Johnson, 21,Sophia,Williams,sophia.williams@example

(선택사항) 콘솔을 통해 오류 파일 검사

Google Cloud 콘솔에서 오류 파일을 직접 볼 수도 있습니다.

  1. 탐색 메뉴(☰)에서 Cloud Storage > 버킷으로 이동합니다.
  2. 이름이 -dlq-bucket으로 끝나는 버킷을 클릭합니다.
  3. errors/ 폴더로 이동합니다.
  4. .csv 파일 이름을 클릭하여 브라우저에서 열고 내용을 확인합니다.

수고하셨습니다

프로덕션급 일괄 데이터 품질 파이프라인을 성공적으로 빌드하고 테스트했습니다.

이 실습에서는 커스텀 PySpark 작업을 작성하여 Cloud Storage의 파일을 검증 및 처리하고, 정리된 결과를 BigQuery 테이블에 로드하고, 유효하지 않은 레코드를 DLQ 버킷으로 라우팅했습니다. 이 모든 작업은 사전 프로비저닝된 안전한 네트워크 환경 내에서 진행했습니다. 이 패턴은 안정적인 최신 데이터 플랫폼의 기본 구성요소입니다.

다음 단계/더 학습하기

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.