실습 설정 안내 및 요구사항
계정과 진행 상황을 보호하세요. 이 실습을 실행하려면 항상 시크릿 브라우저 창과 실습 사용자 인증 정보를 사용하세요.

Dataflow를 사용한 서버리스 데이터 처리 - Dataflow를 사용한 일괄 분석 파이프라인(Python)

실습 2시간 universal_currency_alt 크레딧 5개 show_chart 고급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
이 콘텐츠는 아직 휴대기기에 최적화되지 않음
최상의 경험을 위해 데스크톱 컴퓨터에서 이메일로 전송된 링크를 사용하여 방문하세요.

개요

이 실습에서 학습할 내용은 다음과 같습니다.

  • 사용자별로 사이트 트래픽을 집계하는 파이프라인 작성
  • 분 단위로 사이트 트래픽을 집계하는 파이프라인 작성
  • 시계열 데이터에 윈도잉 구현

기본 요건

참고: 이 실습은 고급 수준이며 Python에 대한 숙련된 지식이 필요합니다.

설정 및 요건

각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.

  1. 시크릿 창을 사용하여 Qwiklabs에 로그인합니다.

  2. 실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
    일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.

  3. 준비가 되면 실습 시작을 클릭합니다.

  4. 실습 사용자 인증 정보(사용자 이름비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.

  5. Google Console 열기를 클릭합니다.

  6. 다른 계정 사용을 클릭한 다음, 안내 메시지에 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
    다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.

  7. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.

파트 A. Workbench 인스턴스 개발 환경 설정

이 실습에서는 Workbench 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.

  1. Google Cloud 콘솔의 탐색 메뉴(탐색 메뉴)에서 Vertex AI를 선택하거나 Vertex AI 대시보드로 이동합니다.

  2. 모든 권장 API 사용 설정을 클릭합니다. 이제 Notebook API가 사용 설정되어 있는지 확인해 보겠습니다.

  3. 탐색 메뉴에서 Workbench를 클릭합니다.

    Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.

  4. 상자 추가새로 만들기를 클릭합니다.

  5. 다음과 같이 인스턴스를 구성합니다.

이름 리전 영역 고급 옵션(선택사항)
lab-workbench 필요한 경우 '고급 옵션'을 클릭하여 추가로 맞춤설정(예: 머신 유형, 디스크 크기)할 수 있습니다.
  1. 만들기를 클릭합니다.

인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.

  1. 인스턴스 이름 옆에 있는 JupyterLab 열기를 클릭하여 JupyterLab 인터페이스를 실행합니다. 그러면 브라우저에서 새 탭이 열립니다.

  2. 그런 다음 터미널을 클릭합니다. 이 실습의 모든 명령어를 실행할 수 있는 터미널이 열립니다.

코드 저장소 다운로드

다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.

  1. 방금 연 터미널에서 다음을 입력합니다.
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. 노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.

  2. 클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.

확장된 뷰 메뉴에서 강조 표시된 탐색기 옵션

참고: 수정할 파일을 열려면 파일로 이동하여 클릭하기만 하면 됩니다. 클릭하면 파일이 열리며, 여기에서 코드를 추가하거나 수정할 수 있습니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 노트북 인스턴스 만들기 및 과정 저장소 클론

작업 1. 사용자별 사이트 트래픽 집계

실습의 이 부분에서는 다음 작업을 수행하는 파이프라인을 작성합니다.

  1. Cloud Storage의 파일에서 하루치 트래픽을 읽습니다.
  2. 각 이벤트를 CommonLog 객체로 변환합니다.
  3. 각 객체를 사용자 ID별로 그룹화하고 값을 결합하여 순 사용자 각각의 조회수를 합산하고 해당 사용자의 총 조회수를 구합니다.
  4. 각 사용자에 대해 추가 집계를 수행합니다.
  5. 결과 데이터를 BigQuery에 작성합니다.

작업 2. 합성 데이터 생성

이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 이전과 마찬가지로 실습 환경을 열고 데이터를 생성합니다.

적절한 실습 열기

  • IDE 환경의 터미널에서 다음 명령어를 실행합니다.
# Change directory into the lab cd 3_Batch_Analytics/lab export BASE_DIR=$(pwd)

가상 환경 및 종속 항목 설정

실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.

  1. 다음 명령어를 실행하여 이 실습에서 사용할 가상 환경을 만듭니다.
sudo apt-get update && sudo apt-get install -y python3-venv # Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. 다음으로 파이프라인을 실행하는 데 필요한 패키지를 설치합니다.
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API가 사용 설정되어 있는지 확인합니다.
gcloud services enable dataflow.googleapis.com

데이터 환경 설정

# Create GCS buckets and BQ dataset cd $BASE_DIR/../.. source create_batch_sinks.sh # Generate event dataflow source generate_batch_events.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

이 스크립트는 다음과 유사한 줄이 포함된 events.json이라는 파일을 만듭니다.

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

그런 다음 이 파일이 의 Google Cloud Storage 버킷에 자동으로 복사됩니다.

  • Google Cloud Storage로 이동하여 스토리지 버킷에 events.json이라는 파일이 있는지 확인합니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 합성 데이터 생성

작업 3. 사용자당 페이지 조회수 합계

이 작업에는 두 가지 미니 과제가 있으며 솔루션을 참고할 수 있습니다.

파일 탐색기에서 아래 언급된 경로로 이동하여 batch_user_traffic_pipeline.py 파일을 엽니다.

training-data-analyst/quests/dataflow_python/3_Batch_Analytics/lab

미니 과제 - #TODO 1:

이 파이프라인에는 입력 경로와 출력 테이블 이름에 대한 명령줄 옵션을 수락하는 데 필요한 코드와 Google Cloud Storage에서 이벤트를 읽고, 이벤트를 파싱하고, 결과를 BigQuery에 쓰는 코드도 이미 포함되어 있습니다. 하지만 몇 가지 중요한 부분이 누락되어 #TODO로 표시되어 있습니다.

이 과제는 데이터 모델링 작업입니다. 이 단계에서는 모든 로그 이벤트를 사용자별로 그룹화한 후, 각 사용자에 대해 어떤 정보를 계산할지 고려해야 합니다.

집계 결과를 저장할 구조(스키마)를 정의해야 합니다. 집계에 사용할 수 있는 필드를 확인하려면 CommonLog 클래스를 살펴봐야 합니다.

해결 방법:

  1. 키 식별: 클래스 이름은 PerUserAggregation이므로 유지해야 하는 주요 정보는 user_id입니다.

  2. 계산할 측정항목 선택: 사용자의 CommonLog 항목 모음에서 무엇을 계산할 수 있나요?

    • 개수: 사용자가 서버에 액세스한 횟수
    • 합계: 사용자가 다운로드한 총 바이트 수(num_bytes)
    • 최솟값/최댓값: 첫 번째 및 마지막 활동 타임스탬프
  3. 예를 들면 다음과 같습니다.

user_id: str page_views: int ...

미니 과제 - #TODO 2:

이 과제는 Apache Beam 프레임워크의 기술 요구사항입니다. 이 과제에서는 Beam이 커스텀 데이터 유형을 처리하는 방식에 대한 지식을 테스트합니다.

Apache Beam이 파이프라인을 실행할 때는 작업자라고 하는 여러 컴퓨터 간에 데이터를 전송해야 하는 경우가 많습니다. 이를 위해서는 Python 객체(예: PerUserAggregation 인스턴스)를 바이트 스트림으로 직렬화하고, 네트워크를 통해 전송한 다음, 다른 쪽에서 다시 객체로 역직렬화해야 합니다. Coder는 Beam에 이 직렬화 및 역직렬화를 수행하는 방법을 알려주는 객체입니다.

Beam에 커스텀 PerUserAggregation 클래스를 인코딩/디코딩하는 방법을 알려주지 않으면 파이프라인이 오류와 함께 실패합니다.

해결 방법:

솔루션은 #TODO 바로 위 줄에 있습니다. Beam은 NamedTuple 클래스와 완벽하게 작동하는 RowCoder를 제공합니다. CommonLog에 대해 수행한 것과 마찬가지로 새 PerUserAggregation 클래스에 대해 RowCoder를 등록하기만 하면 됩니다.

  1. 예를 들면 다음과 같습니다.
beam.coders.registry.register_coder(PerUserAggregation, ...)

작업 4. 파이프라인 실행

터미널로 돌아가서 다음 명령어를 실행하여 Cloud Dataflow 서비스를 사용해 파이프라인을 실행합니다. 문제가 있는 경우 DirectRunner로 실행하거나 솔루션을 참고할 수 있습니다.

  • 아래 코드 스니펫에서 ENTER_REGION_IDENTER_ZONE_ID 필드를 아래 표에 따라 바꿉니다.
리전 영역
  • 실습 사양에 따라 리전 및 영역 값을 바꿉니다.
# 1. Set all environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export ZONE=ENTER_ZONE_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic # 2. Double-check the input file exists # This command should NOT return an error. echo "Verifying input file exists at ${INPUT_PATH}..." gcloud storage ls ${INPUT_PATH} # 3. Execute the pipeline script echo "Running the Dataflow pipeline..." python3 batch_user_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --worker_zone=${ZONE} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME}
  • 제출된 작업은 Dataflow 대시보드에서 확인할 수 있습니다.

  • 작업 상태가 성공이면 BigQuery에서 결과를 확인해 보겠습니다. 이 작업을 완료하려면 파이프라인이 완료될 때까지 몇 분 정도 기다린 다음 BigQuery로 이동하여 user_traffic 테이블을 쿼리합니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 사용자별로 사이트 트래픽을 집계하고 파이프라인 실행

파트 B. 분당 사이트 트래픽 집계

실습의 이 부분에서는 batch_minute_traffic이라는 새 파이프라인을 만듭니다. batch_user_traffic에 사용된 기본 일괄 분석 원칙을 확장하여 전체 일괄 처리에서 사용자를 기준으로 집계하는 대신 이벤트가 발생한 시점을 기준으로 집계합니다. 다시 여러 개의 #TODO가 있습니다. 기본적으로 수정해야 하거나 솔루션을 참고할 수 있습니다.

IDE에서 아래 경로로 이동하여 batch_minute_traffic_pipeline.py 파일을 엽니다.

3_Batch_Analytics/lab 참고: 기본 스크립트(batch_minute_traffic_pipeline.py)로 작업을 시작하기 전에 pipeline_utils.pysetup.py라는 두 개의 도우미 파일을 살펴봅니다. Dataflow 작업을 실행하기 위해 이러한 도우미 파일을 활용합니다. pipeline_utils는 패키징되어 원격 Dataflow 작업자에게 전송될 수 있는 별도의 모듈에 커스텀 Python 로직(클래스 및 함수)을 보유합니다. 반면 setup.py는 Dataflow의 설명서 역할을 하며, pipeline_utils.py 파일을 모든 작업자에 설치할 적절한 패키지로 번들링하는 방법을 정확히 알려줍니다.

작업 5. 각 요소에 타임스탬프 추가

이러한 #TODO 항목은 Apache Beam에서 기존 시계열 집계 파이프라인을 빌드하는 과정을 안내합니다. 각각은 일괄 처리 및 스트림 처리의 핵심 개념을 나타냅니다. 이 파이프라인의 목표는 웹 이벤트(events.json)의 JSON 파일을 처리하고, 매분 발생한 이벤트 수를 세고, 이러한 분 단위 개수를 BigQuery 테이블에 쓰는 것입니다.

파이프라인 흐름은 다음과 같습니다. Read Text -> Parse to CommonLog -> TODO -> Write to BigQuery

#TODO는 파이프라인의 핵심 개념으로, 실제 집계 로직이 발생하는 곳입니다.

미니 과제 - #TODO 1:

CommonLog 객체 모음이 있습니다. 파이프라인의 다음 단계는 이러한 이벤트를 시간별로 그룹화하는 것입니다(WindowByMinute). Apache Beam은 각 데이터의 이벤트 시간을 알지 못하면 이 작업을 수행할 수 없습니다. 이 과제에서는CommonLog 객체 내에서 이 타임스탬프를 찾는 방법을 Beam에 알려줘야 합니다.

해결 방법:

  1. add_timestamp 함수(pipeline_utils.py에 정의됨)는 각 로그 레코드에서 타임스탬프 문자열을 파싱하고 이를 윈도잉에 필요한 적절한 Beam 타임스탬프로 요소에 연결합니다.

  2. 예를 들면 다음과 같습니다.

| 'AddEventTimestamp' >> beam.Map(...)

작업 6. 1분 기간으로 윈도잉

스크립트의 두 번째 작업으로 이동하여 과제에 따라 그룹 요소를 변환해 보겠습니다.

미니 과제 - #TODO 2:

  1. 이 변환은 이벤트 타임스탬프를 기준으로 고정된 크기(60초(1분))의 비중첩 기간으로 요소를 그룹화합니다.

  2. 예를 들면 다음과 같습니다.

| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(...))
  1. 다른 유형의 윈도잉에 대해 자세히 알아보려면 Apache Beam 문서의 섹션 8.2. 제공된 윈도우 함수를 참고하세요.

작업 7. 기간당 이벤트 수 계산

스크립트의 세 번째 작업으로 넘어가서 기간별 이벤트 수를 계산해 보겠습니다.

미니 과제 - #TODO 3:

  1. 이 Combiner는 각 기간(1분) 내의 요소 수를 계산합니다. .without_defaults()를 사용하여 빈 기간에 대해 출력이 생성되지 않도록 합니다.

  2. 예를 들면 다음과 같습니다.

| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn())...()
  1. 이 작업을 완료하려면 각 기간의 모든 요소를 계산하는 변환을 추가합니다. 문제가 발생하면 솔루션을 참고하세요.

작업 8. 다시 행으로 변환하고 타임스탬프 추가

스크립트의 마지막 작업으로 이동하여 다시 행으로 변환하고 타임스탬프를 추가해 보겠습니다.

미니 과제 - #TODO 4:

  1. GetTimestampFn(pipeline_utils.py에 정의됨)은 각 기간의 정수 카운트를 가져와 딕셔너리로 포맷하고 BigQuery 스키마와 일치하도록 기간의 시작 시간을 문자열로 추가합니다.

  2. 이 작업을 완료하려면 int 유형의 요소를 허용하고 추가 파라미터를 전달하여 기간 정보에 액세스하는 ParDo 함수를 작성합니다. BigQuery 테이블 스키마의 타임스탬프 필드는 STRING이므로 타임스탬프를 문자열로 변환해야 합니다.

  3. 예를 들면 다음과 같습니다.

| "AddWindowTimestamp" >> beam.ParDo(...()) | 'WriteToBQ' >> beam.io.WriteToBigQuery( table_name, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

작업 9. 파이프라인 실행

코딩을 완료한 후 아래 명령어를 사용하여 파이프라인을 실행합니다. 코드를 테스트하는 동안 파이프라인을 로컬에서 실행하는 DirectRunner로 RUNNER 환경 변수를 변경하면 훨씬 더 빠르게 테스트할 수 있습니다. 지금은 Dataflow를 사용하여 파이프라인을 실행합니다.

  • 실습 사양에 따라 리전 및 영역 값을 바꿉니다.
리전
export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic python3 batch_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME} \ --setup_file=./setup.py

작업 10. 결과 확인

  • 이 작업을 완료하려면 파이프라인이 실행될 때까지 몇 분 정도 기다린 다음 BigQuery로 이동하여 minute_traffic 테이블을 쿼리합니다.

내 진행 상황 확인하기를 클릭하여 목표를 확인합니다. 분 단위로 사이트 트래픽을 집계하고 파이프라인 실행

실습 종료하기

실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.

실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.

별점의 의미는 다음과 같습니다.

  • 별표 1개 = 매우 불만족
  • 별표 2개 = 불만족
  • 별표 3개 = 중간
  • 별표 4개 = 만족
  • 별표 5개 = 매우 만족

의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.

의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.

Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.

시작하기 전에

  1. 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
  2. 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
  3. 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.

시크릿 브라우징 사용

  1. 실습에 입력한 사용자 이름비밀번호를 복사합니다.
  2. 비공개 모드에서 콘솔 열기를 클릭합니다.

콘솔에 로그인

    실습 사용자 인증 정보를 사용하여
  1. 로그인합니다. 다른 사용자 인증 정보를 사용하면 오류가 발생하거나 요금이 부과될 수 있습니다.
  2. 약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
  3. 실습을 완료했거나 다시 시작하려고 하는 경우가 아니면 실습 종료를 클릭하지 마세요. 이 버튼을 클릭하면 작업 내용이 지워지고 프로젝트가 삭제됩니다.

현재 이 콘텐츠를 이용할 수 없습니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

감사합니다

이용할 수 있게 되면 이메일로 알려드리겠습니다.

한 번에 실습 1개만 가능

모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.

시크릿 브라우징을 사용하여 실습 실행하기

이 실습을 실행하는 가장 좋은 방법은 시크릿 모드 또는 시크릿 브라우저 창을 사용하는 것입니다. 개인 계정과 학생 계정 간의 충돌로 개인 계정에 추가 요금이 발생하는 일을 방지해 줍니다.