시작하기 전에
- 실습에서는 정해진 기간 동안 Google Cloud 프로젝트와 리소스를 만듭니다.
- 실습에는 시간 제한이 있으며 일시중지 기능이 없습니다. 실습을 종료하면 처음부터 다시 시작해야 합니다.
- 화면 왼쪽 상단에서 실습 시작을 클릭하여 시작합니다.
Create Vertex AI Platform Notebooks instance and clone course repo
/ 25
Generate synthetic data
/ 25
Aggregating site traffic by user and run your pipeline
/ 25
Aggregating site traffic by minute and run the pipeline
/ 25
이 실습에서 학습할 내용은 다음과 같습니다.
각 실습에서는 정해진 기간 동안 새 Google Cloud 프로젝트와 리소스 집합이 무료로 제공됩니다.
시크릿 창을 사용하여 Qwiklabs에 로그인합니다.
실습 사용 가능 시간(예: 1:15:00)을 참고하여 해당 시간 내에 완료합니다.
일시중지 기능은 없습니다. 필요한 경우 다시 시작할 수 있지만 처음부터 시작해야 합니다.
준비가 되면 실습 시작을 클릭합니다.
실습 사용자 인증 정보(사용자 이름 및 비밀번호)를 기록해 두세요. Google Cloud Console에 로그인합니다.
Google Console 열기를 클릭합니다.
다른 계정 사용을 클릭한 다음, 안내 메시지에 이 실습에 대한 사용자 인증 정보를 복사하여 붙여넣습니다.
다른 사용자 인증 정보를 사용하는 경우 오류가 발생하거나 요금이 부과됩니다.
약관에 동의하고 리소스 복구 페이지를 건너뜁니다.
이 실습에서는 Workbench 인스턴스 노트북의 터미널에서 모든 명령어를 실행합니다.
Google Cloud 콘솔의 탐색 메뉴()에서 Vertex AI를 선택하거나 Vertex AI 대시보드로 이동합니다.
모든 권장 API 사용 설정을 클릭합니다. 이제 Notebook API가 사용 설정되어 있는지 확인해 보겠습니다.
탐색 메뉴에서 Workbench를 클릭합니다.
Workbench 페이지 상단에서 인스턴스 뷰에 있는지 확인합니다.
새로 만들기를 클릭합니다.
다음과 같이 인스턴스를 구성합니다.
| 이름 | 리전 | 영역 | 고급 옵션(선택사항) |
|---|---|---|---|
| lab-workbench | 필요한 경우 '고급 옵션'을 클릭하여 추가로 맞춤설정(예: 머신 유형, 디스크 크기)할 수 있습니다. |
인스턴스를 만드는 데 몇 분 정도 걸립니다. 준비되면 이름 옆에 녹색 체크표시가 나타납니다.
인스턴스 이름 옆에 있는 JupyterLab 열기를 클릭하여 JupyterLab 인터페이스를 실행합니다. 그러면 브라우저에서 새 탭이 열립니다.
그런 다음 터미널을 클릭합니다. 이 실습의 모든 명령어를 실행할 수 있는 터미널이 열립니다.
다음으로 이 실습에서 사용할 코드 저장소를 다운로드합니다.
노트북 환경의 왼쪽 패널에 있는 파일 브라우저에 training-data-analyst 저장소가 추가됩니다.
클론된 저장소인 /training-data-analyst/quests/dataflow_python/으로 이동합니다. 각 실습의 폴더가 표시됩니다. 이 폴더는 사용자가 완료해야 하는 코드가 포함된 lab 하위 폴더와, 문제 발생 시 참고할 수 있는 정상 작동 예시가 포함된 solution 하위 폴더로 나뉩니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
실습의 이 부분에서는 다음 작업을 수행하는 파이프라인을 작성합니다.
CommonLog 객체로 변환합니다.이전 실습과 마찬가지로 첫 번째 단계는 파이프라인에서 처리할 데이터를 생성하는 것입니다. 이전과 마찬가지로 실습 환경을 열고 데이터를 생성합니다.
실제 파이프라인 코드를 편집하기 전에 필요한 종속 항목이 설치되어 있는지 확인해야 합니다.
이 스크립트는 다음과 유사한 줄이 포함된 events.json이라는 파일을 만듭니다.
그런 다음 이 파일이
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
이 작업에는 두 가지 미니 과제가 있으며 솔루션을 참고할 수 있습니다.
파일 탐색기에서 아래 언급된 경로로 이동하여 batch_user_traffic_pipeline.py 파일을 엽니다.
이 파이프라인에는 입력 경로와 출력 테이블 이름에 대한 명령줄 옵션을 수락하는 데 필요한 코드와 Google Cloud Storage에서 이벤트를 읽고, 이벤트를 파싱하고, 결과를 BigQuery에 쓰는 코드도 이미 포함되어 있습니다. 하지만 몇 가지 중요한 부분이 누락되어 #TODO로 표시되어 있습니다.
이 과제는 데이터 모델링 작업입니다. 이 단계에서는 모든 로그 이벤트를 사용자별로 그룹화한 후, 각 사용자에 대해 어떤 정보를 계산할지 고려해야 합니다.
집계 결과를 저장할 구조(스키마)를 정의해야 합니다. 집계에 사용할 수 있는 필드를 확인하려면 CommonLog 클래스를 살펴봐야 합니다.
키 식별: 클래스 이름은 PerUserAggregation이므로 유지해야 하는 주요 정보는 user_id입니다.
계산할 측정항목 선택: 사용자의 CommonLog 항목 모음에서 무엇을 계산할 수 있나요?
예를 들면 다음과 같습니다.
이 과제는 Apache Beam 프레임워크의 기술 요구사항입니다. 이 과제에서는 Beam이 커스텀 데이터 유형을 처리하는 방식에 대한 지식을 테스트합니다.
Apache Beam이 파이프라인을 실행할 때는 작업자라고 하는 여러 컴퓨터 간에 데이터를 전송해야 하는 경우가 많습니다. 이를 위해서는 Python 객체(예: PerUserAggregation 인스턴스)를 바이트 스트림으로 직렬화하고, 네트워크를 통해 전송한 다음, 다른 쪽에서 다시 객체로 역직렬화해야 합니다. Coder는 Beam에 이 직렬화 및 역직렬화를 수행하는 방법을 알려주는 객체입니다.
Beam에 커스텀 PerUserAggregation 클래스를 인코딩/디코딩하는 방법을 알려주지 않으면 파이프라인이 오류와 함께 실패합니다.
솔루션은 #TODO 바로 위 줄에 있습니다. Beam은 NamedTuple 클래스와 완벽하게 작동하는 RowCoder를 제공합니다. CommonLog에 대해 수행한 것과 마찬가지로 새 PerUserAggregation 클래스에 대해 RowCoder를 등록하기만 하면 됩니다.
터미널로 돌아가서 다음 명령어를 실행하여 Cloud Dataflow 서비스를 사용해 파이프라인을 실행합니다. 문제가 있는 경우 DirectRunner로 실행하거나 솔루션을 참고할 수 있습니다.
ENTER_REGION_ID 및 ENTER_ZONE_ID 필드를 아래 표에 따라 바꿉니다.| 리전 | 영역 | |
|---|---|---|
제출된 작업은 Dataflow 대시보드에서 확인할 수 있습니다.
작업 상태가 성공이면 BigQuery에서 결과를 확인해 보겠습니다. 이 작업을 완료하려면 파이프라인이 완료될 때까지 몇 분 정도 기다린 다음 BigQuery로 이동하여 user_traffic 테이블을 쿼리합니다.
내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
실습의 이 부분에서는 batch_minute_traffic이라는 새 파이프라인을 만듭니다. batch_user_traffic에 사용된 기본 일괄 분석 원칙을 확장하여 전체 일괄 처리에서 사용자를 기준으로 집계하는 대신 이벤트가 발생한 시점을 기준으로 집계합니다. 다시 여러 개의 #TODO가 있습니다. 기본적으로 수정해야 하거나 솔루션을 참고할 수 있습니다.
IDE에서 아래 경로로 이동하여 batch_minute_traffic_pipeline.py 파일을 엽니다.
이러한 #TODO 항목은 Apache Beam에서 기존 시계열 집계 파이프라인을 빌드하는 과정을 안내합니다. 각각은 일괄 처리 및 스트림 처리의 핵심 개념을 나타냅니다.
이 파이프라인의 목표는 웹 이벤트(events.json)의 JSON 파일을 처리하고, 매분 발생한 이벤트 수를 세고, 이러한 분 단위 개수를 BigQuery 테이블에 쓰는 것입니다.
파이프라인 흐름은 다음과 같습니다.
Read Text -> Parse to CommonLog -> TODO -> Write to BigQuery
#TODO는 파이프라인의 핵심 개념으로, 실제 집계 로직이 발생하는 곳입니다.
CommonLog 객체 모음이 있습니다. 파이프라인의 다음 단계는 이러한 이벤트를 시간별로 그룹화하는 것입니다(WindowByMinute). Apache Beam은 각 데이터의 이벤트 시간을 알지 못하면 이 작업을 수행할 수 없습니다. 이 과제에서는CommonLog 객체 내에서 이 타임스탬프를 찾는 방법을 Beam에 알려줘야 합니다.
add_timestamp 함수(pipeline_utils.py에 정의됨)는 각 로그 레코드에서 타임스탬프 문자열을 파싱하고 이를 윈도잉에 필요한 적절한 Beam 타임스탬프로 요소에 연결합니다.
예를 들면 다음과 같습니다.
스크립트의 두 번째 작업으로 이동하여 과제에 따라 그룹 요소를 변환해 보겠습니다.
이 변환은 이벤트 타임스탬프를 기준으로 고정된 크기(60초(1분))의 비중첩 기간으로 요소를 그룹화합니다.
예를 들면 다음과 같습니다.
스크립트의 세 번째 작업으로 넘어가서 기간별 이벤트 수를 계산해 보겠습니다.
이 Combiner는 각 기간(1분) 내의 요소 수를 계산합니다. .without_defaults()를 사용하여 빈 기간에 대해 출력이 생성되지 않도록 합니다.
예를 들면 다음과 같습니다.
스크립트의 마지막 작업으로 이동하여 다시 행으로 변환하고 타임스탬프를 추가해 보겠습니다.
GetTimestampFn(pipeline_utils.py에 정의됨)은 각 기간의 정수 카운트를 가져와 딕셔너리로 포맷하고 BigQuery 스키마와 일치하도록 기간의 시작 시간을 문자열로 추가합니다.
이 작업을 완료하려면 int 유형의 요소를 허용하고 추가 파라미터를 전달하여 기간 정보에 액세스하는 ParDo 함수를 작성합니다. BigQuery 테이블 스키마의 타임스탬프 필드는 STRING이므로 타임스탬프를 문자열로 변환해야 합니다.
예를 들면 다음과 같습니다.
코딩을 완료한 후 아래 명령어를 사용하여 파이프라인을 실행합니다. 코드를 테스트하는 동안 파이프라인을 로컬에서 실행하는 DirectRunner로 RUNNER 환경 변수를 변경하면 훨씬 더 빠르게 테스트할 수 있습니다. 지금은 Dataflow를 사용하여 파이프라인을 실행합니다.
| 리전 | ||
|---|---|---|
minute_traffic 테이블을 쿼리합니다.내 진행 상황 확인하기를 클릭하여 목표를 확인합니다.
실습을 완료하면 실습 종료를 클릭합니다. Google Cloud Skills Boost에서 사용된 리소스를 자동으로 삭제하고 계정을 지웁니다.
실습 경험을 평가할 수 있습니다. 해당하는 별표 수를 선택하고 의견을 입력한 후 제출을 클릭합니다.
별점의 의미는 다음과 같습니다.
의견을 제공하고 싶지 않다면 대화상자를 닫으면 됩니다.
의견이나 제안 또는 수정할 사항이 있다면 지원 탭을 사용하세요.
Copyright 2026 Google LLC All rights reserved. Google 및 Google 로고는 Google LLC의 상표입니다. 기타 모든 회사명 및 제품명은 해당 업체의 상표일 수 있습니다.
현재 이 콘텐츠를 이용할 수 없습니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
감사합니다
이용할 수 있게 되면 이메일로 알려드리겠습니다.
한 번에 실습 1개만 가능
모든 기존 실습을 종료하고 이 실습을 시작할지 확인하세요.