실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Apache Beam을 활용한 테스트(Python)

실습 2시간 universal_currency_alt 크레딧 5개 show_chart 고급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서 학습할 내용은 다음과 같습니다.

  • Apache Beam의 테스트 도구를 사용하여 DoFnPTransform에 대한 단위 테스트를 작성합니다.
  • 파이프라인 통합 테스트를 수행합니다.
  • TestStream 클래스를 사용하여 스트리밍 파이프라인의 윈도잉 동작을 테스트합니다.

파이프라인 테스트는 효과적인 데이터 처리 솔루션을 개발하는 데 있어 특히 중요한 단계입니다. Beam 모델의 간접적 특성으로 인해, 실행이 실패했을 때 디버깅 작업이 복잡해질 수 있습니다.

이 실습에서는 DirectRunner를 사용하여 Beam SDK의 testing 패키지에 포함된 도구로 로컬 단위 테스트를 수행하는 방법을 살펴봅니다.

설정 및 요건

실습 시작 버튼을 클릭하기 전에

참고: 다음 안내를 확인하세요.

실습에는 시간제한이 있으며 일시중지할 수 없습니다. 실습 시작을 클릭하면 타이머가 시작됩니다. 이 타이머는 Google Cloud 리소스를 사용할 수 있는 시간이 얼마나 남았는지를 표시합니다.

Google Skills 실무형 실습을 통해 시뮬레이션이나 데모 환경이 아닌 실제 클라우드 환경에서 직접 실습 활동을 진행할 수 있습니다. 실습 시간 동안 Google Cloud에 로그인하고 액세스하는 데 사용할 수 있는 새로운 임시 사용자 인증 정보가 제공됩니다.

필요한 사항

이 실습을 완료하려면 다음이 필요합니다.

  • 표준 인터넷 브라우저(Chrome 브라우저 권장)
  • 실습을 끝까지 진행할 수 있는 충분한 시간
참고: 이미 개인용 Google Cloud 계정이나 프로젝트가 있어도 이 실습에서는 사용하지 마세요. 참고: Pixelbook을 사용하는 경우 시크릿 창을 열어 이 실습을 실행하세요.

실습을 시작하고 콘솔에 로그인하는 방법

  1. 실습 시작 버튼을 클릭합니다. 실습 비용을 결제해야 하는 경우 결제 수단을 선택할 수 있는 팝업이 열립니다. 왼쪽에 있는 패널에서 이 실습에 사용해야 하는 임시 사용자 인증 정보를 확인할 수 있습니다.

    사용자 인증 정보 패널

  2. 사용자 이름을 복사한 다음 Google 콘솔 열기를 클릭합니다. 실습에서 리소스가 실행되며 계정 선택 페이지를 표시하는 다른 탭이 열립니다.

    참고: 두 개의 탭을 각각 별도의 창으로 나란히 여세요.
  3. 계정 선택 페이지에서 다른 계정 사용을 클릭합니다. 로그인 페이지가 열립니다.

    다른 계정 사용 옵션이 강조 표시된 계정 대화상자를 선택합니다.

  4. 연결 세부정보 패널에서 복사한 사용자 이름을 붙여넣습니다. 그런 다음 비밀번호를 복사하여 붙여넣습니다.

참고: 연결 세부정보 패널에 표시된 사용자 인증 정보를 사용해야 합니다. Google Skills 사용자 인증 정보를 사용하지 마세요. 개인용 Google Cloud 계정이 있어도 이 실습에서는 사용하지 마세요(요금 청구 방지).
  1. 이후에 표시되는 페이지를 클릭하여 넘깁니다.
  • 이용약관에 동의하세요.
  • 임시 계정이므로 복구 옵션이나 2단계 인증을 추가하지 마세요.
  • 무료 평가판을 신청하지 않습니다.

잠시 후 Cloud 콘솔이 이 탭에서 열립니다.

참고: 왼쪽 상단에 있는 탐색 메뉴를 클릭하면 Google Cloud 제품 및 서비스 목록이 있는 메뉴를 볼 수 있습니다. Cloud 콘솔 메뉴

이 실습에서는 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴)에서 Vertex AI를 선택합니다.

  2. 모든 권장 API 사용 설정을 클릭합니다.

  3. 탐색 메뉴에서 Workbench를 클릭합니다.

    Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.

  4. 상자 추가새로 만들기를 클릭합니다.

  5. 인스턴스를 구성합니다.

    • 이름: lab-workbench
    • 리전: 리전을 (으)로 설정합니다.
    • 영역: 영역을 (으)로 설정합니다.
    • 고급 옵션(선택사항): 필요한 경우 '고급 옵션'을 클릭하여 추가로 맞춤설정(예: 머신 유형, 디스크 크기)할 수 있습니다.

Vertex AI Workbench 인스턴스 생성

  1. 만들기를 클릭합니다.

인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.

  1. 인스턴스 이름 옆에 있는 JupyterLab 열기를 클릭하여 JupyterLab 인터페이스를 실행합니다. 그러면 브라우저에서 새 탭이 열립니다.

Workbench 인스턴스 배포됨

  1. 그런 다음 터미널을 클릭합니다. 이 실습의 모든 명령어를 실행할 수 있는 터미널이 열립니다.

코드 저장소 다운로드

다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.

  1. 방금 연 터미널에서 다음을 입력합니다.
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. 노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.

  2. 클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.

확장된 보기 메뉴에서 강조 표시된 탐색기 옵션

참고: 수정할 파일을 열려면 파일로 이동하여 클릭하기만 하면 됩니다. 클릭하면 파일이 열리며, 여기에서 코드를 추가하거나 수정할 수 있습니다.

실습 코드는 8a_Batch_Testing_Pipeline/lab8b_Stream_Testing_Pipeline/lab의 두 폴더로 나뉩니다. 어떤 단계에서든 문제가 발생하면 해당하는 solution 폴더에서 해결 방법을 찾을 수 있습니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 노트북 인스턴스 만들기 및 과정 저장소 클론하기

실습 1부: DoFn 및 PTransform에 대한 단위 테스트 수행

작업 1. 환경 준비

이 단계에서는 날씨 센서에서 통계를 계산하는 일괄 파이프라인의 DoFn 및 PTransform에 대한 단위 테스트를 수행합니다. 생성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.

  • TestPipeline을 만듭니다.
  • 테스트 입력 데이터를 만들고 Create 변환를 사용하여 입력 데이터의 PCollection을 만듭니다.
  • 입력 PCollection에 변환을 적용하고 생성된 PCollection을 저장합니다.
  • testing.util 모듈의 assert_that 메서드와 다른 메서드를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.

TestPipeline은 Beam SDK에 포함된 특수 클래스로, 변환 및 파이프라인 로직을 테스트할 수 있도록 제공됩니다. 테스트 시 파이프라인 객체를 만들 때 Pipeline 대신 TestPipeline을 사용합니다. Create 변환은 객체의 인메모리 컬렉션(Java Iterable)을 가져와 이 컬렉션에서 PCollection을 만듭니다. 목표는 PTransform에서 나올 예상 출력 PCollection을 알 수 있는 소량의 테스트 입력 데이터 세트를 갖는 것입니다.

with TestPipeline() as p: INPUTS = [fake_input_1, fake_input_2] test_output = p | beam.Create(INPUTS) | # Transforms to be tested

마지막으로 출력 PCollection이 예상 출력과 일치하는지 확인합니다. 이를 확인하기 위해 assert_that 메서드를 사용합니다. 예를 들어 equal_to 메서드를 사용하여 출력 PCollection이 올바른 요소를 포함하는지 확인할 수 있습니다.

assert_that(test_output, equal_to(EXPECTED_OUTPUTS))

이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 이전과 동일하게 실습 환경을 열고 데이터를 생성합니다.

해당 실습 열기

  • IDE의 터미널에서 다음 명령어를 실행하여 이 실습에 사용할 디렉터리로 변경합니다.
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab export BASE_DIR=$(pwd)

가상 환경 및 종속 항목 설정

실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.

  1. 다음 명령어를 실행하여 이 실습에서 사용할 가상 환경을 만듭니다.
sudo apt-get install -y python3-venv # Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. 다음으로 파이프라인을 실행하는 데 필요한 패키지를 설치합니다.
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API가 사용 설정되어 있는지 확인합니다.
gcloud services enable dataflow.googleapis.com
  1. 마지막으로 스토리지 버킷을 만듭니다.
export PROJECT_ID=$(gcloud config get-value project) gcloud storage buckets create gs://$PROJECT_ID --location=US

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 환경 준비

작업 2. 기본 파이프라인 코드 살펴보기

  1. 파일 탐색기에서 8a_Batch_Testing_Pipeline/lab으로 이동합니다. 이 디렉터리에는 기본 파이프라인 코드가 포함된 weather_statistics_pipeline.py와 테스트 코드가 포함된 weather_statistics_pipeline_test.py, 두 개의 파일이 있습니다.

  2. 먼저 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py를 엽니다.

단위 테스트를 진행하기 전에 파이프라인 코드를 간략하게 살펴보겠습니다. 먼저 6행에서 시작하는 WeatherRecord 클래스를 살펴봅니다. 이 클래스는 typing.NamedTuple의 서브클래스이므로 해당 클래스의 객체를 다룰 때 스키마 인식 변환을 사용할 수 있습니다. 또한 나중에 진행할 테스트를 위해 이 클래스의 객체로 구성된 인메모리 컬렉션을 정의합니다.

class WeatherRecord(typing.NamedTuple): loc_id: str lat: float lng: float date: str low_temp: float high_temp: float precip: float
  1. 이제 17행으로 스크롤하여 DoFnPTransform 정의를 시작하는 위치로 이동합니다.

이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.

  • DoFn ConvertCsvToWeatherRecord(17행에서 시작) 및 ConvertTempUnits(27행에서 시작) 나중에 이 DoFn에 대한 단위 테스트를 수행하게 됩니다.
  • PTransform ComputeStatistics(41행에서 시작) DoFn과 동일한 방식으로 테스트할 수 있는 복합 변환의 예입니다.
  • PTransform WeatherStatsTransform(55행에서 시작) 이 PTransform에는 전체 파이프라인의 처리 로직(소스 및 싱크 변환 제외)이 포함되어 있으므로 Create 변환으로 생성된 합성 데이터를 대상으로 작은 규모의 파이프라인 통합 테스트를 수행할 수 있습니다.
참고: 처리 코드에서 논리적 오류를 발견하더라도 아직 수정하지 마세요. 나중에 테스트를 통해 오류를 정확히 찾아내는 방법을 살펴보겠습니다.

작업 3. 테스트용 종속 항목 추가

  1. 이제 파일 탐색기에서 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py를 엽니다.

테스트를 위해 몇 가지 종속 항목을 추가해야 합니다. Apache Beam에 포함된 테스트 유틸리티와 Python의 unittest 패키지를 활용합니다.

  1. 이 작업을 완료하려면 weather_statistics_pipeline_test.py의 상단에서 지시된 위치에 다음 Import 문을 추가합니다.
from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import BeamAssertException from apache_beam.testing.util import assert_that, equal_to

문제가 발생하면 솔루션을 참고하세요.

작업 4. Apache Beam에서 첫 번째 DoFn 단위 테스트 작성

8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py 파일에는 DoFnPTransform 단위 테스트를 위한 코드가 포함되어 있습니다. 현재 코드는 대부분 주석 처리되어 있으며 실습을 진행하면서 주석을 해제할 예정입니다.

Beam 코드를 살펴보기 전에 테스트 실행 및 테스트 출력 텍스트 파일 작성 관리를 위해 커스텀 main 메서드를 정의해 두었다는 점을 참고하세요. 이렇게 하면 현재 터미널 세션이 종료된 후에도 확인할 수 있는 테스트 기록이 남습니다. 예를 들어 logging 모듈을 사용하여 이 작업을 관리할 수도 있습니다.

def main(out = sys.stderr, verbosity = 2): loader = unittest.TestLoader() suite = loader.loadTestsFromModule(sys.modules[__name__]) unittest.TextTestRunner(out, verbosity = verbosity).run(suite) # Testing code omitted if __name__ == '__main__': with open('testing.out', 'w') as f: main(f)
  1. 이제 43행에서 시작하는 ConvertCsvToWeatherRecord DoFnDoFn 단위 테스트를 살펴보겠습니다. 먼저 파이프라인을 테스트하기 위한 클래스를 만들고 TestPipeline 객체를 만듭니다.
class ConvertToWeatherRecordTest(unittest.TestCase): def test_convert_to_csv(self): with TestPipeline() as p: ...
  1. 이제 첫 번째 테스트의 (불완전한) 코드를 살펴보겠습니다.
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1'] EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)] input_lines = p | # TASK 4: Create PCollection from LINES output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord()) # TASK 4: Write assert_that statement

CSV 파일의 한 행(파이프라인의 예상 입력 형식)에 해당하는 단일 테스트 입력(LINES)을 만들어 목록에 넣습니다. 또한 예상 출력(EXPECTED_OUTPUT)을 WeatherRecords 목록으로 정의합니다.

현재 테스트 코드에는 아직 채워야 할 부분이 남아 있습니다.

  1. 이 작업을 완료하려면 먼저 Create 변환을 추가하여 LINESPCollection으로 변환합니다.

  2. 두 번째로 equal_to 메서드를 사용하는 assert_that 문을 포함하여 outputEXPECTED_OUTPUT을 비교합니다.

문제가 발생하면 이후에 주석 처리된 테스트나 해결책을 참고하세요.

작업 5. 첫 번째 DoFn 단위 테스트 실행

  • 터미널로 돌아가서 다음 명령어를 실행합니다.
python3 weather_statistics_pipeline_test.py cat testing.out

테스트 출력은 testing.out 파일에 기록됩니다. 터미널에서 cat testing.out을 실행하여 이 파일의 내용을 볼 수 있습니다. 이전 작업이 정상적으로 수행되었다면 testing.out 파일의 내용은 다음과 같아야 합니다(경과 시간은 다를 수 있음).

test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok ---------------------------------------------------------------------- Ran 1 test in 0.918s OK 참고: 위 테스트는 'python3 -m unittest test_script.py' 명령어를 사용하여 실행할 수도 있습니다. 여기서는 python 스크립트를 직접 실행하여 기본 메서드에 액세스했지만, 실제로 여기에 언급된 방식이 더 일반적으로 사용됩니다.

작업 6. 두 번째 DoFn 단위 테스트 실행 및 파이프라인 디버그

  1. 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py로 돌아가서 두 번째 단위 테스트의 코드 (약 33~50행)에서 주석 처리를 삭제합니다. 코드를 강조 표시하고 Ctrl + /(MacOS에서는 Cmd + /)를 눌러 주석 처리를 삭제하세요. 아래 코드를 참고하세요.
class ConvertTempUnitsTest(unittest.TestCase): def test_convert_temp_units(self): with TestPipeline() as p: RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)] EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1), WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)] input_records = p | beam.Create(RECORDS) output = input_records | beam.ParDo(ConvertTempUnits()) assert_that(output, equal_to(EXPECTED_RECORDS))

이 테스트를 통해 ConvertTempUnits() DoFn이 정상적으로 작동하는지 확인합니다. weather_statistics_pipeline_test.py를 저장하고 터미널로 돌아갑니다.

  1. 다음 명령어를 실행하여 테스트를 실행하고 출력을 확인합니다.
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

테스트에 실패했습니다. 출력을 스크롤하면 테스트 실패에 관한 다음 정보를 확인할 수 있습니다.

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR ... apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']

BeamAssertException을 더 자세히 살펴보면 low_temphigh_temp 값이 잘못된 것을 알 수 있습니다. ConvertTempUnits DoFn의 처리 로직에 문제가 있습니다.

  1. 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py로 돌아가서 ConvertTempUnits 정의(약 32행)로 스크롤합니다. 이 작업을 완료하려면 DoFn 처리 로직에서 오류를 찾고 다음 명령어를 다시 실행하여 테스트가 성공하는지 확인합니다.
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

참고로 섭씨를 화씨로 변환하는 공식은 다음과 같습니다.

temp_f = temp_c * 1.8 + 32.0

문제가 발생하면 솔루션을 참고하세요.

작업 7. PTransform 단위 테스트 실행 및 엔드 투 엔드 파이프라인 테스트

  1. 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py로 돌아가서 마지막 두 테스트(약 53행에서 시작) 코드에서 주석 처리를 삭제합니다.

방금 주석 처리를 삭제한 첫 번째 테스트는 복합 PTransform ComputeStatistics를 테스트합니다. 아래에서 그중 일부 코드를 참고하세요.

def test_compute_statistics(self): with TestPipeline() as p: INPUT_RECORDS = # Test input omitted here EXPECTED_STATS = # Expected output omitted here inputs = p | beam.Create(INPUT_RECORDS) output = inputs | ComputeStatistics() assert_that(output, equal_to(EXPECTED_STATS))

이 코드는 앞서 살펴본 DoFn 단위 테스트와 매우 유사합니다. 테스트 입력과 출력을 제외한 실질적인 유일한 차이점은 beam.ParDo(DoFn()) 대신 PTransform을 적용한다는 점입니다.

마지막 테스트는 엔드 투 엔드 파이프라인을 대상으로 합니다. 파이프라인 코드(weather_statistics_pipeline.py)에서 전체 엔드 투 엔드 파이프라인은 소스와 싱크를 제외하고 단일한 PTransform WeatherStatsTransform에 포함되었습니다. 엔드 투 엔드 파이프라인을 테스트하기 위해 위와 비슷한 방식으로 진행하되 대신 PTransform을 사용합니다.

  1. 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
rm testing.out python3 weather_statistics_pipeline_test.py cat testing.out

이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.

test_compute_statistics (__main__.ComputeStatsTest) ... ok test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok ---------------------------------------------------------------------- Ran 4 tests in 2.295s OK
  1. 이제 testing.out 파일을 스토리지 버킷에 복사합니다.
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. DoFn 및 PTransform에 대한 단위 테스트 수행

실습 2부: TestStream으로 스트림 처리 로직 테스트

이 단계에서는 택시 운행 횟수의 윈도잉 계산을 위한 스트리밍 파이프라인의 단위 테스트를 수행합니다. 작성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.

  • TestPipeline을 만듭니다.
  • TestStream 클래스를 사용하여 스트리밍 데이터를 생성합니다. 여기에는 일련의 이벤트 생성, 워터마크 진행, 처리 시간 진행이 포함됩니다.
  • testing.util 모듈의 assert_that 메서드와 다른 메서드를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.

TestStream에서 이벤트를 읽는 파이프라인을 실행할 때 읽기 작업은 각 이벤트의 모든 결과가 완료된 후에 다음 이벤트로 이동합니다. 이 과정에는 처리 시간 진행, 적절한 트리거 실행이 포함됩니다. TestStream을 사용하면 트리거 및 허용된 지연의 영향을 파이프라인에서 관찰하고 테스트할 수 있습니다. 여기에는 지연된 트리거와 지연으로 인해 처리되지 않은 데이터에 관한 로직이 포함됩니다.

작업 1. 기본 파이프라인 코드 살펴보기

  1. 파일 탐색기에서 8b_Stream_Testing_Pipeline/lab으로 이동합니다.

이 디렉터리에는 기본 파이프라인 코드가 포함된 taxi_streaming_pipeline.py와 테스트 코드가 포함된 taxi_streaming_pipeline_test.py, 두 개의 파일이 있습니다.

  1. 먼저 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py를 엽니다.

단위 테스트를 진행하기 전에 파이프라인 코드를 간략하게 살펴보겠습니다. 먼저 6행에서 시작하는 TaxiRide 클래스를 살펴봅니다. 이 클래스는 typing.NamedTuple의 서브클래스이므로 해당 클래스의 객체를 다룰 때 스키마 인식 변환을 사용할 수 있습니다. 또한 나중에 진행할 테스트를 위해 이 클래스의 객체로 구성된 인메모리 컬렉션을 정의합니다.

class TaxiRide(typing.NamedTuple): ride_id: str point_idx: int latitude: float longitude: float timestamp: str meter_reading: float meter_increment: float ride_status: str passenger_count: int

다음은 파이프라인의 기본 코드입니다. 이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.

  • 22행에서 시작하는 DoFn JsonToTaxiRide는 수신된 Pub/Sub 메시지를 TaxiRide 클래스의 객체로 변환합니다.
  • 36행에서 시작하는 PTransform TaxiCountTransform입니다. 이 PTransform에는 파이프라인의 주요 계산 및 윈도잉 로직이 포함되어 있습니다. 테스트는 이 PTransform에 초점을 맞춥니다.

TaxiCountTransform의 출력은 윈도우 단위로 집계된 전체 택시 운행 횟수입니다. 그러나 각 운행은 승차, 하차 등 여러 이벤트로 구성됩니다. 각 운행을 한 번만 집계하도록 ride_status 속성을 기준으로 필터링합니다. 이를 위해 ride_status가 'pickup'인 요소만 유지합니다.

... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')

파이프라인에 사용되는 윈도잉 로직을 자세히 살펴보면 다음과 같습니다.

... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60), trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=60, accumulation_mode=AccumulationMode.ACCUMULATING)

60초 길이의 고정 윈도우로 윈도잉합니다. 조기 트리거는 사용하지 않으며 워터마크가 윈도우 종료 시점을 지난 후 결과를 출력합니다. 새 요소가 들어올 때마다 지연 실행을 수행하지만 허용 지연 시간은 60초로 제한합니다. 마지막으로 허용 지연 시간이 경과할 때까지 윈도우에 상태가 누적됩니다.

작업 2. TestStream 사용 방법 살펴보기 및 첫 번째 테스트 실행

  1. 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py를 엽니다.

첫 번째 목표는 테스트 코드에서 TestStream이 어떻게 사용되는지 이해하는 것입니다. TestStream 클래스를 사용하면 메시지의 실시간 스트림을 시뮬레이션하면서 처리 시간과 워터마크 진행을 제어할 수 있습니다. 아래는 66행에서 시작하는 첫 번째 테스트의 코드입니다.

test_stream = TestStream().advance_watermark_to(0).add_elements([ TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_pickup, 0), TimestampedValue(base_json_enroute, 0), TimestampedValue(base_json_pickup, 60) ]).advance_watermark_to(60).advance_processing_time(60).add_elements([ TimestampedValue(base_json_pickup, 120) ]).advance_watermark_to_infinity()
  1. 새로운 TestStream 객체를 만든 다음 JSON 메시지를 문자열로 전달합니다. 문자열 이름은 운행 상태에 따라 base_json_pickup 또는 base_json_enroute를 사용합니다. 위의 TestStream은 어떤 작업을 수행하나요?

TestStream은 다음 작업을 수행합니다.

  • 초기 워터마크를 시간 0으로 설정합니다(모든 타임스탬프는 초 단위).
  • add_elements 메서드를 통해 이벤트 타임스탬프가 0인 요소 세 개를 스트림에 추가합니다. 이 이벤트 중 두 개는 집계되지만(ride_status = "pickup") 다른 하나는 집계되지 않습니다.
  • 다른 'pickup' 이벤트를 추가하고 이벤트 타임스탬프는 60으로 설정합니다.
  • 워터마크와 처리 시간을 60으로 진행하여 첫 번째 윈도우를 트리거합니다.
  • 다른 'pickup' 이벤트를 추가하고 이벤트 타임스탬프는 120으로 설정합니다.
  • 워터마크를 '무한'으로 진행합니다. 이로 인해 모든 윈도우가 닫히고 새로운 데이터는 허용 지연 시간을 초과하게 됩니다.
  1. 첫 번째 테스트의 나머지 코드는 이전 일괄 처리 예시와 유사하지만 이제 Create 변환 대신 TestStream을 사용합니다.
taxi_counts = (p | test_stream | TaxiCountTransform() ) EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3], IntervalWindow(60,120): [1], IntervalWindow(120,180): [1]} assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS), reify_windows=True)

위의 코드에서 TestStream을 만들고 TaxiCountTransform PTransform을 적용하여 출력 PCollection(taxi_counts)를 정의합니다. InvervalWindow 클래스를 사용하여 확인하려는 윈도우를 정의한 다음 assert_thatequal_to_per_window 메서드를 사용하여 윈도우별 결과를 검증합니다.

  1. 파일을 저장하고 터미널로 돌아가서 다음 명령어를 실행하여 올바른 디렉터리로 이동합니다.
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab export BASE_DIR=$(pwd)
  1. 이제 위의 테스트를 실행하고 다음 명령어를 실행하여 출력을 확인합니다.
python3 taxi_streaming_pipeline_test.py cat testing.out

테스트 후 다음과 같은 출력이 표시됩니다(경과 시간은 다를 수 있음).

test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- Ran 1 test in 1.113s OK

작업 3. 지연 데이터 처리 테스트를 위한 TestStream 만들기

이 작업에서는 TestStream의 코드를 작성하여 지연 데이터 처리와 관련된 로직을 테스트합니다.

  1. 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py로 돌아가서 test_late_data_behavior 메서드가 주석 처리된 위치(약 60행)로 스크롤합니다. 이 작업의 코드를 완성할 예정이므로 이 테스트의 코드 주석 처리를 삭제합니다.
class TaxiLateDataTest(unittest.TestCase): def test_late_data_behavior(self): options = PipelineOptions() options.view_as(StandardOptions).streaming = True with TestPipeline(options=options) as p: base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \ "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \ "\"ride_status\":\"pickup\",\"passenger_count\":1}" test_stream = # TASK 3: Create TestStream Object EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #On Time and Late Result taxi_counts = (p | test_stream | TaxiCountTransform() ) assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))

EXPECTED_RESULTS에는 IntervalWindow(0,60)에 대한 두 개의 결과가 포함되어 있습니다. 이는 이 윈도우의 정시 트리거와 지연 트리거에서 발생한 결과를 나타냅니다.

테스트 코드는 TestStream을 만드는 부분을 제외하고는 모두 완성되었습니다.

  1. 이 작업을 완료하려면 다음 작업을 수행하는 TestStream 객체를 만듭니다.

    1. 워터마크를 0으로 진행합니다(모든 타임스탬프는 초 단위).
    2. base_json_pickup 값을 가지며 타임스탬프가 0인 두 개의 TimestampedValues를 추가합니다.
    3. 워터마크 및 처리 시간을 60으로 진행합니다.
    4. base_json_pickup 값을 가지며 타임스탬프가 0인 또 다른 TimestampedValue를 추가합니다.
    5. 워터마크 및 처리 시간을 300으로 진행합니다.
    6. base_json_pickup 값을 가지며 타임스탬프가 0인 또 다른 TimestampedValue를 추가합니다.
    7. 워터마크를 '무한'으로 진행합니다.

이렇게 하면 첫 번째 윈도우에 속하는 4개의 요소가 있는 TestStream이 생성됩니다. 처음 두 요소는 정시, 두 번째 요소는 허용 지연 시간 내 지연, 마지막 요소는 지연 시간이 허용 지연 시간을 초과합니다. 실행한 창을 누적하고 있기 때문에 첫 번째 트리거는 2개의 이벤트를 집계하고 마지막 트리거는 3개의 이벤트를 집계해야 합니다. 네 번째 이벤트는 포함하지 않아야 합니다.

문제가 발생하면 솔루션을 참고하세요.

작업 4. 지연 데이터 처리 테스트 실행

  1. 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
rm testing.out python3 taxi_streaming_pipeline_test.py cat testing.out

이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.

test_late_data_behavior (__main__.TaxiLateDataTest) ... ok test_windowing_behavior (__main__.TaxiWindowingTest) ... ok ---------------------------------------------------------------------- Ran 2 tests in 2.225s OK
  1. 이제 testing.out 파일을 스토리지 버킷에 복사합니다.
export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. TestStream으로 스트림 처리 로직 테스트

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.