실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Dataflow를 사용한 고급 스트리밍 분석 파이프라인(Java)

실습 1시간 30분 universal_currency_alt 크레딧 7개 show_chart 고급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서 학습할 내용은 다음과 같습니다.

  • 지연된 데이터를 처리합니다.
  • 잘못된 형식의 데이터를 다음과 같이 처리합니다.
    1. 더 모듈화된 코드를 위한 복합 변환 작성
    2. 다양한 유형의 여러 출력을 내보내는 변환 작성
    3. 잘못된 형식의 데이터를 수집하여 검사 가능 위치에 작성

이전 실습의 마지막 부분에서는 실시간 파이프라인이 해결해야 할 과제 중 하나인 지연(이벤트가 발생한 시점과 처리되는 시점 사이의 간격)을 소개했습니다. 이 실습에서는 파이프라인 생성자가 이러한 지연을 공식적인 방식으로 처리하도록 파이프라인을 구성할 수 있는 Apache Beam 개념을 소개합니다.

하지만 스트리밍 컨텍스트에서 파이프라인이 직면하는 문제가 지연 시간만 있는 건 아닙니다. 시스템 외부에서 입력이 유입될 때마다 데이터가 어떤 형식으로든 잘못되어 있을 가능성은 항상 존재합니다. 이 실습에서는 이러한 입력을 처리하는 데 사용할 수 있는 기법도 소개합니다.

이 실습의 최종 파이프라인은 아래 그림과 유사하며 브랜치가 포함되어 있습니다.

최종 파이프라인 아키텍처 다이어그램

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Qwiklabs에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

프로젝트 권한 확인

Google Cloud에서 작업을 시작하기 전에 프로젝트가 Identity and Access Management(IAM) 내에서 올바른 권한을 보유하고 있는지 확인해야 합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴 아이콘)에서 IAM 및 관리자 > IAM을 선택합니다.

  2. 기본 컴퓨팅 서비스 계정 {project-number}-compute@developer.gserviceaccount.com이 있고 editor 역할이 할당되어 있는지 확인하세요. 계정 프리픽스는 프로젝트 번호이며, 이 번호는 탐색 메뉴 > Cloud 개요 > 대시보드에서 확인할 수 있습니다.

Compute Engine 기본 서비스 계정 이름과 편집자 상태가 강조 표시된 권한 탭 페이지

참고: 계정이 IAM에 없거나 editor 역할이 없는 경우 다음 단계에 따라 필요한 역할을 할당합니다.
  1. Google Cloud 콘솔의 탐색 메뉴에서 Cloud 개요 > 대시보드를 클릭합니다.
  2. 프로젝트 번호(예: 729328892908)를 복사합니다.
  3. 탐색 메뉴에서 IAM 및 관리자 > IAM을 선택합니다.
  4. 역할 테이블 상단에서 주 구성원별로 보기 아래에 있는 액세스 권한 부여를 클릭합니다.
  5. 새 주 구성원 필드에 다음을 입력합니다.
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number}는 프로젝트 번호로 바꿉니다.
  2. 역할 필드에서 프로젝트(또는 기본) > 편집자를 선택합니다.
  3. 저장을 클릭합니다.

이 실습에서는 Google Compute Engine에서 호스팅되는 Theia Web IDE를 주로 사용합니다. 실습 저장소가 사전에 클론되어 있습니다. Java 언어 서버가 지원되며, Cloud Shell처럼 gcloud 명령줄 도구를 통해 Google Cloud API에 프로그래매틱 방식으로 액세스할 수 있는 터미널도 제공됩니다.

  1. Theia IDE에 액세스하려면 Google Skills에 표시된 링크를 복사하여 새 탭에 붙여넣습니다.
참고: 환경이 완전히 프로비저닝되려면 URL이 표시된 후에도 3~5분 정도 기다려야 할 수 있습니다. 환경이 완전히 프로비저닝되지 않으면 브라우저에 오류가 표시됩니다.

ide_url이 표시된 사용자 인증 정보 창

실습 저장소가 환경에 클론되었습니다. 각 실습은 사용자가 완성해야 하는 코드가 포함된 labs 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 폴더로 구분되어 있습니다.

  1. File Explorer 버튼을 클릭하여 다음을 확인합니다.

실습 폴더가 강조 표시된 확장된 파일 탐색기 메뉴

Cloud Shell을 사용할 때처럼 이 환경에서 여러 터미널을 만들 수도 있습니다.

터미널 메뉴에서 강조 표시된 새 터미널 옵션

터미널에서 gcloud auth list를 실행하면, 제공된 서비스 계정으로 로그인되어 있음을 확인할 수 있습니다. 이 서비스 계정은 실습 사용자 계정과 동일한 권한을 가지고 있습니다.

gcloud auth list 명령어를 표시하는 터미널

환경이 작동하지 않는다면, GCE 콘솔에서 IDE를 호스팅하는 VM을 다음과 같이 재설정할 수 있습니다.

VM 인스턴스 페이지에서 강조 표시된 재설정 버튼과 VM 인스턴스 이름

실습 1부. 지연 데이터 처리하기

이전 실습에서는 다음과 같은 코드를 사용하여 이벤트 시간을 기준으로 요소를 고정 너비의 윈도우로 분할하는 코드를 작성했습니다.

commonLogs .apply("WindowCommonLogs", Window.into( FixedWindows.of( Duration.standardMinutes( options.getWindowDuration())))) .apply("CountEventsPerWindow", Combine.globally( Count.<CommonLog>combineFn()).withoutDefaults());

그러나 직전 비SQL 실습의 끝부분에 나온 것처럼 데이터 스트림에 지연이 발생하는 경우가 많습니다. 이벤트 시간(처리 시간이 아님)을 사용하여 윈도잉할 때 지연은 불확실성을 야기하기 때문에 문제가 됩니다. 이벤트 시간의 특정 시점에 해당하는 모든 이벤트가 실제로 도착했는지 여부를 알 수 없기 때문입니다.

직접 작성한 파이프라인이 결과를 출력하려면 이와 관련하여 결정을 내려야하는 상황이었습니다. 이를 위해 사용된 개념이 바로 워터마크입니다. 워터마크는 시스템이 휴리스틱에 기반하여 이벤트 시간의 특정 시점까지의 모든 데이터가 파이프라인에 도착했다고 예상하는 시점을 나타냅니다. 워터마크가 윈도우의 끝을 넘어가면, 해당 윈도우의 타임스탬프를 가진 요소가 나중에 도착하더라도 지연 데이터로 간주되어 단순히 삭제됩니다. 따라서 기본적인 윈도잉 동작은 시스템이 모든 데이터가 도착했다고 확신하는 시점에 단 한 번, 완전하다고 판단되는 결과를 내보내는 것입니다.

Apache Beam은 워터마크가 무엇인지 추측하기 위해 여러 휴리스틱 방식을 사용하지만 어디까지나 추정일 뿐입니다. 무엇보다도 이러한 휴리스틱은 범용 목적으로 설계되어 모든 사용 사례에 적합하지는 않습니다. 따라서 파이프라인 설계자는 범용 휴리스틱 방식을 사용하는 대신 다음과 같은 질문을 신중하게 고려하여 어떤 절충안이 적절한지 판단해야 합니다.

  • 완전성: 결과를 계산하기 전에 모든 데이터를 확보하는 것이 얼마나 중요한가요?
  • 지연 시간: 데이터를 얼마나 대기할 수 있나요? 모든 데이터가 도착할 때까지 기다리나요, 아니면 도착하는 대로 처리하나요?
  • 비용: 지연 시간을 줄이기 위해 얼마만큼의 컴퓨팅 성능과 비용을 감수할 의향이 있나요?

이 질문들에 대한 답을 바탕으로, Apache Beam의 형식적 개념을 사용하여 적절한 절충안을 반영한 코드를 작성할 수 있습니다.

허용 지연 시간

허용 지연 시간은 윈도우가 상태를 유지하는 기간을 제어하며, 워터마크가 허용된 지연 시간의 끝에 도달하면 모든 상태가 삭제됩니다. 모든 영구 상태를 끝까지 유지할 수 있다면 좋겠지만, 무한 데이터 소스를 처리하는 실제 상황에서는 특정 윈도우의 상태를 무기한 유지하는 것이 실용적이지 않은 경우가 많습니다. 결국 디스크 공간이 부족해질 수 있기 때문입니다.

따라서 실제로 데이터를 순서 없이 처리하는 시스템에서는 처리 중인 윈도우의 수명을 제한하는 방법을 제공해야 합니다. 이 작업은 시스템 내에서 허용되는 지연 시간의 한계를 정의하는 방식으로 깔끔하고 간결하게 수행할 수 있습니다. 즉, 시스템이 특정 레코드를 처리할 수 있는 최대 지연 시간(워터마크 대비)을 설정하고 이 한계를 초과하여 도착한 데이터는 단순히 삭제하는 방식입니다. 개별 데이터의 최대 지연 한도를 설정하면 윈도우의 상태를 얼마나 오래 유지해야 하는지도 정확하게 결정할 수 있습니다. 즉, 워터마크가 윈도우 끝에 도달하여 지연 한도 범위를 초과할 때까지만 상태를 유지하면 됩니다.

작업 1. 환경 준비

이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 실습 환경을 열고 이전과 동일한 방법으로 데이터를 생성합니다.

적절한 실습 열기

  1. 아직 IDE 환경에서 새 터미널을 만들지 않았다면 새 터미널을 만들고 다음 명령어를 복사해 붙여넣습니다.
# Change directory into the lab cd 7_Advanced_Streaming_Analytics/labs # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. 다음과 같이 데이터 환경을 설정합니다.
# Create GCS buckets, BQ dataset, and Pubsub Topic cd $BASE_DIR/../.. source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 환경 준비

작업 2. 허용되는 지연 시간 설정

  1. IDE에서 7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline 경로의 StreamingMinuteTrafficPipeline.java를 엽니다.

Apache Beam에서는 아래 예시와 같이 withAllowedLateness() 메서드를 사용하여 허용 지연 시간을 설정합니다.

PCollection<String> items = ...; PCollection<String> windowed_items = items.apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(1)));
  1. 이 작업을 완료하려면 윈도잉 변환을 검토하고 .withAllowedLateness() 호출을 추가합니다. 이때 적절한 명령줄 파라미터에서 구성된 유효한 Duration을 전달해야 합니다. 허용 지연 시간으로 적절한 값을 판단하고 명령줄을 업데이트하여 올바른 단위를 반영하세요.

트리거

파이프라인 설계자는 재량에 따라 임시 결과를 내보낼 시점을 결정할 수도 있습니다. 예를 들어 윈도우 끝에 해당하는 워터마크에는 아직 도달하지 않았지만 예상 데이터의 75%가 이미 도착했다고 가정해 보겠습니다. 이 정도 샘플은 충분히 대표성이 있으므로 최종 사용자에게 표시할 가치가 있습니다.

트리거(Trigger)는 처리 시간 중 어느 시점에서 결과가 구체화되는지 결정합니다. 윈도우의 각 특정 출력을 윈도우의 창이라고 합니다. 트리거는 조건이 충족될 때 창을 실행합니다. Apache Beam의 트리거 조건에는 워터마크 진행, 처리 시간 진행률(실제로 도착한 데이터 양과 관계없이 균일하게 진행됨), 요소 개수(예: 일정량의 새 데이터가 도착한 경우), 데이터 종속 트리거(파일 끝에 도달한 경우) 등이 포함됩니다.

트리거의 조건에 따라 하나의 윈도우에서 창이 여러 번 실행될 수 있으므로 결과를 누적하는 방법도 지정해야 합니다. Apache Beam은 현재 두 가지 누적 모드를 지원합니다. 하나는 결과를 함께 누적하는 모드이고 다른 하나는 마지막 창이 실행된 이후 새로 추가된 결과의 부분만 반환하는 모드입니다.

작업 3. 트리거 설정

Window 변환을 사용해 PCollection에 대한 윈도우 함수를 설정할 때 트리거도 지정할 수 있습니다.

다음과 같이 Window.into() 변환의 결과에 .triggering() 메소드를 호출해 PCollection에 해당하는 트리거를 설정할 수 있습니다. Window.triggering()은 trigger를 인수로 허용합니다. Apache Beam은 다음과 같은 여러 트리거를 제공합니다.

  • AfterWatermark: 워터마크가 윈도우의 끝 또는 창의 첫 번째 요소 도착 시점에 결정된 타임스탬프를 통과하면 실행됩니다.
  • AfterProcessingTime: 일정 처리 시간이 경과한 후(일반적으로 창의 첫 번째 요소 이후) 실행됩니다.
  • AfterPane: 현재 창에 할당된 요소의 수와 같이 현재 창 내 요소의 속성을 실행합니다.

이 코드 샘플은 해당 윈도우의 첫 번째 요소가 처리된 후 1분 뒤에 결과를 내보내는 PCollection에 대한 시간 기반 트리거를 설정합니다. 코드 샘플의 마지막 줄인 .discardingFiredPanes()는 윈도우의 누적 모드를 설정합니다.

PCollection<String> pc = ...; pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1)) .discardingFiredPanes());
  • 이 작업을 완료하려면 윈도잉 변환 내부에 Window.triggering()에 대한 호출을 추가하고 유효한 Trigger를 전달합니다. 트리거를 설계할 때는 데이터가 1분 간격으로 윈도잉되고 데이터가 늦게 도착할 수 있는 이 사용 사례를 염두에 두세요.

트리거의 예시를 보려면 해결 방법(영문)을 살펴보세요.

실습 2부. 잘못된 형식의 데이터 처리

트리거(Trigger)를 어떻게 설정하느냐에 따라 지금 파이프라인을 실행해 이전 실습의 파이프라인과 비교하면 새 파이프라인이 더 이른 시점에 결과를 제공하는 것을 확인할 수 있습니다. 휴리스틱이 스트리밍 동작을 제대로 예측하지 못했고 허용 지연 시간이 더 적절하게 설정되었다면 결과의 정확도가 더 높아질 수 있습니다.

하지만 현재 파이프라인은 지연 데이터에는 잘 대응하지만 잘못된 형식의 데이터 입력에는 여전히 취약합니다. 파이프라인을 실행하고 CommonLog로 파싱할 수 있는 올바른 형식의 JSON 문자열이 아닌 메시지를 게시하면 파이프라인에서 오류가 발생합니다. Cloud Logging과 같은 도구를 사용하면 이러한 오류를 쉽게 읽을 수 있지만, 더 잘 설계된 파이프라인은 나중에 검사할 수 있도록 이러한 오류를 사전에 정의된 위치에 저장합니다.

이 섹션에서는 파이프라인을 더욱 모듈화하고 견고하게 만드는 구성요소를 추가합니다.

작업 1. 잘못된 형식의 데이터 수집

잘못된 형식의 데이터를 더 잘 처리하는 파이프라인을 만들려면 이러한 데이터를 필터링하고 브랜칭하여 다르게 처리하는 방법이 필요합니다. 파이프라인에 브랜치를 도입하는 한 가지 방법은 이미 살펴본 바와 같이 하나의 PCollection을 여러 변환의 입력으로 사용하는 것입니다.

이 방식의 브랜칭은 강력하지만 이 전략이 비효율적인 일부 사용 사례도 있습니다. 예를 들어 동일한 PCollection에서 두 가지 다른 하위 집합을 만들고자 한다고 가정해 보겠습니다. 이 경우 여러 변환 방법을 사용하면서 각 하위 집합별로 필터 변환을 하나씩 생성하고 이를 모두 원본 PCollection에 적용해야 하기 때문에 각 요소가 두 번씩 처리됩니다.

브랜치 파이프라인을 생성하는 또 다른 방법은 입력 PCollection을 한 번 처리하는 동안 단일 변환이 여러 출력을 생성하도록 만드는 것입니다. 이 작업에서는 여러 출력을 생성하는 변환을 작성합니다. 첫 번째 출력은 올바른 형식의 데이터에서 얻은 결과이고, 두 번째 출력은 원래 입력 스트림에서 유입된 잘못된 형식의 요소입니다.

Apache Beam은 단일 PCollection만 생성하면서 여러 결과를 내보내기 위해 PCollectionTuple이란 클래스를 사용합니다. PCollectionTuple은 서로 다른 유형의 PCollection으로 구성된 불변 튜플이며 TupleTag로 '키가 지정'됩니다.

다음은 서로 다른 두 가지 유형의 PCollection으로 인스턴스화되는 PCollectionTuple의 예시입니다. 그런 다음 PCollectionTuple.get() 메서드를 사용하여 해당 PCollection을 검색합니다.

PCollection<String> pc1 = ...; PCollection<Integer> pc2 = ...; TupleTag<String> tag1 = new TupleTag<>(); TupleTag<Integer> tag2 = new TupleTag<>(); PCollectionTuple pcs = PCollectionTuple.of(tag1, pc1) .and(tag2, pc2); PCollection<Integer> pcX = pcs.get(tag1); PCollection<String> pcY = pcs.get(tag2);

PTransform 컨텍스트에서 이 메서드를 사용하려면 다음 예시와 같이 콘텐츠를 기반으로 요소에 TupleTag를 할당하는 코드를 작성할 수 있습니다.

final TupleTag<String> aTag = new TupleTag<String>(){}; final TupleTag<String> bTag = new TupleTag<String>(){}; PCollectionTuple mixedCollection = input.apply(ParDo .of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element().startsWith("A")) { // Emit to main output, which is the output with tag aTag. c.output(c.element()); } else if(c.element().startsWith("B")) { // Emit to output with tag bTag. c.output(bTag, c.element()); } } }) // Specify main output. In this example, it is the output // with tag startsWithATag. .withOutputTags(aTag, // Specify the output with tag bTag, as a TupleTagList. TupleTagList.of(bTag))); // Get subset of the output with tag bTag. mixedCollection.get(aTag).apply(...); // Get subset of the output with tag startsWithBTag. mixedCollection.get(bTag).apply(...);
  • 이 작업을 완료하려면 클래스 상단에 두 개의 TupleTag 상수를 선언하고 JsonToCommonLog 변환이 PCollectionTuple을 반환하도록 변경하세요. 이때 파싱되지 않은 요소에는 하나의 태그를, 파싱된 요소에는 다른 태그를 지정합니다. if/then/else 블록 대신 try/catch 문을 사용하세요.

작업 2. 복합 변환으로 코드 모듈화

변환은 중첩된 구조를 가질 수 있으며, 하나의 복잡한 변환이 여러 개의 단순한 변환(예: 여러 ParDo, Combine, GroupByKey 또는 다른 복합 변환)을 수행하도록 구성할 수 있습니다. 이러한 변환을 복합 변환이라고 합니다. 단일 복합 변환 안에 여러 개의 변환을 중첩하면 코드가 모듈화되고 이해하기도 쉬워집니다.

  1. 복합 변환을 직접 생성하려면 PTransform 클래스의 하위 클래스를 만들고 expand 메서드를 재정의하여 실제 처리 로직을 지정합니다. PTransform 클래스 유형 파라미터의 경우 변환이 입력으로 받고 출력으로 생성하는 PCollection 유형을 전달합니다.

다음 코드 샘플은 PCollection 문자열을 입력으로 받아 PCollection 정수를 출력하는 PTransform을 선언하는 방법을 보여줍니다.

#TODO: JsonToRow

static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
  1. PTransform 하위 클래스에서는 expand 메서드를 재정의해야 합니다. expand 메서드는 PTransform의 처리 로직을 추가하는 곳으로 expand 메서드를 재정의할 때는 적절한 유형의 입력 PCollection을 파라미터로 받고 출력 PCollection을 반환 값으로 지정해야 합니다.
static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // transform logic goes here ... } }
  1. 변환을 호출하려면 PCollection에서 PCollection.apply()를 사용하고 복합 변환의 인스턴스를 전달합니다.
PCollection<Integer> i = stringPColl.apply("CompositeTransform", new MyCompositeTransform());
  1. 이 작업을 완료하려면 앞서 수정한 JsonToCommonLog 변환을 복합 변환으로 전환하세요. 이 경우 현재의 Write 변환이 CommonLog 인스턴스를 예상하기 때문에 문제를 유발합니다. 복합 변환의 결과를 새로운 PCollectionTuple에 저장하고 .get()을 사용하여 Write 변환이 예상하는 PCollection을 가져오세요.

작업 3. 이후 분석을 위한 잘못된 형식 데이터 작성

잘못된 형식의 데이터를 생성하고 있는 업스트림 문제를 해결하려면 잘못된 형식의 데이터를 분석할 수 있어야 합니다. 이를 위해서는 데이터를 어딘가에 구체화해야 합니다. 이 작업에서는 잘못된 형식의 데이터를 Google Cloud Storage에 작성합니다. 이 패턴을 데드 레터 스토리지(dead-letter storage) 사용이라고 합니다.

이전 실습에서는 TextIO.write()를 사용하여 제한된 소스(일괄)에서 Cloud Storage로 직접 작성했습니다. 그러나 제한되지 않은 소스(스트리밍)에서 작성할 때는 이 접근방식을 약간 수정해야 합니다.

첫째, Write 변환의 업스트림에서는 Trigger를 사용하여 처리 시간 기준으로 쓰기를 수행할 시점을 지정해야 합니다. 그렇지 않고 기본값을 그대로 두면 쓰기가 이루어지지 않습니다. 기본적으로 모든 이벤트는 전역 윈도우에 속합니다. 일괄 처리에서는 전체 데이터 세트를 런타임에 알 수 있으므로 문제가 없지만 무한 소스에서는 전체 데이터 세트 크기를 알 수 없으므로 전역 윈도우 창이 완료되지 않아 실행되지 않습니다.

Trigger를 사용하므로 Window도 사용해야 하지만 반드시 윈도우를 변경할 필요는 없습니다. 이전 실습과 작업에서는 윈도잉 변환을 사용하여 이벤트 시간에 전역 윈도우를 고정 길이 윈도우로 대체했습니다. 이 경우에는 어떤 요소가 함께 그룹화되는지보다는 결과가 유용한 방식과 유용한 빈도로 구체화되는 것이 더 중요합니다.

다음 예시에서는 윈도우가 처리 시간 기준으로 10초마다 글로벌 윈도우 창을 실행하되 신규 이벤트만 작성합니다.

pCollection.apply("FireEvery10s", Window.<String>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .discardingFiredPanes())

Trigger를 설정한 후에는 TextIO.write()에 대한 호출을 수정하여 쓰기를 수행해야 합니다. 윈도잉 변환의 다운스트림에 작성할 때는 withWindowedWrites()에 대한 호출을 체인화하고 병렬로 작성이 이루어지도록 샤드 수를 지정합니다.

fixedWindowedItems.apply( "WriteWindowedPCollection", TextIO .write() .to("gs://path/to/somewhere") .withWindowedWrites() .withNumShards(NUM_SHARDS));
  • 이 작업을 완료하려면 PCollectionTuple에서 .get()을 사용하여 잘못된 형식의 데이터를 검색하는 새 변환을 생성하세요. 트리거에 대한 판단과 지식을 사용하여 이 트리거에 적합한 실행 조건을 설정하세요.

작업 4. 파이프라인 실행

  1. 파이프라인을 실행하려면 아래 예시와 유사한 명령어를 구성합니다. 포함한 명령줄 옵션의 이름을 반영하도록 명령어를 수정해야 한다는 점에 유의하세요.
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --allowedLateness=${ALLOWED_LATENESS} \ --outputTableName=${OUTPUT_TABLE_NAME} \ --deadletterBucket=${DEADLETTER_BUCKET}"

이 퀘스트의 코드에는 Pub/Sub를 사용하여 JSON 이벤트를 게시하는 스크립트가 포함되어 있습니다.

  1. 이 작업을 완료하고 메시지 게시를 시작하려면 현재 터미널과 나란히 새 터미널을 열고 다음 스크립트를 실행합니다. 이 스크립트는 종료할 때까지 메시지를 계속 게시합니다. 현재 위치는 training-data-analyst/quests/dataflow 폴더여야 합니다.
참고: true 플래그는 스트림에 지연 이벤트를 추가합니다. bash generate_streaming_events.sh true

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 파이프라인 실행

작업 5. 파이프라인 테스트

  1. Google Cloud 콘솔 제목 표시줄의 검색 입력란에 Pub/Sub를 입력한 다음 제품 및 페이지 섹션에서 Pub/Sub를 클릭합니다.

  2. 주제를 클릭한 다음 my_topic 주제를 클릭합니다.

  3. 메시지 탭을 클릭한 다음 메시지 게시 버튼을 클릭합니다.

  4. 다음 페이지에서 게시할 메시지를 입력합니다.

CommonLog JSON 사양을 완벽하게 준수하지 않는다면 곧바로 데드 레터 Cloud Storage 버킷에 도달하게 됩니다. 파이프라인 모니터링 윈도우로 돌아가서 파싱되지 않은 메시지 처리를 담당하는 브랜치 노드를 클릭하면 파이프라인 내에서 메시지 경로를 추적할 수 있습니다.

  1. 이 브랜치에 요소가 추가된 것을 확인한 후 Cloud Storage로 이동하여 메시지가 디스크에 기록되었는지 확인할 수 있습니다.
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID}/deadletter gsutil ls $BUCKET gsutil cat $BUCKET/*/*

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 파이프라인 테스트

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.