实验设置说明和要求
保护您的账号和进度。请务必在无痕浏览器窗口中,使用实验凭证运行此实验。

在 Cloud Data Fusion 中构建批量流水线

实验 1 小时 30 分钟 universal_currency_alt 5 个点数 show_chart 入门级
info 此实验可能会提供 AI 工具来支持您学习。
此内容尚未针对移动设备进行优化。
为获得最佳体验,请在桌面设备上访问通过电子邮件发送的链接。

GSP807

Google Cloud 自学实验的徽标

概览

ETL 代表提取、转换和加载。这一概念还有多种其他变体,包括 EL、ELT 和 ELTL。

本实验为您介绍如何使用 Cloud Data Fusion 中的 Pipeline Studio 来构建 ETL 流水线。Pipeline Studio 公开了各种构建块和内置插件,让您可以逐个节点地构建批量流水线。您还将使用 Wrangler 插件来构建转换,并将其应用于流经流水线的数据。

ETL 应用最常见的数据来源通常是以逗号分隔值 (CSV) 格式存储在文本文件中的数据,因为许多数据库系统都以此种方式导出和导入数据。在本实验中,您将使用 CSV 文件,但同样的技术也可应用于数据库源,以及您可用的任何其他数据源。

输出结果将写入 BigQuery 表中,随后您将使用标准 SQL 对该目标数据集执行数据分析。

目标

在本实验中,您将学习如何完成以下操作:

  • 使用 Cloud Data Fusion 中的 Pipeline Studio 创建批量流水线。
  • 使用 Wrangler 以交互方式转换数据。
  • 将输出结果写入 BigQuery。

设置和要求

对于每个实验,您都会免费获得一个新的 Google Cloud 项目及一组资源,它们都有固定的使用时限。

  1. 使用无痕式窗口登录 Google Skills。

  2. 留意实验的访问时限(例如 02:00:00)并确保能在此时限内完成实验。
    系统不提供暂停功能。如有需要,您可以重新开始实验,不过必须从头开始。

  3. 准备就绪时,点击开始实验

    注意:点击开始实验后,系统需要 15 到 20 分钟的时间为实验预配必要的资源,并创建一个 Data Fusion 实例。 在此期间,您可以通读下方的步骤,以熟悉实验目标。

    在左侧面板中看到实验凭据(用户名密码)后,实例即已创建,您可继续登录控制台。
  4. 请记好您的实验凭据(用户名密码)。您需要使用这组凭据来登录 Google Cloud 控制台。

  5. 点击打开 Google 控制台

  6. 点击使用其他账号,然后将实验的凭据复制并粘贴到相应提示框中。
    如果您使用其他凭据,将会收到错误消息或产生费用

  7. 接受条款并跳过恢复资源页面。

注意:除非您完成了此实验或想要重新开始,否则请勿点击结束实验。点击此按钮会清除您的实验成果并移除此项目。

登录到 Google Cloud 控制台

  1. 使用您在本次实验课程中使用的浏览器标签页或窗口,从连接详情面板中复制用户名,然后点击打开 Google 控制台按钮。
注意:如果您看到选择账号的提示,请点击使用其他账号
  1. 将其粘贴在用户名中,然后按照提示粘贴密码
  2. 点击下一步
  3. 接受条款及条件。

由于这是一个临时账号,仅在本次实验期间有效:

  • 请勿添加恢复选项
  • 请勿用其注册免费试用服务
  1. 控制台打开后,点击左上角的导航菜单 (“导航菜单”图标) 即可查看服务列表。

导航菜单

激活 Cloud Shell

Cloud Shell 是一种包含开发工具的虚拟机。它提供了一个 5 GB 的永久性主目录,并且在 Google Cloud 上运行。Cloud Shell 可让您通过命令行访问 Google Cloud 资源。gcloud 是 Google Cloud 的命令行工具。它会预先安装在 Cloud Shell 上,且支持 Tab 键自动补全功能。

  1. 在 Google Cloud Console 的导航窗格中,点击激活 Cloud Shell (Cloud Shell 图标)。

  2. 点击继续
    预配和连接到环境需要一些时间。若连接成功,也就表明您已通过身份验证,且相关项目的 ID 会被设为您的 PROJECT_ID。例如:

Cloud Shell 终端

命令示例

  • 列出有效的帐号名称:

gcloud auth list

(输出)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(输出示例)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 列出项目 ID:

gcloud config list project

(输出)

[core] project = <project_ID>

(输出示例)

[core] project = qwiklabs-gcp-44776a13dea667a6

检查项目权限

在开始在 Google Cloud 中工作之前,您必须确保您的项目在 Identity and Access Management (IAM) 中拥有正确的权限。

  1. 在 Google Cloud 控制台的导航菜单 (“导航菜单”图标) 中,点击 IAM 和管理 > IAM

  2. 确认默认计算服务账号 {project-number}-compute@developer.gserviceaccount.com 已存在且被授予了 editor 角色。账号前缀是项目编号,您可以在导航菜单 > Cloud 概览中找到此编号。

默认计算服务账号

如果该账号在 IAM 中不存在或不具有 editor 角色,请按照以下步骤向其分配所需的角色。

  1. 在 Google Cloud 控制台的导航菜单中,点击 Cloud 概览

  2. 项目信息卡片中复制项目编号

  3. 导航菜单中,点击 IAM 和管理 > IAM

  4. IAM 页面顶部,点击添加

  5. 新的主账号字段中,输入:

{project-number}-compute@developer.gserviceaccount.com

{project-number} 替换为您的项目编号。

  1. 选择角色部分,依次选择基本(或“项目”)> Editor

  2. 点击保存

任务 1.加载数据

在此任务中,您将在项目中创建一个 Cloud Storage 存储桶并暂存一个 CSV 文件。Cloud Data Fusion 稍后会从该存储桶中读取数据。

  1. 在 Cloud Shell 中,执行以下命令来创建一个新的存储桶,并将相关数据复制到其中:
export BUCKET=$GOOGLE_CLOUD_PROJECT gcloud storage buckets create gs://$BUCKET

创建的存储桶名称就是您的项目 ID。

  1. 要将数据文件(一个 CSV 文件和一个 XML 文件)复制到您的存储桶中,请运行以下命令:
gsutil cp gs://cloud-training/OCBL163/titanic.csv gs://$BUCKET

点击“检查我的进度”,验证是否完成了以下目标:加载数据

任务 2.为 Cloud Data Fusion 实例添加必要权限

在此任务中,您将为与 Cloud Data Fusion 实例关联的服务账号授予必要的 IAM 角色。

  1. 在 Google Cloud 控制台的导航菜单中,点击查看所有产品,然后在分析类别中选择 Data Fusion > 实例。您应该会看到一个已经设置完毕并可供使用的 Cloud Data Fusion 实例。
注意:创建该实例大约需要 10 分钟。请等待其准备就绪。
  1. 在 Google Cloud 控制台的导航菜单中,点击 IAM 和管理 > IAM

  2. 找到 Compute Engine 默认服务账号 {project-number}-compute@developer.gserviceaccount.com,然后将服务账号复制到剪贴板。

  3. 在 IAM 权限页面上,点击 +授予访问权限

  4. 在“新的主账号”字段中,粘贴该服务账号。

  5. 选择角色部分,输入 Cloud Data Fusion API Service Agent,然后将其选中。

  6. 点击 + 添加其他角色

  7. 选择角色部分,选择 Dataproc Administrator 角色。

  8. 点击保存

点击“检查我的进度”,验证是否完成了以下目标:为服务账号添加 Cloud Data Fusion API Service Agent 角色

授予服务账号用户权限

  1. 在控制台中,点击导航菜单下的 IAM 和管理 > IAM

  2. 选中包括 Google 提供的角色授权复选框。

  3. 在列表中向下滚动,找到由 Google 管理的 Cloud Data Fusion 服务账号(其形式类似于 service-{项目编号}@gcp-sa-datafusion.iam.gserviceaccount.com),随后将该服务账号名称复制到剪贴板。

Google 管理的 Cloud Data Fusion 服务账号列表

  1. 然后前往 IAM 和管理 > 服务账号

  2. 点击默认 Compute Engine 账号(其形式类似于 {项目编号}-compute@developer.gserviceaccount.com),然后选择顶部导航栏中的具有访问权限的主账号标签页。

  3. 点击授予访问权限按钮。

  4. 将您先前复制的服务账号粘贴到新的主账号字段中。

  5. 角色下拉菜单中,选择 Service Account User

  6. 点击保存

任务 3.构建批量流水线

在此任务中,您将使用 Cloud Data Fusion 中的 Wrangler 组件来准备和清理原始数据。这一迭代过程可以让您实时、直观地查看转换结果。

  1. 在 Google Cloud 控制台的导航菜单中,点击 Data Fusion > 实例

  2. 对于您的实例,请点击查看实例。如果系统提示,请使用实验凭证登录。如果系统提示您观看导览,请点击不用了

  3. 在 Cloud Data Fusion 界面中,点击导航菜单上的 Wrangler

  4. 在左侧面板中,点击 (GCS) Google Cloud Storage,然后选择 Cloud Storage Default(Cloud Storage 默认存储桶)。

  5. 点击与您的项目 ID 对应的存储桶。

  6. 点击 titanic.csv

    Cloud Data Fusion Cloud Storage 存储桶 titanic1

  7. Parsing Options(解析选项)对话框中,为格式选择文本,然后点击确认

“解析选项”对话框

数据会加载到 Wrangler 屏幕中。现在,您可以开始以迭代方式应用数据转换。

  1. 如需将原始 CSV 数据解析为表格格式,请点击 body 列标题旁边的箭头,选择解析,然后选择 CSV

    Google Cloud Storage“解析 > CSV”菜单选项

  2. Parse as CSV(解析为 CSV)对话框中,选中 Set first row as header(将第一行设置为标题)复选框,然后点击应用

    “解析为 CSV”对话框

注意:您可以忽略 Set first row as header(将第一行设置为标题)复选框旁边的弃用警告。
  1. 至此,原始数据已解析完毕,您可以看到通过此操作生成的各个列(body 列右侧的列)。在最右侧,您可以看到所有列名称的列表。

Cloud Data Fusion Wrangler 界面

  1. 若要移除原始数据列,请点击 body 列标题旁边的箭头,然后点击删除列

“删除列”菜单选项

注意:您也可以使用命令行界面 (CLI) 来应用转换。CLI 是屏幕底部的黑条(带有绿色 $ 提示)。当您开始输入命令时,自动填充功能将被激活并为您呈现匹配选项。例如,若要删除 body 列,您也可以使用指令:drop :body

CLI 删除 body 列语法

  1. 在 Wrangler 界面右侧,点击 Transformation steps(转换步骤)标签页,查看您的当前 recipe。

Titanic.csv drop :body column

注意:菜单选项和 CLI 创建指令都会显示在屏幕右侧的 Transformation steps(转换步骤)标签页中。指令是单个转换操作,它们统称为 recipe。

对于本实验而言,这两个转换步骤(即 recipe)足以创建该 ETL 流水线。下一步是将此 recipe 引入到流水线构建步骤中,该 recipe 即代表 ETL 中的“T”。

  1. 点击创建流水线按钮,跳转到下一部分以创建流水线,您将在那里看到 ETL 流水线是如何组合在一起的。

    “创建流水线”按钮

  2. 在随后出现的对话框中,选择 Batch pipeline(批量流水线)以继续。

Batch pipeline(批量流水线)选项

注意:批量流水线可以交互式运行,也可以按计划运行,频率最高可达每 5 分钟一次,最低可至每年一次。

任务 4.配置 BigQuery 接收器

剩余的流水线构建任务将在 Pipeline Studio 中进行,该界面可让您以直观方式构建数据流水线。现在,您应该能在 Studio 中看到 ETL 流水线的主要构建块。

此时,您将在流水线中看到两个节点:GCS 文件插件(用于从 Google Cloud Storage 读取 CSV 文件)和 Wrangler 插件(包含带有转换的 recipe)。

注意:流水线中的节点是按特定顺序连接在一起的对象,旨在生成有向无环图。例如:来源、接收器、转换、操作等。

这两个插件(节点)代表 ETL 流水线中的“E”和“T”。要完成此流水线,请添加 BigQuery 接收器,即 ETL 中的“L”部分。

流水线架构图

  1. 要将 BigQuery 接收器添加到流水线,请前往左侧面板上的接收器部分,然后点击 BigQuery 图标,将其放置在画布上。

“BigQuery 接收器”部分

  1. 将 BigQuery 接收器放置在画布上后,再将 Wrangler 节点与 BigQuery 节点连接起来。具体做法是:将箭头从 Wrangler 节点拖至 BigQuery 节点进行连接,如下图所示。现在剩下的唯一工作就是指定一些配置选项,以便您能够将数据写入目标数据集。

    将 Wrangler 节点与 BigQuery 节点进行连接

任务 5.配置流水线

现在开始配置流水线。您可以打开每个节点的属性页面来验证其设置,或进行任何其他更改。

  1. 将鼠标悬停在 GCS 节点上,此时会显示一个属性按钮。点击此按钮即可打开配置设置。

GCS 节点“属性”对话框

每个插件都有一些必填字段,这些字段必须存在,并用星号 (*) 标记。根据您使用的插件,您可能会在左侧看到输入架构,在中间看到配置部分,在右侧看到输出架构

您会发现,接收器插件没有输出架构,而来源插件没有输入架构。接收器和来源插件都将包含一个必填的参考名称字段,用于标识数据源/接收器以实现沿袭。

每个插件都有一个标签字段。这是您在展示流水线的画布上所看到的节点标签。

  1. 点击“属性”框右上角的 X 将其关闭。

  2. 接下来,将鼠标悬停在您的 Wrangler 节点上,然后点击属性

Wrangler 节点属性对话框

注意:像 Wrangler 这样的插件包含输入架构。这些是正在传递到插件中以供处理的字段。插件将其处理完毕后,传出数据可能就会以输出架构的形式发送到流水线中的下一个节点;如果是接收器,这些数据则会被写入数据集。
  1. 点击“属性”框右上角的 X 将其关闭。

  2. 将鼠标悬停在 BigQuery 节点上,点击属性,然后输入以下配置设置:

    • 参考名称部分,输入 Titanic_BQ

    • 数据集部分,输入 demo

    • 部分,输入 titanic

  3. 点击“属性”框右上角的 X 将其关闭。

BigQuery 属性对话框

任务 6.测试流水线

现在剩下的唯一工作就是测试您的流水线,看看其是否按预期运行。不过,在执行此操作前,请务必命名并保存您的草稿,以免丢失任何工作。

  1. 现在,点击右上角菜单中的保存。系统会提示您输入一个名称并为流水线添加说明

    • 输入 ETL-batch-pipeline 作为流水线的名称。
    • 输入 ETL pipeline to parse CSV, transform and write output to BigQuery(用于解析 CSV、执行转换并将输出写入 BigQuery 的 ETL 流水线)作为说明
  2. 点击保存

  3. 若要测试您的流水线,请点击预览图标。按钮栏现在会显示一个“运行”图标,您可以点击该图标,在预览模式下运行流水线。

  4. 点击运行图标。在预览模式下运行流水线时,系统实际上不会将任何数据写入 BigQuery 表中,但您将能够确认系统在正确读取数据,并且一旦流水线完成部署,系统也将按预期写入数据。“预览”按钮是一个切换开关,完成操作后,请务必再次点击该按钮以退出预览模式。

    正在准备运行的流水线

  5. 流水线运行完成后,将鼠标悬停在 Wrangler 节点上并点击属性。然后点击预览标签页。如果一切顺利,您应该会看到从输入(左侧节点)传入的原始数据,以及将作为输出发往右侧节点的解析后记录。点击“属性”框右上角的 X 将其关闭。

Wrangler 节点输出

注意:对数据执行操作的每个节点都应该向您显示类似输出。在部署流水线之前,这是验证您的工作并确保方向正确的好方法。如果您遇到任何错误,都可以在草稿模式下轻松修复。
  1. 再次点击预览图标,这次是退出预览模式。

  2. 如果到目前为止一切正常,您可以继续部署流水线。点击右上角的部署图标 “部署”图标 以部署流水线。

您会看到一个确认对话框,提示正在部署流水线:

“部署流水线”确认对话框

  1. 成功部署流水线后,您就可以运行 ETL 流水线并将一些数据加载到 BigQuery 中。

  2. 点击运行图标执行 ETL 作业。

  3. 完成后,您应该会看到流水线状态更改为成功,这表示流水线运行成功。

    Cloud Data Fusion 流水线运行成功

  4. 随着流水线处理数据,您将看到流水线中的每个节点发出指标,指明已处理的记录数。 在解析操作中,它显示了 892 条记录,而在来源中,则有 893 条记录,这是为什么?解析操作提取了第一行并将其用作列标题,因此剩余的 892 条记录是需要处理的数据。

流水线解析 CSV 的示意图

点击“检查我的进度”,验证是否完成了以下目标:部署并执行批量流水线

任务 7.查看结果

该流水线会将输出写入 BigQuery 表中。您可通过以下步骤验证这一点。

  1. 在新标签页中,打开 Cloud 控制台中的 BigQuery 界面,或右键点击控制台标签页并选择复制,然后使用导航菜单选择 BigQuery。出现提示时,点击完成

  2. 在左侧窗格的传统版探索器部分,点击您的项目 ID(以 qwiklabs 开头)。

  3. 在项目的 demo 数据集下,点击 titanic 表,然后点击 “+”(SQL 查询),接着运行一个简单的查询,例如:

SELECT * FROM `demo.titanic` LIMIT 10

查询结果

点击“检查我的进度”,验证是否完成了以下目标:查看结果

恭喜!

现在,您已经学会使用 Cloud Data Fusion 的 Pipeline Studio 中提供的构建块来构建批量流水线。您还学会了使用 Wrangler 为数据创建转换步骤。

参与下一项实验

继续学习使用 Cloud Data Fusion 中的 Wrangler 构建转换并准备数据

上次更新手册的时间:2026 年 1 月 27 日

上次测试实验的时间:2026 年 1 月 27 日

版权所有 2026 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名称和产品名称可能是其各自相关公司的商标。

准备工作

  1. 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
  2. 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
  3. 在屏幕左上角,点击开始实验即可开始

使用无痕浏览模式

  1. 复制系统为实验提供的用户名密码
  2. 在无痕浏览模式下,点击打开控制台

登录控制台

  1. 使用您的实验凭证登录。使用其他凭证可能会导致错误或产生费用。
  2. 接受条款,并跳过恢复资源页面
  3. 除非您已完成此实验或想要重新开始,否则请勿点击结束实验,因为点击后系统会清除您的工作并移除该项目

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您

一次一个实验

确认结束所有现有实验并开始此实验

使用无痕浏览模式运行实验

使用无痕模式或无痕浏览器窗口是运行此实验的最佳方式。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。