실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Apache Beam 및 Dataflow를 사용한 ETL 파이프라인 작성(Python)

실습 1시간 30분 universal_currency_alt 크레딧 5개 show_chart 중급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서는 다음 작업을 수행하는 방법을 배웁니다.

  • Apache Beam에서 일괄 ETL(Extract-Transform-Load) 파이프라인을 빌드합니다. 이 파이프라인은 Google Cloud Storage에서 원시 데이터를 가져와 BigQuery에 씁니다.
  • Dataflow에서 Apache Beam 파이프라인을 실행합니다.
  • 파이프라인 실행을 파라미터화합니다.

기본 요건:

  • Python에 대한 기본 지식

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Qwiklabs에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

프로젝트 권한 확인

Google Cloud에서 작업을 시작하기 전에 프로젝트가 Identity and Access Management(IAM) 내에서 올바른 권한을 보유하고 있는지 확인해야 합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴 아이콘)에서 IAM 및 관리자 > IAM을 선택합니다.

  2. 기본 컴퓨팅 서비스 계정 {project-number}-compute@developer.gserviceaccount.com이 있고 editor 역할이 할당되어 있는지 확인하세요. 계정 프리픽스는 프로젝트 번호이며, 이 번호는 탐색 메뉴 > Cloud 개요 > 대시보드에서 확인할 수 있습니다.

Compute Engine 기본 서비스 계정 이름과 편집자 상태가 강조 표시된 권한 탭 페이지

참고: 계정이 IAM에 없거나 editor 역할이 없는 경우 다음 단계에 따라 필요한 역할을 할당합니다.
  1. Google Cloud 콘솔의 탐색 메뉴에서 Cloud 개요 > 대시보드를 클릭합니다.
  2. 프로젝트 번호(예: 729328892908)를 복사합니다.
  3. 탐색 메뉴에서 IAM 및 관리자 > IAM을 선택합니다.
  4. 역할 테이블 상단에서 주 구성원별로 보기 아래에 있는 액세스 권한 부여를 클릭합니다.
  5. 새 주 구성원 필드에 다음을 입력합니다.
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number}는 프로젝트 번호로 바꿉니다.
  2. 역할 필드에서 프로젝트(또는 기본) > 편집자를 선택합니다.
  3. 저장을 클릭합니다.

Workbench 인스턴스 개발 환경 설정

이 실습에서는 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴)에서 Vertex AI를 선택합니다.

  2. 모든 권장 API 사용 설정을 클릭합니다.

  3. 탐색 메뉴에서 Workbench를 클릭합니다.

    Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.

  4. 상자 추가새로 만들기를 클릭합니다.

  5. 인스턴스를 구성합니다.

    • 이름: lab-workbench
    • 리전: 리전을 (으)로 설정합니다.
    • 영역: 영역을 (으)로 설정합니다.
    • 고급 옵션(선택사항): 필요한 경우 '고급 옵션'을 클릭하여 추가로 맞춤설정(예: 머신 유형, 디스크 크기)할 수 있습니다.

Vertex AI Workbench 인스턴스 생성

  1. 만들기를 클릭합니다.

인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.

  1. 인스턴스 이름 옆에 있는 JupyterLab 열기를 클릭하여 JupyterLab 인터페이스를 실행합니다. 그러면 브라우저에서 새 탭이 열립니다.

Workbench 인스턴스 배포됨

  1. 그런 다음 터미널을 클릭합니다. 이 실습의 모든 명령어를 실행할 수 있는 터미널이 열립니다.

코드 저장소 다운로드

다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.

  1. 방금 연 터미널에서 다음을 입력합니다.
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. 노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.

  2. 클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.

확장된 보기 메뉴에서 강조 표시된 탐색기 옵션

참고: 수정할 파일을 열려면 해당 파일로 이동하여 클릭하기만 하면 됩니다. 클릭하면 파일이 열리며, 여기에서 코드를 추가하거나 수정할 수 있습니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. 노트북 인스턴스 만들기 및 과정 저장소 클론하기

Apache Beam 및 Dataflow

약 5분

Dataflow는 일괄 및 스트리밍 Apache Beam 데이터 처리 파이프라인을 실행하기 위한 완전 관리형 Google Cloud 서비스입니다.

Apache Beam은 오픈소스 기반의 고급 통합 이동식 데이터 처리 프로그래밍 모델로, 최종 사용자가 Java, Python 또는 Go를 사용하여 일괄 및 스트리밍 데이터 병렬 처리 파이프라인을 모두 정의할 수 있게 해 줍니다. Apache Beam 파이프라인은 소규모 데이터 세트의 로컬 개발 머신에서 실행할 수 있으며, Dataflow에서 대규모로 실행할 수도 있습니다. 그러나 Apache Beam은 오픈소스이므로 다른 실행기도 존재합니다. Apache Flink, Apache Spark 등에서 Beam 파이프라인을 실행할 수 있습니다.

실습 네트워크 아키텍처 다이어그램

실습 1부. ETL 파이프라인을 처음부터 작성

소개

이 섹션에서는 Apache Beam ETL, 즉 추출(Extract)-변환(Transform)-로드(Load) 파이프라인을 처음부터 작성합니다.

데이터 세트 및 사용 사례 검토

이 퀘스트의 각 실습에서 입력 데이터는 일반적인 로그 형식의 웹 서버 로그와 유사한 형태로 제공되며, 웹 서버에 포함될 수 있는 다른 데이터도 함께 제공됩니다. 첫 번째 실습에서는 데이터를 일괄 소스로 처리하며, 이후 실습에서는 데이터를 스트리밍 소스로 처리합니다. 데이터를 읽고 파싱한 다음 나중에 데이터 분석을 수행할 수 있도록 서버리스 데이터 웨어하우스인 BigQuery에 데이터를 작성해야 합니다.

적절한 실습 열기

  • IDE의 터미널로 돌아가서 다음 명령어를 복사하여 붙여넣습니다.
cd 1_Basic_ETL/lab export BASE_DIR=$(pwd)

가상 환경 및 종속 항목 설정

실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.

  1. 터미널에서 이 실습의 작업을 위한 가상 환경을 만듭니다.
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. 다음으로 파이프라인을 실행하는 데 필요한 패키지를 설치합니다.
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. 마지막으로 Dataflow API가 사용 설정되어 있는지 확인합니다.
gcloud services enable dataflow.googleapis.com

첫 번째 파이프라인 작성

1시간

작업 1. 합성 데이터 생성

  1. 터미널에서 다음 명령어를 실행하여 합성 웹 서버 로그 생성을 위한 스크립트가 포함된 저장소를 클론합니다.
cd $BASE_DIR/../.. source create_batch_sinks.sh bash generate_batch_events.sh head events.json

이 스크립트는 다음과 유사한 줄이 포함된 events.json이라는 파일을 만듭니다.

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

그러면 이 파일이 의 Google Cloud Storage 버킷에 자동으로 복사됩니다.

  1. 다른 브라우저 탭에서 Google Cloud Storage로 이동하여 스토리지 버킷에 events.json이라는 파일이 포함되어 있는지 확인합니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. 합성 데이터 생성

작업 2. 소스에서 데이터 읽기

이 섹션이나 이후 섹션에서 문제가 발생하면 해결 방법을 참고하세요.

  1. 파일 탐색기에서 1_Basic_ETL/lab 실습 폴더로 이동하여 my_pipeline.py를 클릭합니다. 그러면 편집기 패널에서 파일이 열립니다. 다음 패키지를 가져왔는지 확인합니다.
import argparse import time import logging import json import apache_beam as beam from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners import DataflowRunner, DirectRunner
  1. 아래로 스크롤하여 run() 메서드를 찾습니다. 이 메서드에는 현재 아무것도 하지 않는 파이프라인이 포함되어 있습니다. PipelineOptions 객체를 사용하여 Pipeline 객체를 만드는 방법과 메서드의 마지막 줄에서 파이프라인을 실행하는 방법을 참고하세요.
options = PipelineOptions() # Set options p = beam.Pipeline(options=options) # Do stuff p.run()
  • Apache Beam 파이프라인의 모든 데이터는 PCollection에 상주합니다. 파이프라인의 초기 PCollection을 만들려면 파이프라인 객체에 루트 변환을 적용해야 합니다. 루트 변환은 외부 데이터 소스 또는 지정한 일부 로컬 데이터에서 PCollection을 만듭니다.

  • Beam SDK에는 ReadCreate라는 두 가지 종류의 루트 변환이 있습니다. Read 변환은 텍스트 파일이나 데이터베이스 테이블과 같은 외부 소스에서 데이터를 읽습니다. Create 변환은 메모리 내 목록에서 PCollection을 만들며 특히 테스트에 유용합니다.

다음 예시 코드는 ReadFromText 루트 변환을 적용하여 텍스트 파일에서 데이터를 읽는 방법을 보여줍니다. 변환은 Pipeline 객체 p에 적용되며, 파라미터화된 유형 힌트에서 가져온 표기법을 사용하여 PCollection[str] 형식으로 파이프라인 데이터 세트를 반환합니다. "ReadLines"는 변환의 이름으로, 나중에 더 큰 파이프라인으로 작업할 때 유용합니다.

lines = p | "ReadLines" >> beam.io.ReadFromText("gs://path/to/input.txt")
  1. run() 메서드 내에서 "input"이라는 문자열 상수를 만들고 값을 gs://<YOUR-PROJECT-ID>/events.json으로 설정합니다. 향후 실습에서는 명령줄 파라미터를 사용하여 이 정보를 전달합니다.

  2. textio.ReadFromText 변환을 호출하여 events.json의 모든 이벤트 문자열로 이루어진 PCollection을 만듭니다.

  3. my_pipeline.py 상단에 적절한 import 문을 추가합니다.

  4. 작업을 저장하려면 상단 탐색 메뉴에서 파일을 클릭하고 저장을 선택합니다.

작업 3. 파이프라인을 실행하여 작동하는지 확인

  • 터미널로 돌아가서 $BASE_DIR 폴더로 돌아가 다음 명령어를 실행합니다. 파이프라인을 실행하기 전에 PROJECT_ID 환경 변수를 설정해야 합니다.
cd $BASE_DIR # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) # Run the pipeline python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DirectRunner

지금은 파이프라인이 실제로 어떤 작업을 수행하지는 않으며 데이터를 읽어들이기만 합니다.

하지만 이 파이프라인을 실행하면 비용이 더 많이 드는 계산을 수행하기 전에 로컬 머신에서 실행되는 DirectRunner를 사용하여 파이프라인을 로컬에서 저렴하게 검증해 볼 수 있다는 장점이 있습니다. Google Dataflow를 사용하여 파이프라인을 실행하려면 runnerDataflowRunner로 변경하면 됩니다.

작업 4. 변환 추가

막히는 부분이 있다면 해결 방법을 참고하세요.

변환은 데이터를 변경하는 작업을 말합니다. Apache Beam에서는 PTransform 클래스를 통해 변환이 이루어집니다. 런타임 시 이러한 작업은 여러 독립적인 작업자를 통해 수행됩니다.

모든 PTransform의 입력과 출력은 PCollection입니다. 눈치채셨을지 모르겠지만, 사실 Google Cloud Storage에서 데이터를 읽을 때 이미 PTransform을 사용했습니다. 이를 변수에 할당했는지 여부와 관계없이 문자열의 PCollection이 생성되었습니다.

Beam은 Python에서 파이프 연산자 |로 표현되는 PCollection에 대해 일반적인 apply 메서드를 사용하므로 변환을 순차적으로 연결할 수 있습니다. 예를 들어, 다음과 같이 변환을 연결하여 순차 파이프라인을 만들 수 있습니다.

[Output_PCollection] = ([Input_PCollection] | [First Transform] | [Second Transform] | [Third Transform])

이 작업에서는 새로운 종류의 변환인 ParDo를 사용합니다. ParDo는 일반 병렬 처리를 위한 Beam 변환입니다.

ParDo 처리 패러다임은 Map/Shuffle/Reduce 스타일 알고리즘의 'Map' 단계와 유사합니다. ParDo 변환은 입력 PCollection의 각 요소를 고려하고, 해당 요소에 대해 일부 처리 함수(사용자 코드)를 수행하며, 0개, 1개 또는 여러 개의 요소를 출력 PCollection에 내보냅니다.

ParDo는 여러 가지 일반적인 데이터 처리 작업에 유용하지만, Python에서 특수한 PTransform을 사용하면 다음과 같은 프로세스를 더 간단하게 처리할 수 있습니다.

  • 데이터 세트 필터링. Filter를 사용하여 PCollection의 각 요소를 고려하고, 불리언 값을 반환하는 Python 호출 가능 항목의 출력에 따라 해당 요소를 새 PCollection에 출력 또는 삭제할 수 있습니다.
  • 데이터 세트의 각 요소에 대한 형식 지정 또는 유형 변환. 입력 PCollection에 원하는 것과 다른 유형이나 형식의 요소가 포함된 경우, Map을 사용하여 각 요소에서 변환을 수행하고 결과를 새 PCollection으로 출력할 수 있습니다.
  • 데이터 세트에서 각 요소의 부분 추출. 예를 들어, 필드가 여러 개인 레코드의 PCollection이 있는 경우 Map 또는 FlatMap을 사용하여 고려하려는 필드만 새 PCollection으로 파싱할 수도 있습니다.
  • 데이터 세트에서 각 요소의 계산 수행. ParDo, Map 또는 FlatMap을 사용하여 PCollection의 모든 요소 또는 특정 요소에 대해 단순하거나 복잡한 계산을 수행하고 그 결과를 새 PCollection으로 출력할 수 있습니다.

이 작업을 완료하려면 단일 이벤트를 나타내는 JSON 문자열을 읽고, Python json 패키지를 사용하여 파싱하고, json.loads가 반환한 사전을 출력하는 Map 변환을 작성해야 합니다.

Map 함수는 인라인 방식 또는 사전 정의된 호출 가능 방식을 통해 구현할 수 있습니다. 인라인 Map 함수는 다음과 같이 작성합니다.

p | beam.Map(lambda x : something(x))

또는 스크립트 앞부분에서 정의한 Python 호출 가능 항목과 함께 beam.Map을 사용할 수도 있습니다.

def something(x): y = # Do something! return y p | beam.Map(something)

beam.Map(및 기타 경량 DoFn)보다 더 많은 유연성이 필요한 경우 DoFn을 서브클래스로 지정하는 커스텀 DoFn으로 ParDo를 구현할 수 있습니다. 이렇게 하면 테스트 프레임워크와의 통합이 더 쉬워집니다.

class MyDoFn(beam.DoFn): def process(self, element): output = #Do Something! yield output p | beam.ParDo(MyDoFn())

막히는 부분이 있다면 해결 방법을 참고하세요.

작업 5. 싱크에 쓰기

이 시점에서 파이프라인은 Google Cloud Storage에서 파일을 읽고, 각 줄을 파싱하고, 각 요소에 대한 Python 사전을 내보냅니다. 다음 단계는 이러한 객체를 BigQuery 테이블에 작성하는 것입니다.

  1. 필요한 경우 파이프라인에 BigQuery 테이블을 만들도록 지시할 수 있지만, 데이터 세트는 미리 만들 필요가 있습니다. 이 작업은 generate_batch_events.sh 스크립트에서 이미 완료되었습니다. 다음 코드를 사용하여 이 데이터 세트를 검사할 수 있습니다.
# Examine dataset bq ls # No tables yet bq ls logs

파이프라인의 최종 PCollection을 출력하려면 해당 PCollectionWrite 변환을 적용합니다. Write 변환은 PCollection의 요소를 데이터베이스 테이블과 같은 외부 데이터 싱크에 출력할 수 있습니다. 일반적으로 파이프라인의 끝에 데이터를 쓰지만, Write를 사용하면 언제든지 파이프라인에서 PCollection을 출력할 수 있습니다.

다음 예시 코드는 WriteToText 변환을 적용하여 문자열의 PCollection을 텍스트 파일에 쓰는 방법을 보여줍니다.

p | "WriteMyFile" >> beam.io.WriteToText("gs://path/to/output")
  1. 이 경우 WriteToText 대신 WriteToBigQuery를 사용합니다.

이 함수에는 쓰기 대상이 되는 특정 테이블과 이 테이블의 스키마 등 여러 가지를 지정해야 합니다. 필요에 따라 기존 테이블에 추가할지, 기존 테이블을 다시 만들지(초기 파이프라인 반복에 유용), 테이블이 없는 경우 테이블을 만들지 여부를 지정할 수 있습니다. 기본적으로 이 변환은 존재하지 않는 테이블을 생성하고, 비어 있지 않은 테이블에는 쓰지 않습니다.

  1. 하지만 스키마는 지정해야 합니다. 여기에는 두 가지 방법이 있습니다. 스키마를 단일 문자열 또는 JSON 형식으로 지정할 수 있습니다. 예를 들어, 사전에 이름(str 유형), ID(int 유형), 잔액(float 유형)이라는 세 가지 필드가 있다고 가정해 보겠습니다. 그러면 스키마를 한 줄로 지정할 수 있습니다.
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

JSON으로는 이렇게 지정할 수 있습니다.

table_schema = { "fields": [ { "name": "name", "type": "STRING" }, { "name": "id", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "balance", "type": "FLOAT", "mode": "REQUIRED" } ] }

첫 번째 경우(단일 문자열)에는 모든 필드가 NULLABLE로 간주됩니다. 대신 JSON 접근 방식을 사용하면 모드를 지정할 수 있습니다.

  1. 테이블 스키마를 정의한 후에는 싱크를 DAG에 추가할 수 있습니다.
p | 'WriteToBQ' >> beam.io.WriteToBigQuery( 'project:dataset.table', schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) 참고: WRITE_TRUNCATE는 매번 테이블을 삭제하고 다시 만듭니다. 이렇게 하면 파이프라인 초기 반복에 유용하지만(특히 스키마를 반복할 때), 프로덕션에서 의도치 않은 문제가 일어나기 쉽습니다. WRITE_APPEND 또는 WRITE_EMPTY가 더 안전합니다.

테이블 스키마를 정의하는 것과 더불어, 파이프라인에 BigQuery 싱크를 추가하는 것도 잊지 마세요. 막히는 부분이 있다면 해결 방법을 참고하세요.

작업 6. 파이프라인 실행

  1. 터미널로 돌아가서 이전과 거의 동일한 명령어를 사용하여 파이프라인을 실행합니다. 하지만 이번에는 DataflowRunner를 사용하여 Dataflow에서 파이프라인을 실행합니다.
# Set up environment variables cd $BASE_DIR export PROJECT_ID=$(gcloud config get-value project) # Run the pipelines python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DataflowRunner

전체적인 형태는 Read 변환으로 시작하여 Write 변환으로 끝나는 단일 경로여야 합니다. 파이프라인이 실행되면 서비스가 파이프라인의 요구사항을 파악하여 작업자가 자동으로 추가되고, 작업자가 더 이상 필요하지 않으면 없어집니다. Compute Engine으로 이동하면 Dataflow 서비스에서 생성한 가상 머신을 확인할 수 있습니다.

참고: 파이프라인은 성공적으로 빌드되지만 Dataflow 서비스의 코드 또는 잘못된 구성으로 인해 많은 오류가 발생하는 경우, runnerDirectRunner로 다시 설정하여 로컬에서 실행하고 더 빠른 피드백을 받을 수 있습니다. 이 경우에는 데이터 세트가 작고 DirectRunner에서 지원하지 않는 특성을 사용하지 않기 때문에 이 방식이 적합합니다.
  1. 파이프라인이 완료되면 BigQuery 브라우저 창으로 돌아가 테이블을 쿼리합니다.

코드가 예상대로 작동하지 않고 어떻게 해야 할지 잘 모르겠다면 해결 방법을 참고하세요.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. 파이프라인 실행

실습 2부. 기본 ETL 파라미터화

약 20분

데이터 엔지니어의 작업 대부분은 반복 작업과 같이 예측 가능하거나 다른 작업과 유사한 경우가 많습니다. 그러나 파이프라인을 실행하는 프로세스에는 엔지니어링 전문 지식이 필요합니다. 방금 완료한 단계를 떠올려 보세요.

  1. 개발 환경을 만들고 파이프라인을 개발했습니다. 이 환경에는 Apache Beam SDK와 기타 종속 항목이 포함되었습니다.
  2. 개발 환경에서 파이프라인을 실행했습니다. Apache Beam SDK는 Cloud Storage에서 파일을 스테이징하고, 작업 요청 파일을 만들고, 파일을 Dataflow 서비스에 제출했습니다.

API 호출을 통해 작업을 시작하거나 개발 환경을 설정하지 않고도 작업을 시작할 수 있는 방법이 있다면 훨씬 좋을 것입니다(기술 지식이 없는 사용자는 개발 환경을 설정하기가 어려움). 이렇게 되면 파이프라인을 실행하기도 용이해집니다.

Dataflow 템플릿을 사용하면 파이프라인이 컴파일될 때 생성되는 표현을 파라미터화할 수 있도록 변경하여 이 문제를 해결할 수 있습니다. 아쉽게도 이 방법은 명령줄 파라미터를 노출하는 것만큼 간단하지는 않습니다. 이 부분은 나중에 실습에서 진행하게 됩니다. Dataflow 템플릿을 사용하면 위의 워크플로가 다음과 같이 바뀝니다.

  1. 개발자는 개발 환경을 만들고 파이프라인을 개발합니다. 이 환경에는 Apache Beam SDK와 기타 종속 항목이 포함됩니다.
  2. 개발자는 파이프라인을 실행하고 템플릿을 만듭니다. Apache Beam SDK가 Cloud Storage에서 파일을 스테이징하고, 템플릿 파일을 만들고(작업 요청과 유사), 템플릿 파일을 Cloud Storage에 저장합니다.
  3. 개발자가 아닌 사용자도, 또는 Airflow와 같은 다른 워크플로 도구에서도 Google Cloud 콘솔, gcloud 명령줄 도구 또는 REST API로 작업을 간편하게 실행하여 템플릿 파일 실행 요청을 Dataflow 서비스에 제출할 수 있습니다.

이 실습에서는 Google에서 만든 Dataflow 템플릿 중 하나를 사용하여 1부에서 빌드한 파이프라인과 동일한 작업을 수행하는 연습을 진행합니다.

작업 1. JSON 스키마 파일 만들기

이전과 마찬가지로 이 예시에서는 스키마를 나타내는 JSON 파일을 Dataflow 템플릿에 전달해야 합니다.

  1. IDE의 터미널로 돌아갑니다. 다음 명령어를 실행하여 기본 디렉터리로 돌아간 다음, 기존 logs.logs 테이블에서 스키마를 가져옵니다.
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. 이제 이 출력을 파일에 캡처하고 GCS에 업로드합니다. 추가 sed 명령어는 Dataflow가 예상하는 전체 JSON 객체를 빌드하기 위한 것입니다.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp schema.json gs://${PROJECT_ID}/

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. JSON 스키마 파일 만들기

작업 2. JavaScript 사용자 정의 함수 작성

Cloud Storage에서 BigQuery로 데이터를 전달하는 Dataflow 템플릿을 사용하려면 원시 텍스트를 유효한 JSON으로 변환하는 JavaScript 함수가 필요합니다. 이 경우 텍스트의 각 줄은 유효한 JSON이므로 함수는 간단한 편입니다.

  1. 이 작업을 완료하려면 IDE의 파일 탐색기에서 dataflow_python 폴더에 새 파일을 만듭니다.

  2. 새 파일을 만들려면 파일 >> 새 파일 >> 텍스트 파일을 클릭합니다.

  3. 파일 이름을 transform.js로 변경합니다. 파일을 마우스 오른쪽 버튼으로 클릭하여 파일 이름을 변경할 수 있습니다.

  4. 편집기 패널에서 transform.js 파일을 클릭하여 엽니다.

  5. 아래 함수를 transform.js 파일에 복사하고 저장합니다.

function transform(line) { return line; }
  1. 그런 다음 아래 명령어를 실행하여 파일을 Google Cloud Storage에 복사합니다.
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp *.js gs://${PROJECT_ID}/

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. JavaScript 파일에 JavaScript 사용자 정의 함수 작성

작업 3. Dataflow 템플릿 실행

  1. Dataflow 웹 UI로 이동합니다.
  2. 템플릿에서 작업 생성을 클릭합니다.
  3. Dataflow 작업의 작업 이름을 입력합니다.
  4. Dataflow 템플릿 아래에서 스트리밍 섹션이 아닌 대량 데이터 처리(일괄 작업) 섹션의 Cloud Storage의 텍스트 파일을 BigQuery로 이동 템플릿을 선택합니다.
  5. Cloud Storage 입력 파일에서 events.json의 경로를 형식으로 입력합니다.
  6. BigQuery 스키마 파일의 Cloud Storage 위치에서 schema.json 파일의 경로를 형식으로 작성합니다.
  7. BigQuery 출력 테이블 값을 입력합니다.
  8. 임시 BigQuery 디렉터리에서 동일한 버킷 내에 새 폴더를 입력합니다. 작업에서 새 폴더가 자동으로 생성됩니다.
  9. 임시 위치에서 동일한 버킷 내에 두 번째 새 폴더를 입력합니다.
  10. 암호화Google 관리 암호화 키로 둡니다.
  11. 선택적 파라미터를 클릭하여 엽니다.
  12. Cloud Storage의 JavaScript UDF 경로에서 .js 파일의 경로를 형식으로 입력합니다.
  13. JavaScript UDF 이름transform을 입력합니다.
  14. 작업 실행 버튼을 클릭합니다.

작업이 실행되는 동안 Dataflow 웹 UI 내에서 작업을 검사할 수 있습니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. Dataflow 템플릿 실행

작업 4. Dataflow 템플릿 코드 검사

TextIOToBigQuery 가이드에서 방금 사용한 Dataflow 템플릿의 코드를 찾아볼 수 있습니다.

  • 아래로 스크롤하여 기본 메서드를 찾습니다. 이 코드는 우리가 작성한 파이프라인과 유사합니다.

    • 코드는 PipelineOptions 객체를 사용하여 생성된 Pipeline 객체로 시작합니다.
    • TextIO.read() 변환으로 시작하는 여러 PTransform의 연결로 구성됩니다.
    • Read 변환 후의 PTransform은 약간 다릅니다. 예를 들어 소스 형식이 BigQuery 테이블 형식과 잘 맞지 않는 경우 JavaScript를 사용하여 입력 문자열을 변환할 수 있습니다. 이 특성의 사용 방법에 관한 문서는 이 페이지를 참고하세요.
    • JavaScript UDF 이후의 PTransform은 라이브러리 함수를 사용하여 JSON을 TableRow로 변환합니다. 여기에서 해당 코드를 검사할 수 있습니다.
    • Write PTransform은 그래프 컴파일 시간에 알려진 스키마를 사용하는 대신 런타임에만 알려지는 파라미터를 수락하도록 코드가 설계되었기 때문에 약간 다르게 보입니다. 이를 가능하게 하는 것이 바로 NestedValueProvider 클래스입니다.

다음 실습에서는 단순한 PTransform 연결이 아닌 파이프라인을 만드는 방법과 빌드한 파이프라인을 커스텀 Dataflow 템플릿으로 조정하는 방법을 알아보겠습니다.

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.