시작하기 전에
- 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
- 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
- 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.
Create Vertex AI Platform Notebooks instance and clone course repo
/ 20
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
이 실습에서 학습할 내용은 다음과 같습니다.
이전 실습의 마지막 부분에서는 실시간 파이프라인이 해결해야 하는 한 가지 종류의 과제인 이벤트가 발생한 시점과 처리된 시점 사이의 간격, 즉 지연을 소개했습니다. 이 실습에서는 파이프라인 제작자가 파이프라인이 지연을 처리하는 방법을 공식적인 방식으로 지정할 수 있도록 하는 Apache Beam 개념을 소개합니다.
하지만 지연 시간만이 스트리밍 컨텍스트에서 파이프라인이 겪을 수 있는 유일한 문제는 아닙니다. 시스템 외부에서 입력이 들어올 때마다 입력이 어떤 식으로든 잘못된 형식일 가능성이 항상 존재합니다. 이 실습에서는 이러한 입력을 처리하는 데 사용할 수 있는 기법도 소개합니다.
이 실습의 최종 파이프라인은 아래 그림과 유사합니다. 브랜치가 포함되어 있습니다.
Google Skills 실무형 실습을 통해 시뮬레이션이나 데모 환경이 아닌 실제 클라우드 환경에서 직접 실습 활동을 진행할 수 있습니다. 실습 시간 동안 Google Cloud에 로그인하고 액세스하는 데 사용할 수 있는 새로운 임시 사용자 인증 정보가 제공됩니다.
이 실습을 완료하려면 다음이 필요합니다.
실습 시작 버튼을 클릭합니다. 실습 비용을 결제해야 하는 경우 결제 수단을 선택할 수 있는 팝업이 열립니다. 왼쪽에 있는 패널에서 이 실습에 사용해야 하는 임시 사용자 인증 정보를 확인할 수 있습니다.
사용자 이름을 복사한 다음 Google 콘솔 열기를 클릭합니다. 실습에서 리소스가 실행되며 계정 선택 페이지를 표시하는 다른 탭이 열립니다.
계정 선택 페이지에서 다른 계정 사용을 클릭합니다. 로그인 페이지가 열립니다.
연결 세부정보 패널에서 복사한 사용자 이름을 붙여넣습니다. 그런 다음 비밀번호를 복사하여 붙여넣습니다.
잠시 후 Cloud 콘솔이 이 탭에서 열립니다.
이 실습에서는 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.
Google Cloud 콘솔의 탐색 메뉴()에서 Vertex AI를 선택합니다.
모든 권장 API 사용 설정을 클릭합니다.
탐색 메뉴에서 Workbench를 클릭합니다.
Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.
새로 만들기를 클릭합니다.
인스턴스를 구성합니다.
인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.
다음으로 이 실험실에서 사용할 코드 저장소를 다운로드합니다.
노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.
클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
이전 실습에서는 다음과 같은 코드를 사용하여 이벤트 시간별로 요소를 고정 너비의 기간으로 나누는 코드를 작성했습니다.
그러나 마지막 비SQL 실습의 끝부분에서 보셨듯이 데이터 스트림에는 지연이 발생하는 경우가 많습니다. 이벤트 시간(처리 시간이 아님)을 사용하여 윈도잉할 때 지연은 불확실성을 야기하기 때문에 문제가 됩니다. 이벤트 시간의 특정 시점에 대한 모든 이벤트가 실제로 도착했는지, 아니면 도착하지 않았는지 알 수 없기 때문입니다.
결과를 출력하려면 작성한 파이프라인이 이와 관련해 결정을 내려야 합니다. 워터마크라는 개념을 사용했습니다. 워터마크는 시스템의 휴리스틱 기반 개념으로, 이벤트 시간의 특정 시점까지의 모든 데이터가 파이프라인에 도착한 것으로 예상할 수 있는 시점을 나타냅니다. 워터마크가 기간 끝을 지나간 후 이 기간에 타임스탬프가 있는 요소가 추가로 도착하면 지연 데이터로 간주되어 삭제됩니다. 따라서 기본 윈도잉 동작은 시스템이 모든 데이터를 가지고 있다고 확신할 때 단일의 완전한 결과를 내보내는 것입니다.
Apache Beam은 워터마크가 무엇인지 추측하기 위해 여러 휴리스틱을 사용합니다. 그러나 이는 여전히 휴리스틱입니다. 더 중요한 점은 이러한 휴리스틱이 범용적이며 모든 사용 사례에 적합하지 않다는 것입니다. 파이프라인 설계자는 범용 휴리스틱을 사용하는 대신 적절한 절충안을 결정하기 위해 다음 질문을 신중하게 고려해야 합니다.
질문의 답변을 바탕으로 Apache Beam의 형식 기법 사용하여 적절한 절충안을 제시하는 코드를 작성할 수 있습니다.
허용된 지연 시간은 윈도우 상태를 유지해야 하는 기간을 제어합니다. 워터마크가 허용된 지연 기한에 도달하면 모든 상태가 삭제됩니다. 모든 영구 상태를 기한까지 유지할 수 있다면 좋겠지만, 실제로 무제한 데이터 소스를 처리할 때는 주어진 윈도우의 상태를 무기한 유지하는 것이 실용적이지 않은 경우가 많습니다. 결국 디스크 공간이 부족해지기 때문입니다.
따라서 실제로 데이터를 순서 없이 처리하는 시스템에서는 처리 중인 윈도우의 전체 기간을 제한하는 방법을 제공해야 합니다. 이를 수행하는 깔끔하고 간결한 방법은 시스템 내에서 허용되는 지연 시간 범위를 정의하는 것입니다. 즉, 시스템이 처리하기 위해 주어진 레코드가 워터마크에 비해 얼마나 늦을 수 있는지에 대한 한도를 설정합니다. 이 한도를 초과하여 도착한 데이터는 간단히 삭제됩니다. 개별 데이터에 허용되는 지연 한도를 설정하면 윈도우의 상태를 얼마나 오래 유지해야 하는지도 정확히 정한 것입니다. 즉, 워터마크가 윈도우 끝에 도달하여 지연 한도 범위를 초과할 때까지 유지해야 합니다.
이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 이전과 동일하게 실습 환경을 열고 데이터를 생성합니다.
실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab으로 이동한 다음 streaming_minute_traffic_pipeline.py 파일을 엽니다.Apache Beam에서는 아래 예시와 같이 WindowInto PTransform 내에서 AfterWatermark() 트리거와 함께 allowed_lateness 키워드 인수를 사용하여 허용 지연 시간을 설정합니다.
allowed_lateness 명령줄 인수로 정의된 윈도잉 변환과 허용된 지연 시간을 검사합니다. 적절한 값이 무엇인지 판단하고 명령줄을 업데이트하여 올바른 단위를 반영합니다.파이프라인 설계자는 임시 결과를 내보낼 시점을 재량에 따라 결정할 수도 있습니다. 이전 단계에서는 허용된 지연 시간을 지정하여 AfterWatermark() 트리거를 사용했습니다. 예를 들어 기간 종료 워터마크에 아직 도달하지 않았지만 예상 데이터의 75%가 이미 도착한 경우를 생각해 보겠습니다. 이러한 샘플은 대표성을 띠는 경우가 많으므로 최종 사용자에게 표시할 가치가 있습니다.
트리거는 처리 시간 중 어느 시점에서 결과가 구체화되는지 결정합니다. 윈도우의 각 특정 출력을 윈도우의 창이라고 합니다. 트리거의 조건이 충족되면 트리거가 창을 실행합니다. Apache Beam에서 트리거 조건으로 워터마크 진행, 처리 시간 진행(실제로 도착한 데이터 양과 관계없이 균일하게 진행됨), 요소 개수(예: 일정량의 새 데이터가 도착한 경우), 데이터 종속 트리거(파일 끝에 도달한 경우) 등이 포함됩니다.
트리거의 조건으로 인해 창이 여러 번 실행될 수 있습니다. 따라서 이러한 결과를 누적하는 방법도 지정해야 합니다. Apache Beam은 현재 두 가지 누적 모드를 지원합니다. 하나는 결과를 함께 누적하는 모드이고 다른 하나는 마지막 창이 실행된 이후 새로 추가된 결과의 부분만 반환하는 모드입니다.
Window 변환을 사용해 PCollection에 대한 윈도우 함수를 설정하는 경우 트리거를 지정할 수도 있습니다.
WindowInto PTransform의 trigger 키워드 인수를 설정하여 PCollection에 대한 트리거를 설정합니다. Apache Beam은 다음과 같은 여러 트리거를 제공합니다.
AfterWatermark: 워터마크가 기간의 끝 또는 창의 첫 번째 요소의 도착 시점에 결정되는 타임스탬프를 초과하면 실행됩니다.
AfterProcessingTime: 일정 처리 시간이 경과한 후(일반적으로 창의 첫 번째 요소 이후) 실행됩니다.
AfterCount: 윈도우의 요소 수가 특정 개수에 도달하면 실행됩니다.
이 코드 샘플은 기간의 첫 번째 요소가 처리된 후 1분 뒤에 결과를 내보내는 PCollection에 대한 시간 기반 트리거를 설정합니다. 코드 샘플의 마지막 행에서는 키워드 인수 accumulation_mode를 AccumulationMode.DISCARDING으로 정의하여 윈도우의 누적 모드를 설정합니다.
이 작업을 완료하려면 AfterWatermark 트리거를 전달하는 WindowInto에 trigger 키워드 인수를 추가합니다. 트리거를 설계할 때는 데이터가 1분 기간으로 윈도잉되고 데이터가 늦게 도착할 수 있는 이 사용 사례를 염두에 두세요. 또한 AfterWatermark 트리거에 대한 인수로 허용된 지연 시간 내에서 모든 지연 요소에 대한 지연된 트리거를 추가합니다.
막히는 부분이 있다면 솔루션을 살펴보세요.
113행 부근에 다음 #TODO를 작성하여 트리거 및 누적 모드를 설정합니다.
#TODO를 작성하여 허용된 지연 시간, 트리거, 누적 모드를 설정합니다.트리거를 어떻게 설정했는지에 따라 지금 파이프라인을 실행하고 이전 실습의 파이프라인과 비교하면 새 파이프라인이 더 일찍 결과를 제공한다는 것을 알 수 있습니다. 또한 휴리스틱이 스트리밍 동작을 제대로 예측하지 못하고 허용된 지연 시간이 더 나은 경우 결과가 더 정확할 수도 있습니다.
그러나 현재 파이프라인은 지연에 더 강력하지만 여전히 잘못된 형식의 데이터에 취약합니다. 파이프라인을 실행하고 CommonLog로 파싱할 수 있는 올바른 형식의 JSON 문자열이 아닌 다른 것을 포함하는 메시지를 게시하면 파이프라인에서 오류가 발생합니다. Cloud Logging과 같은 도구를 사용하면 이러한 오류를 간단하게 읽을 수 있지만, 더 잘 설계된 파이프라인은 나중에 검사할 수 있도록 미리 정의된 위치에 이러한 오류를 저장합니다.
이 섹션에서는 파이프라인에 구성요소를 추가하여 파이프라인을 더욱 모듈화하고 강력하게 만듭니다.
잘못된 형식의 데이터에 대해 더 강력한 파이프라인을 만들려면 이러한 데이터를 필터링하고 브랜칭하여 다르게 처리하는 방법이 필요합니다. 파이프라인에 브랜치를 도입하는 한 가지 방법은 이미 살펴보았습니다. 하나의 PCollection을 여러 변환의 입력으로 만드는 것입니다.
이러한 형태의 브랜치는 강력합니다. 그러나 이 전략이 비효율적인 사용 사례도 있습니다. 예를 들어 동일한 PCollection의 두 가지 다른 하위 집합을 만든다고 가정해 보겠습니다. 여러 변환 방법을 사용하면 각 하위 집합에 대해 하나의 필터 변환을 만들고 둘 다 원래 PCollection에 적용합니다. 하지만 이렇게 하면 각 요소가 두 번 처리됩니다.
브랜치 파이프라인을 생성하는 또 다른 방법은 단일 변환이 입력 PCollection을 한 번 처리하는 동안 여러 출력을 생성하도록 하는 것입니다. 이 작업에서는 여러 출력을 생성하는 변환을 작성합니다. 첫 번째 출력은 올바른 형식의 데이터에서 얻은 결과이고, 두 번째 출력은 원래 입력 스트림의 잘못된 형식의 요소입니다.
Apache Beam은 단일 PCollection만 생성하면서 여러 결과를 내보내기 위해 TaggedOutput이라는 클래스를 사용하여 DoFn의 출력을 여러 개의(이질적일 수 있는) 출력으로 키 지정합니다.
다음은 TaggedOutput을 사용하여 DoFn의 여러 출력을 태그하는 예시입니다. 그런 다음 이러한 PCollection은 with_outputs() 메서드를 사용하여 복구되고 TaggedOutput에 지정된 태그 이름으로 참조됩니다.
이 작업을 완료하려면 위와 같이 ConvertToCommonLogFn 클래스에서 두 개의 TaggedOutput 반환을 선언합니다. try 문에서 파싱된 행을 CommonLog 클래스의 인스턴스로 반환하고, catch 문에서 파싱되지 않은 (디코딩된) 행을 반환합니다.
ConvertToCommonLogFn 클래스에서 첫 번째 #TODO를 작성합니다.ConvertToCommonLogFn 클래스에서 두 번째 #TODO를 작성합니다.잘못된 형식의 데이터를 생성하는 업스트림 문제를 해결하려면 잘못된 형식의 데이터를 분석할 수 있어야 합니다. 이를 위해서는 어딘가에 구체화해야 합니다. 이 작업에서는 잘못된 형식의 데이터를 Google Cloud Storage에 작성합니다. 이 패턴을 데드 레터 스토리지 사용이라고 합니다.
이전 실습에서는 beam.io.WriteToText()를 사용하여 제한된 소스(일괄)에서 Cloud Storage로 직접 작성했습니다. 그러나 제한되지 않은 소스(스트리밍)에서 작성할 때는 이 접근방식을 약간 수정해야 합니다.
첫째, 쓰기 변환의 업스트림에서 Trigger를 사용하여 처리 시간 내에 언제 쓰기를 수행할지 지정해야 합니다. 그렇지 않으면 기본값이 그대로 유지되어 쓰기가 이루어지지 않습니다. 기본적으로 모든 이벤트는 전역 윈도우에 속합니다. 일괄 처리로 작동할 때는 런타임에 전체 데이터 세트를 알 수 있으므로 괜찮습니다. 그러나 제한되지 않은 소스의 경우 전체 데이터 세트 크기를 알 수 없으므로 전역 윈도우 창이 완료되지 않아 실행되지 않습니다.
Trigger를 사용하고 있으므로 Window도 사용해야 합니다. 하지만 반드시 윈도우를 변경할 필요는 없습니다. 이전 실습과 작업에서는 윈도잉 변환을 사용하여 전역 윈도우를 이벤트 시간의 고정된 기간의 윈도우로 대체했습니다. 이 경우 어떤 요소가 함께 그룹화되는지는 결과가 유용한 방식으로 유용한 속도로 구체화되는 것만큼 중요하지 않습니다.
아래 예시에서 윈도우는 10초의 처리 시간마다 전역 윈도우 창을 실행하지만 새 이벤트만 작성합니다.
트리거를 설정했으므로 스트리밍을 지원하지 않는 beam.io.WriteToText()에서 쓰기를 수행하기 위해 beam.io.fileio.WriteToFiles()로 싱크를 변경해야 합니다. 윈도잉 변환의 다운스트림을 작성할 때 병렬로 작성할 수 있도록 샤드 수를 지정합니다.
이 작업을 완료하려면 rows.unparsed_row를 입력으로 사용하여 새로운 변환을 만들어 잘못된 형식의 데이터를 검색합니다. 고정 윈도우의 길이가 120초이고 누적 모드가 AccumulationMode.DISCARDING으로 설정된 경우 120초의 처리 시간 트리거를 사용합니다.
#TODO를 작성하여 beam.fileio.WriteToFiles를 사용해 GCS에 작성합니다.
아래 예시와 유사한 명령어를 구성하여 파이프라인을 실행합니다. 포함한 명령줄 옵션의 이름을 반영하도록 수정해야 합니다.
이 퀘스트의 코드에는 Pub/Sub를 사용하여 JSON 이벤트를 게시하는 스크립트가 포함되어 있습니다. 이 작업을 완료하고 메시지 게시를 시작하려면 현재 터미널과 나란히 새 터미널을 열고 다음 스크립트를 실행합니다. 이렇게 하면 스크립트를 종료할 때까지 메시지가 계속 게시됩니다. training-data-analyst/quests/dataflow_python 폴더에 있는지 확인합니다.
true 플래그는 스트림에 지연된 이벤트를 추가합니다.내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
Google Cloud 콘솔 제목 표시줄의 검색 필드에 Pub/Sub를 입력한 다음 제품 및 페이지 섹션에서 Pub/Sub를 클릭합니다.
주제를 클릭한 다음 my_topic 주제를 클릭합니다.
메시지를 클릭합니다.
메시지를 가져올 Cloud Pub/Sub 구독 선택*을 클릭하고 드롭다운에서 내 주제 구독을 선택합니다.
메시지 게시 버튼을 클릭합니다.
다음 페이지에서 게시할 메시지를 입력한 다음 게시를 클릭합니다.
CommonLog JSON 사양을 완벽하게 준수하지 않는 한, 곧바로 데드 레터 Cloud Storage 버킷에 도착해야 합니다. 파이프라인 모니터링 윈도우로 돌아가서 파싱되지 않은 메시지 처리를 담당하는 브랜치의 노드를 클릭하면 파이프라인을 통해 경로를 추적할 수 있습니다. 이 브랜치에 요소가 추가된 것을 확인한 후 Cloud Storage로 이동하여 메시지가 디스크에 기록되었는지 확인할 수 있습니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.
실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.
별점의 의미는 다음과 같습니다.
의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.
의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.
Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.
현재 이 콘텐츠를 이용할 수 없습니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
감사합니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
한 번에 실습 1개만 가능
모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.