시작하기 전에
- 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
- 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
- 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.
Setup the data environment
/ 20
Run your pipeline
/ 10
Generate lag-less streaming input
/ 10
이 실습에서는 배치 컨텍스트에서 소개된 많은 개념을 스트리밍 컨텍스트에 적용하여 batch_minute_traffic_pipeline과 유사하지만 실시간으로 작동하는 파이프라인을 만듭니다. 완성된 파이프라인은 먼저 Pub/Sub에서 JSON 메시지를 읽고 파싱한 후 브랜치 처리합니다. 한 브랜치는 일부 원시 데이터를 BigQuery에 작성하고 이벤트 및 처리 시간을 기록합니다. 다른 브랜치는 데이터를 윈도잉하고 합산한 후, 결과를 BigQuery에 씁니다.
다음 파이프라인을 빌드합니다.
Google Skills 실무형 실습을 통해 시뮬레이션이나 데모 환경이 아닌 실제 클라우드 환경에서 직접 실습 활동을 진행할 수 있습니다. 실습 시간 동안 Google Cloud에 로그인하고 액세스하는 데 사용할 수 있는 새로운 임시 사용자 인증 정보가 제공됩니다.
이 실습을 완료하려면 다음이 필요합니다.
실습 시작 버튼을 클릭합니다. 실습 비용을 결제해야 하는 경우 결제 수단을 선택할 수 있는 팝업이 열립니다. 왼쪽에 있는 패널에서 이 실습에 사용해야 하는 임시 사용자 인증 정보를 확인할 수 있습니다.
사용자 이름을 복사한 다음 Google 콘솔 열기를 클릭합니다. 실습에서 리소스가 실행되며 계정 선택 페이지를 표시하는 다른 탭이 열립니다.
계정 선택 페이지에서 다른 계정 사용을 클릭합니다. 로그인 페이지가 열립니다.
연결 세부정보 패널에서 복사한 사용자 이름을 붙여넣습니다. 그런 다음 비밀번호를 복사하여 붙여넣습니다.
잠시 후 Cloud 콘솔이 이 탭에서 열립니다.
이 실습에서는 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.
Google Cloud 콘솔의 탐색 메뉴()에서 Vertex AI를 선택합니다.
모든 권장 API 사용 설정을 클릭합니다.
탐색 메뉴에서 Workbench를 클릭합니다.
Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.
새로 만들기를 클릭합니다.
인스턴스를 구성합니다.
인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.
다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.
노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.
클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.
실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.
dataflow.worker 역할을 부여합니다.Cloud 콘솔에서 IAM 및 관리자 > IAM으로 이동하고 Compute Engine 기본 서비스 계정의 주 구성원 수정 아이콘을 클릭합니다.
Dataflow 작업자를 다른 역할로 추가하고 저장을 클릭합니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
이전 실습에서는 beam.io.ReadFromText를 사용하여 Google Cloud Storage에서 읽었습니다. 이 실습에서는 Google Cloud Storage 대신 Pub/Sub를 사용합니다. Pub/Sub는 완전 관리형 실시간 메시지 서비스로, 게시자가 '주제'에 메시지를 전송하면 구독자가 '구독'을 통해 해당 주제를 구독할 수 있습니다.
여기에서 만드는 파이프라인은 create_streaming_sinks.sh 스크립트를 통해 방금 만든 my_topic이라는 주제를 구독합니다. 프로덕션 상황에서는 이 주제가 게시팀에서 생성되는 경우가 많습니다. 콘솔의 Pub/Sub 부분에서 확인할 수 있습니다.
training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/으로 이동한 다음, streaming_minute_traffic_pipeline.py 파일을 엽니다.beam.io.ReadFromPubSub() 클래스를 사용하는 변환을 파이프라인에 추가합니다. 이 클래스에는 소스 주제와 timestamp_attribute를 지정하는 속성이 있습니다. 기본적으로 이 속성은 메시지 게시 시간으로 설정됩니다.이 작업을 완료하려면 다음 안내를 따르세요.
input_topic 명령줄 파라미터로 지정된 Pub/Sub 주제에서 읽는 변환을 추가합니다.parse_json을 beam.Map과 함께 사용하여 각 JSON 문자열을 CommonLog 인스턴스로 변환합니다.with_output_types()를 사용하여 이 변환의 결과를 CommonLog 인스턴스의 PCollection으로 수집합니다.#TODO에 다음 코드를 추가합니다.이전 비SQL 실습에서는 이벤트 시간을 기준으로 이벤트를 고정 크기의 상호 배타적인 기간으로 그룹화하기 위해 고정 시간 윈도잉을 구현했습니다. 여기서도 스트리밍 입력으로 동일한 작업을 수행합니다. 문제가 발생하면 이전 실습의 코드나 솔루션을 참고하세요.
이 작업을 완료하려면 다음 안내를 따르세요.
CommonLog 데이터의 PCollection을 수락하고 요소를 window_duration초 길이의 기간으로 윈도잉하는 변환을 파이프라인에 추가합니다. window_duration은 또 다른 명령줄 파라미터입니다.이전 실습에서는 CountCombineFn() Combiner를 사용하여 기간당 이벤트 수를 계산했습니다. 여기에서도 동일한 작업을 합니다.
이 작업을 완료하려면 다음 안내를 따르세요.
PCollection을 입력으로 전달합니다.DoFn, GetTimestampFn을 beam.ParDo와 함께 사용하여 기간 시작 타임스탬프를 포함합니다.이 파이프라인은 두 개의 별도 브랜치에서 BigQuery에 씁니다. 첫 번째 브랜치는 합산 데이터를 BigQuery에 씁니다. 두 번째 브랜치는 이미 작성되어 있으며 이벤트 타임스탬프와 실제 처리 타임스탬프를 포함하여 각 원시 이벤트에 관한 일부 메타데이터를 씁니다. 둘 다 스트리밍 삽입을 통해 BigQuery에 직접 씁니다.
BigQuery에 쓰기는 이전 실습에서 광범위하게 다루었으므로 여기서는 기본 메커니즘을 자세히 다루지 않습니다.
이 작업을 완료하려면 다음 안내를 따르세요.
agg_table_name이라는 새로운 명령줄 파라미터를 만듭니다.beam.io.WriteToBigQuery()는 테이블이 삭제되고 다시 생성되는 WRITE_TRUNCATE의 write_disposition을 지원하지 않습니다. 이 예시에서는 WRITE_APPEND를 사용합니다.
beam.io.WriteToBigQuery는 기본적으로 unbounded PCollection의 경우 스트리밍 삽입을, bounded PCollection의 경우 일괄 파일 로드 작업을 사용합니다. 스트리밍 삽입은 데이터를 즉시 합산에 표시하려는 경우에 특히 유용하지만 추가 요금이 발생합니다. 몇 분마다 주기적으로 일괄 업로드해도 괜찮은 스트리밍 사용 사례에서는 method 키워드 인수를 통해 이 동작을 지정하고 triggering_frequency 키워드 인수로 빈도를 설정할 수 있습니다. 자세한 내용은 apache_beam.io.gcp.bigquery 모듈 문서의 BigQuery에 데이터 쓰기 섹션을 참고하세요.
pipeline.py 파일을 열 수 없다는 Dataflow 파이프라인 실패 오류가 발생하면 파이프라인을 다시 실행하세요. 그러면 문제없이 실행될 것입니다.
Dataflow UI에서 오류 없이 성공적으로 실행되는지 확인합니다. 파이프라인에서 아직 생성 및 수집되는 데이터가 없으므로 실행은 되지만 아무것도 처리되지 않습니다. 다음 단계에서 데이터를 도입합니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
스트리밍 파이프라인이므로 스트리밍 소스를 구독하고 입력을 기다립니다. 현재는 입력이 없습니다. 이 섹션에서는 지연 없이 데이터를 생성합니다. 실제 데이터에는 거의 항상 지연이 포함됩니다. 하지만 지연 없는 스트리밍 입력을 이해하는 것은 도움이 됩니다.
이 퀘스트의 코드에는 Pub/Sub를 사용하여 JSON 이벤트를 게시하는 스크립트가 포함되어 있습니다.
training-data-analyst/quests/dataflow_python 폴더에 있는지 확인합니다.내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
logs.minute_traffic 테이블을 쿼리합니다.페이지 조회수가 분당 100회 정도를 유지하는 것을 확인할 수 있습니다.
이 쿼리는 이벤트 시간과 처리 시간 사이의 차이를 보여줍니다. 하지만 원시 테이블 형식 데이터만으로는 전체적인 그림을 파악하기 어려울 수 있습니다. 여기서는 경량 데이터 시각화 및 BI 엔진인 Looker Studio를 사용합니다.
그러면 새 창이 열립니다.
데이터에 대한 기본 시각화가 생성됩니다.
기본 시각화를 삭제하려면 각 시각화를 마우스 오른쪽 버튼으로 클릭하고 삭제를 선택합니다.
상단 메뉴 바에서 차트 추가를 클릭합니다.
분산형 차트 유형을 선택합니다.
오른쪽 패널의 데이터 열에서 다음 값을 설정합니다.
차트는 모든 데이터 포인트가 대각선에 있는 분산형 차트로 변환됩니다. 이는 현재 생성 중인 스트리밍 데이터에서 이벤트가 생성된 직후에 바로 처리되어 지연이 없었기 때문입니다. 데이터 생성 스크립트를 빠르게 시작한 경우, 즉 Dataflow 작업이 완전히 가동되기 전에 시작한 경우 Pub/Sub에서 큐에 추가된 메시지가 모두 거의 한 번에 처리되었으므로 하키 스틱 모양이 표시될 수 있습니다.
하지만 실제 환경에서는 파이프라인이 지연에 대처해야 합니다.
스트리밍 이벤트 스크립트는 시뮬레이션된 지연이 있는 이벤트를 생성할 수 있습니다.
이 작업은 이벤트가 생성된 시점과 Pub/Sub에 게시된 시점 사이에 시간 지연이 발생하는 시나리오를 나타냅니다. 예를 들어 사용자가 서비스를 이용할 수 없는 경우 모바일 클라이언트가 오프라인 모드로 전환되지만, 이벤트는 기기에서 수집되고 기기가 다시 온라인 상태가 되면 한 번에 모두 게시됩니다.
먼저 Looker Studio 창을 닫습니다.
그런 다음 지연을 사용 설정하려면 터미널로 돌아가서 Ctrl+C를 사용하여 실행 중인 스크립트를 중지합니다.
그리고 다음을 실행합니다.
차트 유형: 분산형
실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.
실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.
별점의 의미는 다음과 같습니다.
의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.
의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.
Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.
현재 이 콘텐츠를 이용할 수 없습니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
감사합니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
한 번에 실습 1개만 가능
모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.