실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Apache Beam을 사용한 테스트(Java)

실습 1시간 30분 universal_currency_alt 크레딧 5개 show_chart 고급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서 학습할 내용은 다음과 같습니다.

  • Apache Beam의 테스트 도구를 사용하여 DoFnPTransform에 대한 단위 테스트를 작성합니다.
  • 파이프라인 통합 테스트를 수행합니다.
  • TestStream 클래스를 사용하여 스트리밍 파이프라인의 윈도잉 동작을 테스트합니다.

파이프라인 테스트는 효과적인 데이터 처리 솔루션을 개발하는 데 있어 특히 중요한 단계입니다. Beam 모델의 간접적 특성으로 인해, 실행이 실패했을 때 디버깅하는 작업이 쉽지 않을 수 있습니다.

이 실습에서는 DirectRunner를 사용하여 Beam SDK의 testing 패키지에 있는 도구로 로컬에서 단위 테스트를 수행하는 방법을 살펴봅니다.

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Qwiklabs에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

프로젝트 권한 확인

Google Cloud에서 작업을 시작하기 전에 프로젝트가 Identity and Access Management(IAM) 내에서 올바른 권한을 보유하고 있는지 확인해야 합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴 아이콘)에서 IAM 및 관리자 > IAM을 선택합니다.

  2. 기본 컴퓨팅 서비스 계정 {project-number}-compute@developer.gserviceaccount.com이 있고 editor 역할이 할당되어 있는지 확인하세요. 계정 프리픽스는 프로젝트 번호이며, 이 번호는 탐색 메뉴 > Cloud 개요 > 대시보드에서 확인할 수 있습니다.

Compute Engine 기본 서비스 계정 이름과 편집자 상태가 강조 표시된 권한 탭 페이지

참고: 계정이 IAM에 없거나 editor 역할이 없는 경우 다음 단계에 따라 필요한 역할을 할당합니다.
  1. Google Cloud 콘솔의 탐색 메뉴에서 Cloud 개요 > 대시보드를 클릭합니다.
  2. 프로젝트 번호(예: 729328892908)를 복사합니다.
  3. 탐색 메뉴에서 IAM 및 관리자 > IAM을 선택합니다.
  4. 역할 테이블 상단에서 주 구성원별로 보기 아래에 있는 액세스 권한 부여를 클릭합니다.
  5. 새 주 구성원 필드에 다음을 입력합니다.
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number}는 프로젝트 번호로 바꿉니다.
  2. 역할 필드에서 프로젝트(또는 기본) > 편집자를 선택합니다.
  3. 저장을 클릭합니다.

IDE 설정

이 실습에서는 Google Compute Engine에서 호스팅되는 Theia Web IDE를 주로 사용합니다. 실습 저장소가 사전에 클론되어 있습니다. Java 언어 서버가 지원되며, Cloud Shell처럼 gcloud 명령줄 도구를 통해 Google Cloud API에 프로그래매틱 방식으로 액세스할 수 있는 터미널도 제공됩니다.

  1. Theia IDE에 액세스하려면 Google Skills에 표시된 링크를 복사하여 새 탭에 붙여넣습니다.
참고: 환경이 완전히 프로비저닝되려면 URL이 표시된 후에도 3~5분 정도 기다려야 할 수 있습니다. 환경이 완전히 프로비저닝되지 않으면 브라우저에 오류가 표시됩니다.

ide_url이 표시된 사용자 인증 정보 창

실습 저장소가 환경에 클론되었습니다. 각 실습은 사용자가 완성해야 하는 코드가 포함된 labs 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 폴더로 구분되어 있습니다.

  1. File Explorer 버튼을 클릭하여 다음을 확인합니다.

실습 폴더가 강조 표시된 확장된 파일 탐색기 메뉴

Cloud Shell을 사용할 때처럼 이 환경에서 여러 터미널을 만들 수도 있습니다.

터미널 메뉴에서 강조 표시된 새 터미널 옵션

터미널에서 gcloud auth list를 실행하면, 제공된 서비스 계정으로 로그인되어 있음을 확인할 수 있습니다. 이 서비스 계정은 실습 사용자 계정과 동일한 권한을 가지고 있습니다.

gcloud auth list 명령어를 표시하는 터미널

환경이 작동하지 않는다면, GCE 콘솔에서 IDE를 호스팅하는 VM을 다음과 같이 재설정할 수 있습니다.

VM 인스턴스 페이지에서 강조 표시된 재설정 버튼과 VM 인스턴스 이름

실습 코드는 8a_Batch_Testing_Pipeline/lab8b_Stream_Testing_Pipeline/lab의 두 폴더로 나뉩니다. 어떤 단계에서든 막히는 부분이 있다면 해당하는 solution 폴더에서 해결 방법을 찾을 수 있습니다.

실습 1부. DoFn 및 PTransform에 대한 단위 테스트 수행

이 단계에서는 날씨 센서에서 통계를 계산하는 일괄 파이프라인의 DoFn 및 PTransform에 대한 단위 테스트를 수행합니다. 생성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.

  • TestPipeline을 만듭니다.
  • 테스트 입력 데이터를 만들고 Create 변환을 사용하여 입력 데이터의 PCollection을 만듭니다.
  • 입력 PCollection에 변환을 적용하고 그 결과인 PCollection을 저장합니다.
  • PAssert 및 해당 서브클래스를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.

TestPipeline은 Beam SDK에 포함된 특수 클래스로, 변환 및 파이프라인 로직을 테스트할 수 있도록 제공됩니다.

  • 테스트 시 파이프라인 객체를 만들 때 Pipeline 대신 TestPipeline을 사용합니다.
TestPipeline p = TestPipeline.create();

Create 변환은 객체의 인메모리 컬렉션(Java Iterable)을 가져와 이 컬렉션에서 PCollection을 만듭니다. 목표는 PTransform에서 나올 예상 출력 PCollection을 알 수 있는 소량의 테스트 입력 데이터 세트를 갖는 것입니다.

List<String> input = Arrays.asList(testInput); // Some code to create a TestPipeline p outputPColl = p.apply(Create.of(input).apply(...);

마지막으로 출력 PCollection이 예상 출력과 일치하는지 확인합니다. 이를 확인하기 위해 PAssert 클래스를 사용합니다. 예를 들어 containsInAnyOrder 메서드를 사용하여 출력 PCollection이 올바른 요소를 포함하는지 확인할 수 있습니다.

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

작업 1. 기본 파이프라인 코드 살펴보기

  1. IDE에서 8a_Batch_Testing_Pipeline/lab으로 이동합니다.

이 디렉터리에는 종속 항목을 정의하는 pom.xml 파일과 2개의 하위 디렉터리가 포함된 src 폴더가 있습니다. src/main 폴더에는 파이프라인 패키지 코드가 포함되어 있고 src/test 폴더에는 테스트 코드가 포함되어 있습니다.

  1. 먼저 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java를 엽니다.

이 파일에는 파이프라인에서 사용할 WeatherRecord 클래스의 정의가 포함되어 있습니다. WeatherRecord 클래스에는 연결된 스키마가 있으며 @DefaultSchema 주석을 사용하여 스키마를 정의하는 단계는 익숙할 것입니다. 그러나 클래스를 정의할 때 equals 메서드를 재정의해야 합니다.

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

이유가 뭘까요? PAssertequals 메서드를 사용하여 출력 PCollection의 멤버십을 확인합니다. 그러나 POJO(Plain Old Java Object)의 기본 equals 메서드는 객체의 주소만 비교합니다. 대신 객체의 콘텐츠를 비교하고 있는지 확인하고 싶습니다. 위와 같이 간단하게 수행할 수 있습니다.

  1. 이제 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java를 엽니다.

이것이 파이프라인의 기본 코드입니다. 이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.

  • DoFn ConvertCsvToWeatherRecord(65행에서 시작) 및 ConvertTempUnits(81행에서 시작). 나중에 이러한 DoFn에 대한 단위 테스트를 수행하게 됩니다.
  • PTransform ComputeStatistics(103행에서 시작). DoFn과 동일한 방식으로 테스트할 수 있는 복합 변환의 예시입니다.
  • PTransform WeatherStatsTransform(123행에서 시작). 이 PTransform에는 전체 파이프라인의 처리 로직(소스 및 싱크 변환 제외)이 포함되어 있으므로 Create 변환으로 생성된 합성 데이터를 대상으로 작은 규모의 파이프라인 통합 테스트를 수행할 수 있습니다.

처리 코드에서 논리적 오류를 발견하더라도 아직 수정하지 마세요. 나중에 테스트를 통해 오류를 정확히 찾아내는 방법을 살펴보겠습니다.

작업 2. 테스트용 종속 항목 추가

  1. 이제 8a_Batch_Testing_Pipeline/lab/pom.xml을 엽니다.

테스트를 위해 몇 가지 종속 항목을 추가해야 합니다. 테스트를 위한 Beam Java 코드는 JUnitHamcrest에서 링크를 연결해야 합니다. Maven에서는 pom.xml 파일을 업데이트하기만 하면 됩니다.

  1. 이 작업을 완료하려면 다음 XML을 복사하여 주석에 표시된 pom.xml 파일에 붙여넣습니다.
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

이러한 종속 항목의 범위는 'test'입니다. 이러한 패키지는 mvn test로 테스트를 실행할 때는 필요하지만 기본 파이프라인을 실행할 때는 필요하지 않습니다.

작업 3. Apache Beam에서 첫 번째 DoFn 단위 테스트 작성

  1. 이제 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java로 이동합니다.

이 파일에는 DoFnPTransform 단위 테스트를 위한 코드가 포함되어 있습니다. 현재 코드는 대부분 주석 처리되어 있으며 실습을 진행하면서 주석을 해제할 예정입니다.

이제 43행에서 시작하는 ConvertCsvToWeatherRecord DoFnDoFn 단위 테스트를 살펴보겠습니다.

  1. 먼저 파이프라인을 테스트하기 위한 클래스를 만들고 TestPipeline 객체를 만듭니다.
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

TestPipeline 객체는 다음 테스트에서 모두 사용됩니다. 객체를 만들 때 transient 키워드를 사용하므로 동일한 객체를 재사용할 때 발생하는 부작용에 대해서는 걱정할 필요가 없습니다.

  1. 이제 첫 번째 테스트의 (불완전한) 코드를 살펴보겠습니다.
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* Create PCollection from in-memory object */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // Include PAssert statement to check for correct results p.run().waitUntilFinish(); }

파이프라인을 테스트하는 데 사용할 메서드에 @Test 주석을 추가합니다. CSV 파일의 한 행(파이프라인의 예상 입력 형식)에 해당하는 단일 테스트 입력(testInput)을 만들어 List 객체 input에 넣습니다.

현재 테스트 코드에는 아직 채워야 할 부분이 남아 있습니다.

  1. 이 작업을 완료하려면 먼저 Create 변환을 추가하여 inputPCollection으로 변환합니다.

  2. 둘째, containsInAnyOrder 메서드를 사용하여 inputtestOutput을 비교하는 PAssert 문을 포함합니다.

문제가 발생하면 이후에 주석 처리된 테스트나 해결책을 참고하세요.

작업 4. 첫 번째 DoFn 단위 테스트 실행

  1. 아직 IDE 환경에 새 터미널을 만들지 않았다면 새 터미널을 만들고 다음 명령어를 붙여넣습니다.
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

이제 테스트를 실행할 준비가 되었습니다.

  1. 이렇게 하려면 터미널에서 다음 명령어를 실행하기만 하면 됩니다.
mvn test

이전 작업을 올바르게 완료했다면 테스트가 완료된 후 터미널에 다음과 같이 표시됩니다(경과된 정확한 시간은 다를 수 있음).

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

작업 5. 두 번째 DoFn 단위 테스트 실행 및 파이프라인 디버그

  1. 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java로 돌아가서 두 번째 단위 테스트의 코드(약 67~80행)에서 주석 처리를 삭제합니다. 코드를 강조 표시하고 Ctrl + /(MacOS에서는 Cmd + /)를 눌러 주석 처리를 삭제하세요. 아래 코드를 참고하세요.
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

이 테스트를 통해 ConvertTempUnits() DoFn이 정상적으로 작동하는지 확인합니다.

  1. WeatherStatisticsPipelineTest.java를 저장하고 터미널로 돌아갑니다.

  2. 다시 한번 다음 명령어를 실행하여 테스트를 실행합니다.

mvn test

테스트에 실패했습니다. 출력을 스크롤하면 테스트 실패에 관한 다음 정보를 확인할 수 있습니다.

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>

언뜻 보기에는 가장 유용한 오류 메시지처럼 보이지 않을 수 있습니다. 그러나 testOutput의 예상 WeatherRecord가 일치하지 않는 것을 확인할 수 있습니다. 온도 변환을 수행하는 방식에 문제가 있을 수 있습니다.

  1. 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java로 돌아가서 ConvertTempUnits 정의(81행 부근)까지 아래로 스크롤합니다.

  2. 이 작업을 완료하려면 DoFn 처리 로직에서 오류를 찾고 mvn test 명령어를 다시 실행하여 테스트가 성공하는지 확인합니다. 참고로 섭씨를 화씨로 변환하는 공식은 다음과 같습니다.

tempF = tempC * 1.8 + 32.0

문제가 발생하면 솔루션을 참고하세요.

작업 6. PTransform 단위 테스트 실행 및 엔드 투 엔드 파이프라인 테스트

  1. 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java로 돌아가서 마지막 두 테스트(84행 부근에서 시작)의 코드에 대한 주석 처리를 삭제합니다.

방금 주석 처리를 삭제한 첫 번째 테스트는 복합 PTransform ComputeStatistics를 테스트합니다. 아래에서 그중 일부 코드를 참고하세요.

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //Define Testing Inputs (Omitted here) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

이 코드는 앞서 살펴본 DoFn 단위 테스트와 매우 유사합니다. 테스트 입력과 출력을 제외한 실질적인 유일한 차이점은 ParDo(new DoFn()) 대신 PTransform을 적용한다는 점입니다.

마지막 테스트는 엔드 투 엔드 파이프라인을 대상으로 합니다. 파이프라인 코드(WeatherStatisticsPipeline.java)에서 소스와 싱크를 제외한 전체 엔드 투 엔드 파이프라인이 단일 PTransform WeatherStatsTransform에 포함되었습니다.

  1. 엔드 투 엔드 파이프라인을 테스트하기 위해 위에서 수행한 것과 비슷한 작업을 반복할 수 있지만, 이번에는 PTransform을 사용합니다.
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //Define Testing Inputs (Omitted here) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
mvn test

이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. DoFn 및 PTransform에 대한 단위 테스트 수행

실습 2부. TestStream으로 스트림 처리 로직 테스트

이 단계에서는 택시 운행 횟수의 윈도잉 계산을 위한 스트리밍 파이프라인의 단위 테스트를 수행합니다. 생성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.

  • TestPipeline을 만듭니다.
  • TestStream 클래스를 사용하여 스트리밍 데이터를 생성합니다. 여기에는 일련의 이벤트 생성, 워터마크 진행, 처리 시간 진행이 포함됩니다.
  • PAssert 및 해당 서브클래스를 사용하여 출력 PCollection에 특정 윈도우에서 예상한 요소가 포함되어 있는지 확인합니다.

TestStream에서 이벤트를 읽는 파이프라인을 실행할 때 읽기 작업은 각 이벤트의 모든 결과가 완료된 후에 다음 이벤트로 이동합니다. 이 과정에는 처리 시간 진행, 적절한 트리거 실행이 포함됩니다. TestStream을 사용하면 트리거 및 허용된 지연의 영향을 파이프라인에서 관찰하고 테스트할 수 있습니다. 여기에는 지연된 트리거와 지연으로 인해 처리되지 않은 데이터에 관한 로직이 포함됩니다.

작업 1. 기본 파이프라인 코드 살펴보기

  1. IDE에서 8b_Stream_Testing_Pipeline/lab으로 이동합니다.

이 디렉터리에는 종속 항목을 정의하는 pom.xml 파일과 2개의 하위 디렉터리가 포함된 src 폴더가 있습니다. src/main 폴더에는 파이프라인 패키지 코드가 포함되어 있고 src/test 폴더에는 테스트 코드가 포함되어 있습니다.

  1. 먼저 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java를 엽니다.

이 파일에는 파이프라인에서 사용할 TaxiRide 클래스의 정의가 포함되어 있습니다. TaxiRide 클래스에는 연결된 스키마가 있으며 @DefaultSchema 주석을 사용하여 스키마를 정의하는 단계는 익숙할 것입니다.

  1. 이제 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java를 엽니다.

이것이 파이프라인의 기본 코드입니다. 이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.

  • 94행에서 시작하는 DoFn JsonToTaxiRide는 수신된 Pub/Sub 메시지를 TaxiRide 클래스의 객체로 변환합니다.
  • 113행에서 시작하는 PTransform TaxiCountTransform입니다. 이 PTransform에는 파이프라인의 주요 계산 및 윈도잉 로직이 포함되어 있습니다. 테스트는 이 PTransform에 초점을 맞춥니다.

TaxiCountTransform의 출력은 윈도우 단위로 집계된 전체 택시 운행 횟수입니다. 그러나 각 운행은 승차, 하차 등 여러 이벤트로 구성됩니다.

  1. 각 운행을 한 번만 집계하도록 ride_status 속성을 기준으로 필터링합니다. 이를 위해 ride_status가 'pickup'인 요소만 유지합니다.
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

파이프라인에 사용되는 윈도잉 로직을 자세히 살펴보면 다음과 같습니다.

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

60초 길이의 고정 윈도우로 윈도잉합니다. 조기 트리거는 사용하지 않으며 워터마크가 윈도우 종료 시점을 지난 후 결과를 출력합니다. 새 요소가 들어올 때마다 지연 실행을 수행하지만 허용 지연 시간은 1분으로 제한합니다. 마지막으로 허용 지연 시간이 경과할 때까지 윈도우에 상태가 누적됩니다.

작업 2. TestStream 사용 방법 살펴보기 및 첫 번째 테스트 실행

  1. 이제 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java를 엽니다.

첫 번째 목표는 테스트 코드에서 TestStream이 어떻게 사용되는지 이해하는 것입니다. TestStream 클래스를 사용하면 메시지의 실시간 스트림을 시뮬레이션하면서 처리 시간과 워터마크 진행을 제어할 수 있습니다.

아래는 66행에서 시작하는 첫 번째 테스트의 코드입니다.

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

create 메서드를 사용하여 새로운 TestStream을 만들고 동시에 코더도 지정합니다. JSON 메시지를 문자열로 전달하므로 StringUtf8Coder를 사용할 수 있습니다. 위의 TestStream은 어떤 작업을 수행하나요?

TestStream은 다음 작업을 수행합니다.

  • 초기 워터마크를 변수 startTime(Instant(0))으로 설정합니다.
  • startTime의 이벤트 타임스탬프를 사용하여 문자열에 세 가지 요소를 추가합니다. 이러한 이벤트 중 두 개는 집계되지만(ride_status = "pickup") 다른 하나는 집계되지 않습니다.
  • 다른 'pickup' 이벤트를 추가하되, 이벤트 타임스탬프는 startTime에서 1분 후로 설정합니다.
  • 워터마크를 startTime에서 1분 후로 이동하여 첫 번째 윈도우를 트리거합니다.
  • 다른 'pickup' 이벤트를 추가하되, 이벤트 타임스탬프는 startTime에서 2분 후로 설정합니다.
  • 워터마크를 '무한'으로 진행합니다. 이로 인해 모든 윈도우가 닫히고 새로운 데이터는 허용 지연 시간을 초과하게 됩니다.
  1. 첫 번째 테스트의 나머지 코드는 이전 일괄 처리 예시와 유사하지만 이제 Create 변환 대신 TestStream을 사용합니다.
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

위의 코드에서는 TestStream을 만들고 TaxiCountTransform PTransform을 적용하여 출력 PCollection(outputCount)를 정의합니다. InvervalWindow 클래스를 사용하여 확인하려는 윈도우를 정의한 다음 PAssertinWindow 메서드를 사용하여 윈도우별로 결과를 검증합니다.

  1. 이제 IDE의 터미널로 돌아가거나 새 터미널을 열고 다음 명령어를 실행하여 올바른 디렉터리로 이동하고 종속 항목을 설치합니다.
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. 이제 다음 명령어를 실행하여 위의 테스트를 실행합니다.
mvn test

테스트 후 다음과 같은 출력이 표시됩니다(경과 시간은 다를 수 있음).

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

작업 3. 지연 데이터 처리 테스트를 위한 TestStream 만들기

이 작업에서는 TestStream의 코드를 작성하여 지연 데이터 처리와 관련된 로직을 테스트합니다.

  1. 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java로 돌아가서 testTaxiRideLateData 메서드가 주석 처리된 위치(104행 부근)까지 아래로 스크롤합니다.

  2. 이 작업의 코드를 완성할 예정이므로 이 테스트의 코드 주석 처리를 삭제합니다.

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

테스트 코드는 TestStream을 만드는 부분을 제외하고는 모두 완성되었습니다.

  1. 이 작업을 완료하려면 다음 작업을 수행하는 TestStream 객체를 만듭니다.
  • 워터마크를 startTime으로 진행합니다.
  • 값이 json.format(json, "pickup")이고 타임스탬프가 startTime인 두 개의 TimestampedValue를 추가합니다.
  • 워터마크를 startTime에서 1분 후로 이동합니다.
  • 값이 json.format(json, "pickup")이고 타임스탬프가 startTime인 또 다른 TimestamedValue를 추가합니다.
  • 워터마크를 startTime에서 2분 후로 이동합니다.
  • 값이 json.format(json, "pickup")이고 타임스탬프가 startTime인 또 다른 TimestamedValue를 추가합니다.
  • 워터마크를 '무한'으로 진행합니다.

이렇게 하면 첫 번째 윈도우에 속하는 4개의 요소가 있는 TestStream이 생성됩니다. 처음 두 요소는 정시, 두 번째 요소는 허용 지연 시간 내 지연, 마지막 요소는 지연 시간이 허용 지연 시간을 초과합니다. 실행한 창을 누적하고 있기 때문에 첫 번째 트리거는 2개의 이벤트를 집계하고 마지막 트리거는 3개의 이벤트를 집계해야 합니다. 네 번째 이벤트는 포함하지 않아야 합니다. PAssert 클래스의 inOnTimePaneinFinalPane 메서드를 사용하여 이를 확인할 수 있습니다.

문제가 발생하면 솔루션을 참고하세요.

작업 4. 지연 데이터 처리 테스트 실행

  • 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
mvn test

이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. TestStream으로 스트림 처리 로직 테스트

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.