실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Apache Beam 및 Dataflow를 사용한 ETL 파이프라인 작성(Java)

실습 1시간 30분 universal_currency_alt 크레딧 5개 show_chart 중급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서 학습할 내용은 다음과 같습니다.

  • Apache Beam에서 일괄 ETL(Extract-Transform-Load) 파이프라인을 빌드합니다. 이 파이프라인은 Google Cloud Storage에서 원시 데이터를 가져와 BigQuery에 작성하는 역할을 합니다.
  • Dataflow에서 Apache Beam 파이프라인을 실행합니다.
  • 파이프라인 실행을 파라미터화합니다.

기본 요건:

  • Java에 대한 기본 지식

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Qwiklabs에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

Google Cloud Shell 활성화하기

Google Cloud Shell은 다양한 개발 도구가 탑재된 가상 머신으로, 5GB의 영구 홈 디렉터리를 제공하며 Google Cloud에서 실행됩니다.

Google Cloud Shell을 사용하면 명령줄을 통해 Google Cloud 리소스에 액세스할 수 있습니다.

  1. Cloud 콘솔의 오른쪽 상단 툴바에서 'Cloud Shell 열기' 버튼을 클릭합니다.

    강조 표시된 Cloud Shell 아이콘

  2. 계속을 클릭합니다.

환경을 프로비저닝하고 연결하는 데 몇 분 정도 소요됩니다. 연결되면 사용자가 미리 인증되어 프로젝트가 PROJECT_ID로 설정됩니다. 예:

Cloud Shell 터미널에 강조 표시된 프로젝트 ID

gcloud는 Google Cloud의 명령줄 도구입니다. Cloud Shell에 사전 설치되어 있으며 명령줄 자동 완성을 지원합니다.

  • 다음 명령어를 사용하여 사용 중인 계정 이름을 나열할 수 있습니다.
gcloud auth list

출력:

Credentialed accounts: - @.com (active)

출력 예시:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 다음 명령어를 사용하여 프로젝트 ID를 나열할 수 있습니다.
gcloud config list project

출력:

[core] project =

출력 예시:

[core] project = qwiklabs-gcp-44776a13dea667a6 참고: gcloud 전체 문서는 gcloud CLI 개요 가이드를 참조하세요.

프로젝트 권한 확인

Google Cloud에서 작업을 시작하기 전에 프로젝트가 Identity and Access Management(IAM) 내에서 올바른 권한을 보유하고 있는지 확인해야 합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴 아이콘)에서 IAM 및 관리자 > IAM을 선택합니다.

  2. 기본 컴퓨팅 서비스 계정 {project-number}-compute@developer.gserviceaccount.com이 있고 editor 역할이 할당되어 있는지 확인하세요. 계정 프리픽스는 프로젝트 번호이며, 이 번호는 탐색 메뉴 > Cloud 개요 > 대시보드에서 확인할 수 있습니다.

Compute Engine 기본 서비스 계정 이름과 편집자 상태가 강조 표시된 권한 탭 페이지

참고: 계정이 IAM에 없거나 editor 역할이 없는 경우 다음 단계에 따라 필요한 역할을 할당합니다.
  1. Google Cloud 콘솔의 탐색 메뉴에서 Cloud 개요 > 대시보드를 클릭합니다.
  2. 프로젝트 번호(예: 729328892908)를 복사합니다.
  3. 탐색 메뉴에서 IAM 및 관리자 > IAM을 선택합니다.
  4. 역할 테이블 상단에서 주 구성원별로 보기 아래에 있는 액세스 권한 부여를 클릭합니다.
  5. 새 주 구성원 필드에 다음을 입력합니다.
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number}는 프로젝트 번호로 바꿉니다.
  2. 역할 필드에서 프로젝트(또는 기본) > 편집자를 선택합니다.
  3. 저장을 클릭합니다.

IDE 설정

이 실습에서는 Google Compute Engine에서 호스팅되는 Theia Web IDE를 주로 사용합니다. 실습 저장소가 사전에 클론되어 있습니다. Java 언어 서버가 지원되며, Cloud Shell처럼 gcloud 명령줄 도구를 통해 Google Cloud API에 프로그래매틱 방식으로 액세스할 수 있는 터미널도 제공됩니다.

  1. Theia IDE에 액세스하려면 Google Skills에 표시된 링크를 복사하여 새 탭에 붙여넣습니다.
참고: 환경이 완전히 프로비저닝되려면 URL이 표시된 후에도 3~5분 정도 기다려야 할 수 있습니다. 환경이 완전히 프로비저닝되지 않으면 브라우저에 오류가 표시됩니다.

ide_url이 표시된 사용자 인증 정보 창

실습 저장소가 환경에 클론되었습니다. 각 실습은 사용자가 완성해야 하는 코드가 포함된 labs 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 폴더로 구분되어 있습니다.

  1. File Explorer 버튼을 클릭하여 다음을 확인합니다.

실습 폴더가 강조 표시된 확장된 파일 탐색기 메뉴

Cloud Shell을 사용할 때처럼 이 환경에서 여러 터미널을 만들 수도 있습니다.

터미널 메뉴에서 강조 표시된 새 터미널 옵션

터미널에서 gcloud auth list를 실행하면, 제공된 서비스 계정으로 로그인되어 있음을 확인할 수 있습니다. 이 서비스 계정은 실습 사용자 계정과 동일한 권한을 가지고 있습니다.

gcloud auth list 명령어를 표시하는 터미널

환경이 작동하지 않는다면, GCE 콘솔에서 IDE를 호스팅하는 VM을 다음과 같이 재설정할 수 있습니다.

VM 인스턴스 페이지에서 강조 표시된 재설정 버튼과 VM 인스턴스 이름

Apache Beam 및 Dataflow

약 5분

Dataflow는 일괄 및 스트리밍 Apache Beam 데이터 처리 파이프라인을 실행하기 위한 완전 관리형 Google Cloud 서비스입니다.

Apache Beam은 오픈소스 기반의 포팅 가능한 고급 통합 데이터 처리 프로그래밍 모델로, 최종 사용자가 Java, Python 또는 Go를 사용하여 일괄 및 스트리밍 데이터 병렬 처리 파이프라인을 모두 정의하는 용도로 활용할 수 있습니다. Apache Beam 파이프라인은 로컬 개발 머신에서 소규모 데이터 세트에 대해 실행할 수도 있고, Dataflow에서 대규모로 실행할 수도 있습니다. 다만 Apache Beam은 오픈소스이므로 Apache Flink, Apache Spark를 비롯한 다른 실행기에서도 Beam 파이프라인을 실행할 수 있습니다.

dBeam 모델 아키텍처 다이어그램

실습 1부. ETL 파이프라인 처음부터 작성하기

소개

이 섹션에서는 Apache Beam 추출-변환-로드(ETL) 파이프라인을 처음부터 작성합니다.

데이터 세트 및 사용 사례 검토

이 퀘스트의 각 실습에서 사용하는 입력 데이터는 일반적인 로그 형식의 웹 서버 로그, 그리고 웹 서버에 포함될 법한 다른 데이터와 유사하게 구성되었습니다. 첫 번째 실습에서는 데이터를 배치 소스로 처리하며, 이후 실습에서는 데이터를 스트리밍 소스로 처리합니다. 이 실습 과제는 데이터를 읽고 파싱한 다음, 나중에 데이터 분석에 활용할 수 있도록 서버리스 데이터 웨어하우스인 BigQuery에 데이터를 쓰는 방식으로 진행됩니다.

적절한 실습 열기

  • 아직 IDE 환경에서 새 터미널을 만들지 않았다면 새 터미널을 만들고 다음 명령어를 복사해 붙여넣습니다.
# Change directory into the lab cd 1_Basic_ETL/labs export BASE_DIR=$(pwd)

pom.xml 파일 수정

실제 파이프라인 코드를 수정하기 전에 필요한 종속 항목을 추가해야 합니다.

  1. 1_Basic_ETL/labs에 있는 pom.xml 파일의 dependencies 태그 안에 다음 종속 항목을 추가합니다.
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> <version>${beam.version}</version> </dependency>
  1. 설치할 Beam 버전을 나타내는 <beam.version> 태그가 이미 pom.xml에 추가되어 있습니다.

  2. 파일을 저장합니다.

  3. 마지막으로 파이프라인에서 사용하기 위해 종속 항목을 다운로드합니다.

# Download dependencies listed in pom.xml mvn clean dependency:resolve

첫 번째 파이프라인 작성

1시간

작업 1. 합성 데이터 생성

  1. 셸에서 다음 명령어를 실행하여 합성 웹 서버 로그 생성을 위한 스크립트가 포함된 저장소를 클론합니다.
# Change to the directory containing the relevant code cd $BASE_DIR/../.. # Create GCS buckets and BQ dataset source create_batch_sinks.sh # Run a script to generate a batch of web server log events bash generate_batch_events.sh # Examine some sample events head events.json

이 스크립트를 실행하면 다음과 유사한 줄을 포함하는 events.json 파일이 생성됩니다.

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

그런 다음 이 파일은 gs://<YOUR-PROJECT-ID>/events.json 경로의 Google Cloud Storage 버킷에 자동으로 복사됩니다.

  1. Google Cloud Storage로 이동하여 스토리지 버킷에 events.json이란 파일이 있는지 확인합니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. 합성 데이터 생성

작업 2. 소스에서 데이터 읽기

이 섹션이나 이후 섹션에서 이해되지 않는 부분이 있는 경우 해결 방법(영문)을 참고하세요.

  1. IDE에서 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline 경로의 MyPipeline.java를 엽니다. 다음 패키지를 가져왔는지 확인합니다.
import com.google.gson.Gson; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  1. 아래로 스크롤하여 run() 메서드를 찾습니다. 이 메서드는 현재 아무 작업도 하지 않는 파이프라인을 포함하고 있습니다. PipelineOptions 객체를 통해 Pipeline 객체가 생성된 방식과 메서드의 마지막 줄에서 파이프라인을 실행하는 방식을 확인하세요.
Pipeline pipeline = Pipeline.create(options); // Do stuff pipeline.run();

Apache Beam 파이프라인의 모든 데이터는 PCollection에 상주합니다. 파이프라인의 초기 PCollection을 생성하려면 파이프라인 객체에 루트 변환을 적용하세요. 루트 변환은 외부 데이터 소스 또는 직접 지정한 로컬 데이터에서 PCollection을 만듭니다.

Beam SDK에는 Read와 Create라는 두 가지 종류의 루트 변환이 있습니다. Read 변환은 텍스트 파일이나 데이터베이스 테이블과 같은 외부 소스에서 데이터를 읽습니다. Create 변환은 메모리 내 java.util.Collection에서 PCollection을 만들며 특히 테스트에 유용합니다.

다음 예시 코드는 TextIO.Read 루트 변환을 적용하여 텍스트 파일에서 데이터를 읽는 방법을 보여줍니다. 이 변환은 Pipeline 객체 p에 적용되며 PCollection<String> 형식의 파이프라인 데이터 세트를 반환합니다. 'ReadLines'는 해당 변환의 이름으로, 나중에 더 규모가 큰 파이프라인에서 작업을 수행할 때 활용됩니다.

PCollection<String> lines = pipeline.apply("ReadLines", TextIO.read().from("gs://path/to/input.txt"));
  1. run() 메서드 내에서 'input'이라는 문자열 상수를 만들고 값을 gs://<YOUR-PROJECT-ID>/events.json으로 설정합니다. 향후 실습에서 명령줄 파라미터를 사용하여 이 정보를 전달할 예정입니다.

  2. TextIO.read() 변환을 호출하여 events.json의 모든 이벤트 문자열로 구성된 PCollection을 생성합니다.

  3. 또한 적절한 import 문을 MyPipeline.java 상단에 추가합니다. 이 경우에는 import org.apache.beam.sdk.values.PCollection;이 됩니다.

작업 3. 파이프라인 실행을 통한 작동 여부 확인

  • 터미널로 돌아가서 $BASE_DIR 폴더로 이동한 후 mvn compile exec:java 명령어를 실행합니다.
cd $BASE_DIR # Set up environment variables export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} 참고: 빌드가 실패하면 mvn clean install 명령어를 실행하세요.

이 시점의 파이프라인은 실제로는 아무 작업도 수행하지 않고 데이터를 읽기만 합니다. 하지만 이 파이프라인 실행은 더 높은 비용이 드는 계산을 수행하기 전에 로컬 머신에서 실행되는 DirectRunner를 사용하여 로컬에서 저렴하게 파이프라인을 검증할 수 있다는 점에서 유용합니다. Dataflow를 사용하여 파이프라인을 실행하려면 runnerDataflowRunner로 변경하면 됩니다.

작업 4. 변환 추가

막히는 부분이 있다면 해결 방법(영문)을 참고하세요.

변환은 데이터를 변경하는 작업입니다. Apache Beam에서는 PTransform 클래스를 통해 변환이 이루어집니다. 런타임 시 이러한 작업은 여러 독립적인 작업자를 통해 수행되며 모든 PTransform의 입력과 출력은 PCollection으로 이루어집니다. 인지하지 못했을 수도 있지만 사실 Google Cloud Storage에서 데이터를 읽을 때 이미 PTransform을 사용했습니다. 직접 변수에 할당하지 않았더라도 이 과정에서 PCollection 문자열이 생성되었습니다.

Beam은 PCollection에 대해 일반적인 apply 메서드를 사용하므로 변환을 순차적으로 체인화할 수 있습니다. 예를 들어 다음과 같이 변환을 체인화하여 다음과 같은 순차적인 파이프라인을 만들 수 있습니다.

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform]);

이 작업에서는 새로운 종류의 변환인 ParDo를 사용합니다. ParDo는 일반 병렬 처리를 위한 Beam 변환입니다. ParDo 처리 패러다임은 Map/Shuffle/Reduce 스타일 알고리즘의 'Map' 단계와 유사합니다. ParDo 변환은 입력 PCollection의 각 요소를 고려하고, 해당 요소에 대해 일부 처리 함수(사용자 코드)를 수행하며, 0개, 1개 또는 여러 개의 요소를 출력 PCollection에 내보냅니다.

ParDo는 다음과 같은 다양한 데이터 처리 작업에 유용합니다.

  • 데이터 세트 필터링. ParDo를 사용하여 PCollection의 각 요소를 고려하고, 해당 요소를 새 컬렉션에 출력할 것인지 아니면 삭제할 것인지 결정할 수 있습니다.
  • 데이터 세트의 각 요소 형식 지정 또는 유형 변환. 입력 PCollection에 원하는 것과 다른 유형이나 형식의 요소가 포함된 경우, ParDo를 사용하여 각 요소에서 변환을 수행하고 결과를 새 PCollection으로 출력할 수 있습니다.
  • 데이터 세트에서 각 요소의 부분 추출. 예를 들어 여러 필드가 있는 레코드의 PCollection이 있는 경우 ParDo를 사용하여 고려하려는 필드만 새 PCollection으로 파싱할 수 있습니다.
  • 데이터 세트에서 각 요소의 계산 수행. ParDo를 사용하여 PCollection의 모든 요소 또는 특정 요소에 대해 단순하거나 복잡한 계산을 수행하고 결과를 새 PCollection으로 출력할 수 있습니다.
  1. 이 작업을 완료하려면 단일 이벤트를 나타내는 JSON 문자열을 읽고, Gson을 사용하여 파싱하고, Gson이 반환한 커스텀 객체를 출력하는 ParDo 변환을 작성합니다.

ParDo 함수는 인라인 방식이나 정적 클래스 방식으로 구현할 수 있습니다. 인라인 ParDo 함수 작성 방법은 다음과 같습니다.

pCollection.apply(ParDo.of(new DoFn<T1, T2>() { @ProcessElement public void processElement(@Element T1 i, OutputReceiver<T2> r) { // Do something r.output(0); } }));

또는 다음과 같이 DoFn을 확장하는 정적 클래스로 구현할 수도 있습니다. 이렇게 하면 테스트 프레임워크와의 통합이 더 쉬워집니다.

static class MyDoFn extends DoFn<T1, T2> { @ProcessElement public void processElement(@Element T1 json, OutputReceiver<T2> r) throws Exception { // Do something r.output(0); } }

그런 다음 파이프라인 자체 내에서 다음과 같이 실행합니다.

[Initial Input PCollection].apply(ParDo.of(new MyDoFn());
  1. Gson을 사용하려면 MyPipeline 내부에 내부 클래스를 만들어야 합니다. Beam 스키마를 활용하려면 @DefaultSchema 주석을 추가하세요. 이에 대한 자세한 내용은 후반부에 나옵니다. 다음은 Gson을 사용하는 방법의 예입니다.
// Elsewhere @DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; String field2; } // Within the DoFn Gson gson = new Gson(); MyClass myClass = gson.fromJson(jsonString, MyClass.class);
  1. 내부 클래스 이름을 CommonLog로 지정합니다. 이 내부 클래스를 올바른 상태 변수로 구성하려면 위의 JSON 샘플을 참조하세요. 클래스에는 수신되는 JSON에 포함된 모든 키마다 하나의 상태 변수가 있어야 하며 이 변수는 유형과 이름이 값 및 키와 일치해야 합니다.

  2. 지금은 'timestamp'에는 String을, 'INTEGER'에는 Long(BigQuery Integer는 INT64)을, 'FLOAT'에는 Double(BigQuery FLOAT는 FLOAT64)을 사용하고 다음 BigQuery 스키마와 일치하도록 구성해야 합니다.

user_id, timestamp, num_bytes 등의 로그 정보가 포함된 CommonLog 스키마 탭 페이지

막히는 부분이 있다면 해결 방법(영문)을 참고하세요.

작업 5. 싱크에 쓰기

이 단계의 파이프라인은 Google Cloud Storage에서 파일을 읽고 각 줄을 파싱한 다음 각 요소마다 CommonLog를 내보냅니다. 다음 단계는 CommonLog 객체를 BigQuery 테이블에 작성하는 것입니다.

필요한 경우 파이프라인에 BigQuery 테이블을 만들도록 지시할 수 있지만 데이터 세트는 미리 생성되어 있어야 합니다. 이 작업은 generate_batch_events.sh 스크립트에서 이미 완료되었습니다.

다음 명령어로 데이터 세트를 확인할 수 있습니다.

# Examine dataset bq ls # No tables yet bq ls logs

파이프라인의 최종 PCollection을 출력하려면 해당 PCollection에 Write 변환을 적용합니다. Write 변환은 PCollection의 요소를 데이터베이스 테이블과 같은 외부 데이터 싱크에 출력할 수 있습니다. 일반적으로는 파이프라인의 마지막에 데이터를 출력하지만 Write를 사용하면 파이프라인의 어느 시점에나 PCollection을 출력할 수 있습니다.

다음 예시 코드는 TextIO.Write 변환을 적용하여 PCollection 문자열을 텍스트 파일에 쓰는 방식을 보여줍니다.

PCollection<String> pCollection = ...; pCollection.apply("WriteMyFile", TextIO.write().to("gs://path/to/output"));
  1. 이 경우 TextIO.write() 대신 BigQueryIO.write()를 사용합니다.

이 함수에는 쓰기 대상 테이블, 테이블 스키마 등 여러 항목을 지정해야 합니다. 필요에 따라 기존 테이블에 추가할지, 기존 테이블을 다시 만들지(초기 파이프라인 반복 단계에 유용함), 테이블이 없는 경우 테이블을 생성할지 여부를 지정할 수 있습니다. 이 변환은 기본적으로 테이블이 없으면 생성하지만 기존 테이블이 비어 있지 않으면 쓰기를 허용하지 않습니다.

Beam 스키마가 SDK에 추가된 이후로는 .useBeamSchema()를 사용하고 입력 유형을 표시하여 변환에 전달되는 객체에서 테이블 스키마를 추론하도록 변환에 지시할 수 있습니다. 또는 .withSchema()를 사용하여 스키마를 명시적으로 제공할 수 있지만 이 경우에는 전달할 BigQuery TableSchema 객체를 빌드해야 합니다. CommonLog 클래스에 @DefaultSchema(JavaFieldSchema.class)를 주석으로 지정했기 때문에 각 변환은 객체의 필드 이름과 유형을 모두 인식하며 여기에는 BigQueryIO.write()가 포함됩니다.

  1. BigQueryIO의 'Writing' 섹션에서 다양한 대안을 살펴보세요. 이 경우에는 CommonLog 객체에 주석을 지정했으므로 다음과 같이 .useBeamSchema()를 활용하고 <YOUR-PROJECT-ID>:logs.logs 테이블을 타겟으로 설정합니다.
pCollection.apply(BigQueryIO.<MyObject>write() .to("my-project:output_dataset.output_table") .useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) ); 참고: WRITE_TRUNCATE는 매번 테이블을 삭제하고 다시 만듭니다. 이는 초기 파이프라인 반복 단계, 특히 스키마를 반복 수정하는 과정에서는 유용하지만 프로덕션 환경에서는 의도치 않은 문제를 쉽게 유발할 수 있습니다. WRITE_APPEND 또는 WRITE_EMPTY가 더 안전합니다.

Beam 스키마에서 사용할 수 있는 모든 유형의 집합은 Schema. FieldType 문서를 참조하세요. 표준 SQL에서 사용 가능한 모든 BigQuery 데이터 유형은 setType 문서에서 확인할 수 있으며 궁금한 경우 Beam 스키마가 BigQuery로 변환되는 방식도 살펴보세요.

작업 6. 파이프라인 실행

터미널로 돌아가서 RUNNER 환경 변수 값을 DataflowRunner로 변경하고 전반부와 동일한 명령어를 사용하여 파이프라인을 실행합니다. 실행이 시작되면 Dataflow 제품 페이지로 이동해 파이프라인의 배열을 확인합니다. 변환에 이름을 지정했다면 해당 이름이 표시되고 각 변환을 클릭하면 초당 처리되는 요소 수를 실시간으로 확인할 수 있습니다.

전체적인 형태는 Read 변환으로 시작하여 Write 변환으로 끝나는 단일 경로여야 합니다. 파이프라인이 실행되면 서비스가 파이프라인의 요구사항을 파악하여 작업자를 자동으로 추가하고, 부하가 줄면 다시 제거합니다. 이를 확인하려면 Compute Engine으로 이동하면 Dataflow 서비스가 생성한 가상 머신을 볼 수 있습니다.

참고: 파이프라인은 성공적으로 빌드되지만 Dataflow 서비스의 코드 또는 잘못된 구성으로 인해 많은 오류가 발생하는 경우, runner를 DirectRunner로 다시 설정하여 로컬에서 실행해 더 빠른 피드백을 받을 수 있습니다. 이 경우에는 데이터 세트 규모가 적고 DirectRunner에서 지원하지 않는 특성을 사용하지 않기 때문에 이 방식이 적합합니다. # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION='{{{project_0.default_region|Region}}}' export PIPELINE_FOLDER=gs://${PROJECT_ID} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER}"
  • 파이프라인 실행이 완료되면 BigQuery 브라우저 창으로 돌아가 테이블을 쿼리하세요.

코드가 예상대로 작동하지 않고 어떻게 해야 할지 잘 모르겠다면 해결 방법을 참고하세요.

참고: Dataflow > 작업으로 이동하여 작업 상태가 성공으로 변경될 때까지 기다립니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. 파이프라인 실행

실습 2부. 기본 ETL 파라미터화

약 20분

데이터 엔지니어의 작업은 대부분 반복 작업처럼 예측 가능한 경우도 많고 다른 작업과 유사한 경우도 많습니다. 하지만 파이프라인 실행 과정은 엔지니어링 전문 지식을 요구합니다. 지금까지 완료한 단계를 정리하면 다음과 같습니다.

  1. 개발 환경을 생성하고 파이프라인을 개발했습니다. 그 결과 Apache Beam SDK와 기타 종속 항목을 포함한 환경이 생성되었습니다.
  2. 개발 환경에서 파이프라인을 실행했습니다. Apache Beam SDK가 Cloud Storage에서 파일을 스테이징하고, 작업 요청 파일을 만들고, 파일을 Dataflow 서비스에 제출했습니다.

비기술 사용자에게는 어려운 개발 환경 설정 작업 없이 API 호출이나 간단한 방법으로 작업을 실행할 수 있다면 훨씬 더 좋을 것입니다. 이렇게 하면 파이프라인을 손쉽게 반복 실행할 수도 있습니다.

Dataflow 템플릿은 이 문제를 해결하고자 파이프라인이 컴파일될 때 생성되는 표현을 파라미터화 가능한 형태로 변경합니다. 아쉽게도 이 방법은 명령줄 파라미터를 노출하는 것처럼 간단하지는 않습니다. 이 부분은 이후 실습을 통해 알아볼 예정입니다. Dataflow 템플릿을 사용하면 위의 워크플로가 다음과 같이 바뀝니다.

  1. 개발자는 개발 환경을 만들고 파이프라인을 개발합니다. 이 환경에는 Apache Beam SDK와 기타 종속 항목이 포함됩니다.
  2. 개발자는 파이프라인을 실행하고 템플릿을 만듭니다. Apache Beam SDK가 Cloud Storage에서 파일을 스테이징하고, 템플릿 파일을 만들고(작업 요청과 유사), 템플릿 파일을 Cloud Storage에 저장합니다.
  3. 개발자가 아닌 사용자 또는 Airflow와 같은 다른 워크플로 도구에서도 Google Cloud 콘솔, gcloud 명령줄 도구 또는 REST API를 통해 손쉽게 작업을 실행하여 Dataflow 서비스에 템플릿 파일 실행 요청을 제출할 수 있습니다.

이 실습에서는 Google에서 만든 Dataflow 템플릿 중 하나를 사용하여 1부에서 빌드한 파이프라인과 동일한 작업을 수행하는 연습을 진행합니다.

작업 1. JSON 스키마 파일 만들기

.usedBeamSchema()를 활용했기 때문에 BigQueryIO.writeTableRows() 변환에 TableSchema 객체를 전달할 필요는 없었지만, 이 예시에서는 스키마를 나타내는 JSON 파일을 Dataflow 템플릿에 전달해야 합니다.

  1. 터미널을 열고 기본 디렉터리로 다시 이동한 뒤 다음 명령어를 실행하여 기존 logs.logs 테이블에서 스키마를 가져옵니다.
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. 이제 이 출력을 파일에 캡처하고 GCS에 업로드합니다. 추가 sed 명령어는 Dataflow가 예상하는 전체 JSON 객체를 빌드하기 위한 것입니다.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp schema.json gs://${PROJECT_ID}/

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. JSON 스키마 파일 만들기

작업 2. JavaScript 사용자 정의 함수 작성

Cloud Storage에서 BigQuery로 데이터를 전달하는 Dataflow 템플릿을 사용하려면 원시 텍스트를 유효한 JSON으로 변환하는 JavaScript 함수가 필요합니다. 이 경우 텍스트의 각 줄이 유효한 JSON이므로 함수는 간단한 편입니다.

이 작업을 완료하려면 IDE를 사용하여 아래 내용이 포함된 .js 파일을 만든 다음 Google Cloud Storage에 복사하세요.

  1. 아래 함수를 1_Basic_ETL/ 폴더에 있는 자체 transform.js 파일에 복사합니다.
function transform(line) { return line; }
  1. 그런 다음 아래 명령어를 실행하여 파일을 Google Cloud Storage에 복사합니다.
cd 1_Basic_ETL/ export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp *.js gs://${PROJECT_ID}/

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. JavaScript 사용자 정의 함수 작성

작업 3. Dataflow 템플릿 실행

  1. Dataflow 웹 UI로 이동합니다.

  2. 템플릿에서 작업 만들기를 클릭합니다.

  3. Dataflow 작업의 이름을 입력합니다.

  4. 리전 엔드포인트 리전을 선택합니다.

  5. Dataflow 템플릿 아래에서 스트리밍 섹션이 아닌 대량 데이터 처리(일괄 작업) 섹션의 Cloud Storage의 텍스트 파일을 BigQuery로 이동 템플릿을 선택합니다.

  6. Cloud Storage 입력 경로gs://<YOUR-PROJECT-ID>/events.json 형식으로 events.json의 경로를 입력합니다.

  7. BigQuery 스키마 파일의 Cloud Storage 위치gs://<YOUR-PROJECT-ID>/schema.json 형식으로 schema.json 파일의 경로를 작성합니다.

  8. BigQuery 출력 테이블<myprojectid>:logs.logs를 입력합니다.

  9. 임시 BigQuery 디렉터리에서는 동일한 버킷 내에 새 폴더를 입력합니다. 작업에서 새 폴더가 자동으로 생성됩니다.

  10. 임시 위치에서 동일한 버킷 내에 두 번째 새 폴더를 입력합니다.

  11. 암호화Google 관리 키로 둡니다.

  12. 선택적 파라미터를 클릭하여 엽니다.

  13. Cloud Storage의 JavaScript UDF 경로에서 gs://<YOUR-PROJECT-ID>/transform.js 형식으로 .js 경로를 입력합니다.

  14. JavaScript UDF 이름transform을 입력합니다.

  15. 작업 실행 버튼을 클릭합니다.

작업이 실행되는 동안 Dataflow 웹 UI 내에서 작업을 살펴볼 수 있습니다.

참고: Dataflow > 작업으로 이동하여 작업 상태가 성공으로 변경될 때까지 기다립니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인하세요. Dataflow 템플릿 실행

작업 4. Dataflow 템플릿 코드 살펴보기

  1. 방금 사용한 Dataflow 템플릿의 코드를 다시 생각해 보세요.

  2. 아래로 스크롤하여 기본 메서드를 찾습니다. 이 코드는 앞서 작성한 파이프라인과 유사합니다.

  • 코드는 PipelineOptions 객체를 사용하여 생성된 Pipeline 객체로 시작합니다.
  • TextIO.read() 변환으로 시작하는 여러 PTransform 체인으로 구성됩니다.
  • Read 변환 후의 PTransform은 약간 다릅니다. 예를 들어 소스 형식이 BigQuery 테이블 형식과 잘 맞지 않는 경우 JavaScript를 사용하여 입력 문자열을 변환할 수 있습니다. 이 특성의 사용 방법에 관한 문서는 이 페이지를 참고하세요.
  • JavaScript UDF 이후의 PTransform은 라이브러리 함수를 사용하여 JSON을 TableRow로 변환합니다. 해당 코드는 여기에서 살펴볼 수 있습니다.
  • write PTransform은 그래프 컴파일 시점에 알려진 스키마를 활용하는 대신 런타임에만 알려지는 파라미터를 수락하도록 코드가 설계되었기 때문에 형태가 다소 다릅니다. 이를 가능하게 하는 것이 바로 NestedValueProvider 클래스입니다. 또한 .useBeamSchema()를 사용하여 Beam 스키마에서 추론한 스키마가 아닌 명시적으로 설정된 스키마를 사용하고 있습니다.
  1. 다음 실습에서는 단순한 PTransform 체인이 아닌 파이프라인을 만드는 방법과 빌드한 파이프라인을 커스텀 Dataflow 템플릿으로 변환하는 방법을 다룰 예정이니 꼭 확인해 보세요.

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하려면 시크릿 모드 또는 시크릿 브라우저 창을 사용하세요. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.