개요
이 실습에서 학습할 내용은 다음과 같습니다.
- Apache Beam의 테스트 도구를 사용하여
DoFn 및 PTransform에 대한 단위 테스트를 작성합니다.
- 파이프라인 통합 테스트를 수행합니다.
-
TestStream 클래스를 사용하여 스트리밍 파이프라인의 윈도잉 동작을 테스트합니다.
파이프라인 테스트는 효과적인 데이터 처리 솔루션을 개발하는 데 있어 특히 중요한 단계입니다. Beam 모델의 간접적 특성으로 인해, 실행이 실패했을 때 디버깅 작업이 복잡해질 수 있습니다.
이 실습에서는 DirectRunner를 사용하여 Beam SDK의 testing 패키지에 포함된 도구로 로컬 단위 테스트를 수행하는 방법을 살펴봅니다.
설정 및 요건
실습 시작 버튼을 클릭하기 전에
참고: 다음 안내를 확인하세요.
실습에는 시간제한이 있으며 일시중지할 수 없습니다. 실습 시작을 클릭하면 타이머가 시작됩니다. 이 타이머는 Google Cloud 리소스를 사용할 수 있는 시간이 얼마나 남았는지를 표시합니다.
Google Skills 실무형 실습을 통해 시뮬레이션이나 데모 환경이 아닌 실제 클라우드 환경에서 직접 실습 활동을 진행할 수 있습니다. 실습 시간 동안 Google Cloud에 로그인하고 액세스하는 데 사용할 수 있는 새로운 임시 사용자 인증 정보가 제공됩니다.
필요한 사항
이 실습을 완료하려면 다음이 필요합니다.
- 표준 인터넷 브라우저(Chrome 브라우저 권장)
- 실습을 끝까지 진행할 수 있는 충분한 시간
참고: 이미 개인용 Google Cloud 계정이나 프로젝트가 있어도 이 실습에서는 사용하지 마세요.
참고: Pixelbook을 사용하는 경우 시크릿 창을 열어 이 실습을 실행하세요.
실습을 시작하고 콘솔에 로그인하는 방법
-
실습 시작 버튼을 클릭합니다. 실습 비용을 결제해야 하는 경우 결제 수단을 선택할 수 있는 팝업이 열립니다.
왼쪽에 있는 패널에서 이 실습에 사용해야 하는 임시 사용자 인증 정보를 확인할 수 있습니다.

-
사용자 이름을 복사한 다음 Google 콘솔 열기를 클릭합니다.
실습에서 리소스가 실행되며 계정 선택 페이지를 표시하는 다른 탭이 열립니다.
참고: 두 개의 탭을 각각 별도의 창으로 나란히 여세요.
-
계정 선택 페이지에서 다른 계정 사용을 클릭합니다. 로그인 페이지가 열립니다.

-
연결 세부정보 패널에서 복사한 사용자 이름을 붙여넣습니다. 그런 다음 비밀번호를 복사하여 붙여넣습니다.
참고: 연결 세부정보 패널에 표시된 사용자 인증 정보를 사용해야 합니다. Google Skills 사용자 인증 정보를 사용하지 마세요. 개인용 Google Cloud 계정이 있어도 이 실습에서는 사용하지 마세요(요금 청구 방지).
- 이후에 표시되는 페이지를 클릭하여 넘깁니다.
- 이용약관에 동의하세요.
- 임시 계정이므로 복구 옵션이나 2단계 인증을 추가하지 마세요.
- 무료 평가판을 신청하지 않습니다.
잠시 후 Cloud 콘솔이 이 탭에서 열립니다.
참고: 왼쪽 상단에 있는 탐색 메뉴를 클릭하면 Google Cloud 제품 및 서비스 목록이 있는 메뉴를 볼 수 있습니다.
이 실습에서는 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.
-
Google Cloud 콘솔의 탐색 메뉴(
)에서 Vertex AI를 선택합니다.
-
모든 권장 API 사용 설정을 클릭합니다.
-
탐색 메뉴에서 Workbench를 클릭합니다.
Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.
-
새로 만들기를 클릭합니다.
-
인스턴스를 구성합니다.
-
이름: lab-workbench
-
리전: 리전을 (으)로 설정합니다.
-
영역: 영역을 (으)로 설정합니다.
-
고급 옵션(선택사항): 필요한 경우 '고급 옵션'을 클릭하여 추가로 맞춤설정(예: 머신 유형, 디스크 크기)할 수 있습니다.

-
만들기를 클릭합니다.
인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.
- 인스턴스 이름 옆에 있는 JupyterLab 열기를 클릭하여 JupyterLab 인터페이스를 실행합니다. 그러면 브라우저에서 새 탭이 열립니다.

- 그런 다음 터미널을 클릭합니다. 이 실습의 모든 명령어를 실행할 수 있는 터미널이 열립니다.
코드 저장소 다운로드
다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.
- 방금 연 터미널에서 다음을 입력합니다.
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.
-
클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.

참고: 수정할 파일을 열려면 파일로 이동하여 클릭하기만 하면 됩니다. 클릭하면 파일이 열리며, 여기에서 코드를 추가하거나 수정할 수 있습니다.
실습 코드는 8a_Batch_Testing_Pipeline/lab 및 8b_Stream_Testing_Pipeline/lab의 두 폴더로 나뉩니다. 어떤 단계에서든 문제가 발생하면 해당하는 solution 폴더에서 해결 방법을 찾을 수 있습니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
노트북 인스턴스 만들기 및 과정 저장소 클론하기
실습 1부: DoFn 및 PTransform에 대한 단위 테스트 수행
작업 1. 환경 준비
이 단계에서는 날씨 센서에서 통계를 계산하는 일괄 파이프라인의 DoFn 및 PTransform에 대한 단위 테스트를 수행합니다. 생성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.
-
TestPipeline을 만듭니다.
- 테스트 입력 데이터를 만들고
Create 변환를 사용하여 입력 데이터의 PCollection을 만듭니다.
- 입력
PCollection에 변환을 적용하고 생성된 PCollection을 저장합니다.
-
testing.util 모듈의 assert_that 메서드와 다른 메서드를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.
TestPipeline은 Beam SDK에 포함된 특수 클래스로, 변환 및 파이프라인 로직을 테스트할 수 있도록 제공됩니다. 테스트 시 파이프라인 객체를 만들 때 Pipeline 대신 TestPipeline을 사용합니다. Create 변환은 객체의 인메모리 컬렉션(Java Iterable)을 가져와 이 컬렉션에서 PCollection을 만듭니다. 목표는 PTransform에서 나올 예상 출력 PCollection을 알 수 있는 소량의 테스트 입력 데이터 세트를 갖는 것입니다.
with TestPipeline() as p:
INPUTS = [fake_input_1, fake_input_2]
test_output = p | beam.Create(INPUTS) | # Transforms to be tested
마지막으로 출력 PCollection이 예상 출력과 일치하는지 확인합니다. 이를 확인하기 위해 assert_that 메서드를 사용합니다. 예를 들어 equal_to 메서드를 사용하여 출력 PCollection이 올바른 요소를 포함하는지 확인할 수 있습니다.
assert_that(test_output, equal_to(EXPECTED_OUTPUTS))
이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 이전과 동일하게 실습 환경을 열고 데이터를 생성합니다.
해당 실습 열기
- IDE의 터미널에서 다음 명령어를 실행하여 이 실습에 사용할 디렉터리로 변경합니다.
# Change directory into the lab
cd 8a_Batch_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
가상 환경 및 종속 항목 설정
실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.
- 다음 명령어를 실행하여 이 실습에서 사용할 가상 환경을 만듭니다.
sudo apt-get install -y python3-venv
# Create and activate virtual environment
python3 -m venv df-env
source df-env/bin/activate
- 다음으로 파이프라인을 실행하는 데 필요한 패키지를 설치합니다.
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Dataflow API가 사용 설정되어 있는지 확인합니다.
gcloud services enable dataflow.googleapis.com
- 마지막으로 스토리지 버킷을 만듭니다.
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage buckets create gs://$PROJECT_ID --location=US
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
환경 준비
작업 2. 기본 파이프라인 코드 살펴보기
-
파일 탐색기에서 8a_Batch_Testing_Pipeline/lab으로 이동합니다. 이 디렉터리에는 기본 파이프라인 코드가 포함된 weather_statistics_pipeline.py와 테스트 코드가 포함된 weather_statistics_pipeline_test.py, 두 개의 파일이 있습니다.
-
먼저 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py를 엽니다.
단위 테스트를 진행하기 전에 파이프라인 코드를 간략하게 살펴보겠습니다. 먼저 6행에서 시작하는 WeatherRecord 클래스를 살펴봅니다. 이 클래스는 typing.NamedTuple의 서브클래스이므로 해당 클래스의 객체를 다룰 때 스키마 인식 변환을 사용할 수 있습니다. 또한 나중에 진행할 테스트를 위해 이 클래스의 객체로 구성된 인메모리 컬렉션을 정의합니다.
class WeatherRecord(typing.NamedTuple):
loc_id: str
lat: float
lng: float
date: str
low_temp: float
high_temp: float
precip: float
- 이제 17행으로 스크롤하여
DoFn과 PTransform 정의를 시작하는 위치로 이동합니다.
이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.
-
DoFn ConvertCsvToWeatherRecord(17행에서 시작) 및 ConvertTempUnits(27행에서 시작) 나중에 이 DoFn에 대한 단위 테스트를 수행하게 됩니다.
-
PTransform ComputeStatistics(41행에서 시작) DoFn과 동일한 방식으로 테스트할 수 있는 복합 변환의 예입니다.
-
PTransform WeatherStatsTransform(55행에서 시작) 이 PTransform에는 전체 파이프라인의 처리 로직(소스 및 싱크 변환 제외)이 포함되어 있으므로 Create 변환으로 생성된 합성 데이터를 대상으로 작은 규모의 파이프라인 통합 테스트를 수행할 수 있습니다.
참고: 처리 코드에서 논리적 오류를 발견하더라도 아직 수정하지 마세요. 나중에 테스트를 통해 오류를 정확히 찾아내는 방법을 살펴보겠습니다.
작업 3. 테스트용 종속 항목 추가
- 이제 파일 탐색기에서 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py를 엽니다.
테스트를 위해 몇 가지 종속 항목을 추가해야 합니다. Apache Beam에 포함된 테스트 유틸리티와 Python의 unittest 패키지를 활용합니다.
- 이 작업을 완료하려면 weather_statistics_pipeline_test.py의 상단에서 지시된 위치에 다음 Import 문을 추가합니다.
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to
문제가 발생하면 솔루션을 참고하세요.
작업 4. Apache Beam에서 첫 번째 DoFn 단위 테스트 작성
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py 파일에는 DoFn 및 PTransform 단위 테스트를 위한 코드가 포함되어 있습니다. 현재 코드는 대부분 주석 처리되어 있으며 실습을 진행하면서 주석을 해제할 예정입니다.
Beam 코드를 살펴보기 전에 테스트 실행 및 테스트 출력 텍스트 파일 작성 관리를 위해 커스텀 main 메서드를 정의해 두었다는 점을 참고하세요. 이렇게 하면 현재 터미널 세션이 종료된 후에도 확인할 수 있는 테스트 기록이 남습니다. 예를 들어 logging 모듈을 사용하여 이 작업을 관리할 수도 있습니다.
def main(out = sys.stderr, verbosity = 2):
loader = unittest.TestLoader()
suite = loader.loadTestsFromModule(sys.modules[__name__])
unittest.TextTestRunner(out, verbosity = verbosity).run(suite)
# Testing code omitted
if __name__ == '__main__':
with open('testing.out', 'w') as f:
main(f)
- 이제 43행에서 시작하는
ConvertCsvToWeatherRecord DoFn의 DoFn 단위 테스트를 살펴보겠습니다. 먼저 파이프라인을 테스트하기 위한 클래스를 만들고 TestPipeline 객체를 만듭니다.
class ConvertToWeatherRecordTest(unittest.TestCase):
def test_convert_to_csv(self):
with TestPipeline() as p:
...
- 이제 첫 번째 테스트의 (불완전한) 코드를 살펴보겠습니다.
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]
input_lines = p | # TASK 4: Create PCollection from LINES
output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord())
# TASK 4: Write assert_that statement
CSV 파일의 한 행(파이프라인의 예상 입력 형식)에 해당하는 단일 테스트 입력(LINES)을 만들어 목록에 넣습니다. 또한 예상 출력(EXPECTED_OUTPUT)을 WeatherRecords 목록으로 정의합니다.
현재 테스트 코드에는 아직 채워야 할 부분이 남아 있습니다.
-
이 작업을 완료하려면 먼저 Create 변환을 추가하여 LINES를 PCollection으로 변환합니다.
-
두 번째로 equal_to 메서드를 사용하는 assert_that 문을 포함하여 output과 EXPECTED_OUTPUT을 비교합니다.
문제가 발생하면 이후에 주석 처리된 테스트나 해결책을 참고하세요.
작업 5. 첫 번째 DoFn 단위 테스트 실행
python3 weather_statistics_pipeline_test.py
cat testing.out
테스트 출력은 testing.out 파일에 기록됩니다. 터미널에서 cat testing.out을 실행하여 이 파일의 내용을 볼 수 있습니다. 이전 작업이 정상적으로 수행되었다면 testing.out 파일의 내용은 다음과 같아야 합니다(경과 시간은 다를 수 있음).
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
----------------------------------------------------------------------
Ran 1 test in 0.918s
OK
참고: 위 테스트는 'python3 -m unittest test_script.py' 명령어를 사용하여 실행할 수도 있습니다. 여기서는 python 스크립트를 직접 실행하여 기본 메서드에 액세스했지만, 실제로 여기에 언급된 방식이 더 일반적으로 사용됩니다.
작업 6. 두 번째 DoFn 단위 테스트 실행 및 파이프라인 디버그
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py로 돌아가서 두 번째 단위 테스트의 코드 (약 33~50행)에서 주석 처리를 삭제합니다. 코드를 강조 표시하고 Ctrl + /(MacOS에서는 Cmd + /)를 눌러 주석 처리를 삭제하세요. 아래 코드를 참고하세요.
class ConvertTempUnitsTest(unittest.TestCase):
def test_convert_temp_units(self):
with TestPipeline() as p:
RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)]
EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)]
input_records = p | beam.Create(RECORDS)
output = input_records | beam.ParDo(ConvertTempUnits())
assert_that(output, equal_to(EXPECTED_RECORDS))
이 테스트를 통해 ConvertTempUnits() DoFn이 정상적으로 작동하는지 확인합니다. weather_statistics_pipeline_test.py를 저장하고 터미널로 돌아갑니다.
- 다음 명령어를 실행하여 테스트를 실행하고 출력을 확인합니다.
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
테스트에 실패했습니다. 출력을 스크롤하면 테스트 실패에 관한 다음 정보를 확인할 수 있습니다.
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR
...
apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']
BeamAssertException을 더 자세히 살펴보면 low_temp와 high_temp 값이 잘못된 것을 알 수 있습니다. ConvertTempUnits DoFn의 처리 로직에 문제가 있습니다.
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py로 돌아가서
ConvertTempUnits 정의(약 32행)로 스크롤합니다. 이 작업을 완료하려면 DoFn 처리 로직에서 오류를 찾고 다음 명령어를 다시 실행하여 테스트가 성공하는지 확인합니다.
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
참고로 섭씨를 화씨로 변환하는 공식은 다음과 같습니다.
temp_f = temp_c * 1.8 + 32.0
문제가 발생하면 솔루션을 참고하세요.
작업 7. PTransform 단위 테스트 실행 및 엔드 투 엔드 파이프라인 테스트
-
8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py로 돌아가서 마지막 두 테스트(약 53행에서 시작) 코드에서 주석 처리를 삭제합니다.
방금 주석 처리를 삭제한 첫 번째 테스트는 복합 PTransform ComputeStatistics를 테스트합니다. 아래에서 그중 일부 코드를 참고하세요.
def test_compute_statistics(self):
with TestPipeline() as p:
INPUT_RECORDS = # Test input omitted here
EXPECTED_STATS = # Expected output omitted here
inputs = p | beam.Create(INPUT_RECORDS)
output = inputs | ComputeStatistics()
assert_that(output, equal_to(EXPECTED_STATS))
이 코드는 앞서 살펴본 DoFn 단위 테스트와 매우 유사합니다. 테스트 입력과 출력을 제외한 실질적인 유일한 차이점은 beam.ParDo(DoFn()) 대신 PTransform을 적용한다는 점입니다.
마지막 테스트는 엔드 투 엔드 파이프라인을 대상으로 합니다. 파이프라인 코드(weather_statistics_pipeline.py)에서 전체 엔드 투 엔드 파이프라인은 소스와 싱크를 제외하고 단일한 PTransform WeatherStatsTransform에 포함되었습니다. 엔드 투 엔드 파이프라인을 테스트하기 위해 위와 비슷한 방식으로 진행하되 대신 PTransform을 사용합니다.
- 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out
이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.
test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok
----------------------------------------------------------------------
Ran 4 tests in 2.295s
OK
- 이제
testing.out 파일을 스토리지 버킷에 복사합니다.
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
DoFn 및 PTransform에 대한 단위 테스트 수행
실습 2부: TestStream으로 스트림 처리 로직 테스트
이 단계에서는 택시 운행 횟수의 윈도잉 계산을 위한 스트리밍 파이프라인의 단위 테스트를 수행합니다. 작성한 변환을 테스트하려면 다음 패턴과 Beam에서 제공하는 변환을 사용할 수 있습니다.
-
TestPipeline을 만듭니다.
-
TestStream 클래스를 사용하여 스트리밍 데이터를 생성합니다. 여기에는 일련의 이벤트 생성, 워터마크 진행, 처리 시간 진행이 포함됩니다.
-
testing.util 모듈의 assert_that 메서드와 다른 메서드를 사용하여 출력 PCollection에 예상한 요소가 포함되어 있는지 확인합니다.
TestStream에서 이벤트를 읽는 파이프라인을 실행할 때 읽기 작업은 각 이벤트의 모든 결과가 완료된 후에 다음 이벤트로 이동합니다. 이 과정에는 처리 시간 진행, 적절한 트리거 실행이 포함됩니다. TestStream을 사용하면 트리거 및 허용된 지연의 영향을 파이프라인에서 관찰하고 테스트할 수 있습니다. 여기에는 지연된 트리거와 지연으로 인해 처리되지 않은 데이터에 관한 로직이 포함됩니다.
작업 1. 기본 파이프라인 코드 살펴보기
- 파일 탐색기에서 8b_Stream_Testing_Pipeline/lab으로 이동합니다.
이 디렉터리에는 기본 파이프라인 코드가 포함된 taxi_streaming_pipeline.py와 테스트 코드가 포함된 taxi_streaming_pipeline_test.py, 두 개의 파일이 있습니다.
- 먼저 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py를 엽니다.
단위 테스트를 진행하기 전에 파이프라인 코드를 간략하게 살펴보겠습니다. 먼저 6행에서 시작하는 TaxiRide 클래스를 살펴봅니다. 이 클래스는 typing.NamedTuple의 서브클래스이므로 해당 클래스의 객체를 다룰 때 스키마 인식 변환을 사용할 수 있습니다. 또한 나중에 진행할 테스트를 위해 이 클래스의 객체로 구성된 인메모리 컬렉션을 정의합니다.
class TaxiRide(typing.NamedTuple):
ride_id: str
point_idx: int
latitude: float
longitude: float
timestamp: str
meter_reading: float
meter_increment: float
ride_status: str
passenger_count: int
다음은 파이프라인의 기본 코드입니다. 이 파이프라인의 개념은 이전 실습에서 대부분 다루었지만 다음 부분은 주의 깊게 살펴보세요.
- 22행에서 시작하는
DoFn JsonToTaxiRide는 수신된 Pub/Sub 메시지를 TaxiRide 클래스의 객체로 변환합니다.
- 36행에서 시작하는
PTransform TaxiCountTransform입니다. 이 PTransform에는 파이프라인의 주요 계산 및 윈도잉 로직이 포함되어 있습니다. 테스트는 이 PTransform에 초점을 맞춥니다.
TaxiCountTransform의 출력은 윈도우 단위로 집계된 전체 택시 운행 횟수입니다. 그러나 각 운행은 승차, 하차 등 여러 이벤트로 구성됩니다. 각 운행을 한 번만 집계하도록 ride_status 속성을 기준으로 필터링합니다. 이를 위해 ride_status가 'pickup'인 요소만 유지합니다.
... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')
파이프라인에 사용되는 윈도잉 로직을 자세히 살펴보면 다음과 같습니다.
... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60),
trigger=AfterWatermark(late=AfterCount(1)),
allowed_lateness=60,
accumulation_mode=AccumulationMode.ACCUMULATING)
60초 길이의 고정 윈도우로 윈도잉합니다. 조기 트리거는 사용하지 않으며 워터마크가 윈도우 종료 시점을 지난 후 결과를 출력합니다. 새 요소가 들어올 때마다 지연 실행을 수행하지만 허용 지연 시간은 60초로 제한합니다. 마지막으로 허용 지연 시간이 경과할 때까지 윈도우에 상태가 누적됩니다.
작업 2. TestStream 사용 방법 살펴보기 및 첫 번째 테스트 실행
-
8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py를 엽니다.
첫 번째 목표는 테스트 코드에서 TestStream이 어떻게 사용되는지 이해하는 것입니다. TestStream 클래스를 사용하면 메시지의 실시간 스트림을 시뮬레이션하면서 처리 시간과 워터마크 진행을 제어할 수 있습니다. 아래는 66행에서 시작하는 첫 번째 테스트의 코드입니다.
test_stream = TestStream().advance_watermark_to(0).add_elements([
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_pickup, 0),
TimestampedValue(base_json_enroute, 0),
TimestampedValue(base_json_pickup, 60)
]).advance_watermark_to(60).advance_processing_time(60).add_elements([
TimestampedValue(base_json_pickup, 120)
]).advance_watermark_to_infinity()
- 새로운
TestStream 객체를 만든 다음 JSON 메시지를 문자열로 전달합니다. 문자열 이름은 운행 상태에 따라 base_json_pickup 또는 base_json_enroute를 사용합니다. 위의 TestStream은 어떤 작업을 수행하나요?
TestStream은 다음 작업을 수행합니다.
- 초기 워터마크를 시간
0으로 설정합니다(모든 타임스탬프는 초 단위).
-
add_elements 메서드를 통해 이벤트 타임스탬프가 0인 요소 세 개를 스트림에 추가합니다. 이 이벤트 중 두 개는 집계되지만(ride_status = "pickup") 다른 하나는 집계되지 않습니다.
- 다른 'pickup' 이벤트를 추가하고 이벤트 타임스탬프는
60으로 설정합니다.
- 워터마크와 처리 시간을
60으로 진행하여 첫 번째 윈도우를 트리거합니다.
- 다른 'pickup' 이벤트를 추가하고 이벤트 타임스탬프는
120으로 설정합니다.
- 워터마크를 '무한'으로 진행합니다. 이로 인해 모든 윈도우가 닫히고 새로운 데이터는 허용 지연 시간을 초과하게 됩니다.
- 첫 번째 테스트의 나머지 코드는 이전 일괄 처리 예시와 유사하지만 이제
Create 변환 대신 TestStream을 사용합니다.
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3],
IntervalWindow(60,120): [1],
IntervalWindow(120,180): [1]}
assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
reify_windows=True)
위의 코드에서 TestStream을 만들고 TaxiCountTransform PTransform을 적용하여 출력 PCollection(taxi_counts)를 정의합니다. InvervalWindow 클래스를 사용하여 확인하려는 윈도우를 정의한 다음 assert_that와 equal_to_per_window 메서드를 사용하여 윈도우별 결과를 검증합니다.
- 파일을 저장하고 터미널로 돌아가서 다음 명령어를 실행하여 올바른 디렉터리로 이동합니다.
# Change directory into the lab
cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
- 이제 위의 테스트를 실행하고 다음 명령어를 실행하여 출력을 확인합니다.
python3 taxi_streaming_pipeline_test.py
cat testing.out
테스트 후 다음과 같은 출력이 표시됩니다(경과 시간은 다를 수 있음).
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
Ran 1 test in 1.113s
OK
작업 3. 지연 데이터 처리 테스트를 위한 TestStream 만들기
이 작업에서는 TestStream의 코드를 작성하여 지연 데이터 처리와 관련된 로직을 테스트합니다.
-
8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py로 돌아가서
test_late_data_behavior 메서드가 주석 처리된 위치(약 60행)로 스크롤합니다. 이 작업의 코드를 완성할 예정이므로 이 테스트의 코드 주석 처리를 삭제합니다.
class TaxiLateDataTest(unittest.TestCase):
def test_late_data_behavior(self):
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
"\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
"\"ride_status\":\"pickup\",\"passenger_count\":1}"
test_stream = # TASK 3: Create TestStream Object
EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]} #On Time and Late Result
taxi_counts = (p | test_stream
| TaxiCountTransform()
)
assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))
EXPECTED_RESULTS에는 IntervalWindow(0,60)에 대한 두 개의 결과가 포함되어 있습니다. 이는 이 윈도우의 정시 트리거와 지연 트리거에서 발생한 결과를 나타냅니다.
테스트 코드는 TestStream을 만드는 부분을 제외하고는 모두 완성되었습니다.
-
이 작업을 완료하려면 다음 작업을 수행하는 TestStream 객체를 만듭니다.
- 워터마크를
0으로 진행합니다(모든 타임스탬프는 초 단위).
-
base_json_pickup 값을 가지며 타임스탬프가 0인 두 개의 TimestampedValues를 추가합니다.
- 워터마크 및 처리 시간을
60으로 진행합니다.
-
base_json_pickup 값을 가지며 타임스탬프가 0인 또 다른 TimestampedValue를 추가합니다.
- 워터마크 및 처리 시간을
300으로 진행합니다.
-
base_json_pickup 값을 가지며 타임스탬프가 0인 또 다른 TimestampedValue를 추가합니다.
- 워터마크를 '무한'으로 진행합니다.
이렇게 하면 첫 번째 윈도우에 속하는 4개의 요소가 있는 TestStream이 생성됩니다. 처음 두 요소는 정시, 두 번째 요소는 허용 지연 시간 내 지연, 마지막 요소는 지연 시간이 허용 지연 시간을 초과합니다. 실행한 창을 누적하고 있기 때문에 첫 번째 트리거는 2개의 이벤트를 집계하고 마지막 트리거는 3개의 이벤트를 집계해야 합니다. 네 번째 이벤트는 포함하지 않아야 합니다.
문제가 발생하면 솔루션을 참고하세요.
작업 4. 지연 데이터 처리 테스트 실행
- 이제 터미널로 돌아가서 다음 명령어를 실행하여 테스트를 한 번 더 실행합니다.
rm testing.out
python3 taxi_streaming_pipeline_test.py
cat testing.out
이전 작업을 성공적으로 완료했다면 테스트 완료 후 다음과 같이 터미널에 표시됩니다.
test_late_data_behavior (__main__.TaxiLateDataTest) ... ok
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok
----------------------------------------------------------------------
Ran 2 tests in 2.225s
OK
- 이제
testing.out 파일을 스토리지 버킷에 복사합니다.
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
TestStream으로 스트림 처리 로직 테스트
실습 종료하기
실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.
실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.
별점의 의미는 다음과 같습니다.
- 별표 1개 = 매우 불만족
- 별표 2개 = 불만족
- 별표 3개 = 중간
- 별표 4개 = 만족
- 별표 5개 = 매우 만족
의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.
의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.
Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.