实验设置说明和要求
保护您的账号和进度。请务必在无痕浏览器窗口中,使用实验凭证运行此实验。

在 Cloud Data Fusion 中构建实时流水线

实验 1 小时 30 分钟 universal_currency_alt 5 个点数 show_chart 入门级
info 此实验可能会提供 AI 工具来支持您学习。
此内容尚未针对移动设备进行优化。
为获得最佳体验,请在桌面设备上访问通过电子邮件发送的链接。

GSP808

Google Cloud 自学实验的徽标

概览

除了批处理流水线,Data Fusion 还允许您创建实时流水线,在事件生成的同时即可处理。目前,实时流水线通过 Cloud Dataproc 集群上的 Apache Spark Streaming 执行。在本实验中,您将学习如何使用 Data Fusion 构建流处理流水线。

您将创建一个流水线,该流水线会从 Cloud Pub/Sub 主题读取数据并处理事件,然后执行一些转换,最后将输出写入 BigQuery。

目标

  1. 了解如何创建实时流水线
  2. 了解如何在 Data Fusion 中配置 Pub/Sub 来源插件
  3. 了解如何使用 Wrangler 为来自不支持的连接的数据定义转换

设置和要求

对于每个实验,您都会免费获得一个新的 Google Cloud 项目及一组资源,它们都有固定的使用时限。

  1. 使用无痕式窗口登录 Google Skills。

  2. 留意实验的访问时限(例如 02:00:00)并确保能在此时限内完成实验。
    系统不提供暂停功能。如有需要,您可以重新开始实验,不过必须从头开始。

  3. 准备就绪时,点击开始实验

    注意:点击开始实验后,系统需要 15 到 20 分钟的时间为实验预配必要的资源,并创建一个 Data Fusion 实例。 在此期间,您可以通读下方的步骤,以熟悉实验目标。

    在左侧面板中看到实验凭据(用户名密码)后,实例即已创建,您可继续登录控制台。
  4. 请记好您的实验凭据(用户名密码)。您需要使用这组凭据来登录 Google Cloud 控制台。

  5. 点击打开 Google 控制台

  6. 点击使用其他账号,然后将实验的凭据复制并粘贴到相应提示框中。
    如果您使用其他凭据,将会收到错误消息或产生费用

  7. 接受条款并跳过恢复资源页面。

注意:除非您完成了此实验或想要重新开始,否则请勿点击结束实验。点击此按钮会清除您的实验成果并移除此项目。

登录到 Google Cloud 控制台

  1. 使用您在本次实验课程中使用的浏览器标签页或窗口,从连接详情面板中复制用户名,然后点击打开 Google 控制台按钮。
注意:如果您看到选择账号的提示,请点击使用其他账号
  1. 将其粘贴在用户名中,然后按照提示粘贴密码
  2. 点击下一步
  3. 接受条款及条件。

由于这是一个临时账号,仅在本次实验期间有效:

  • 请勿添加恢复选项
  • 请勿用其注册免费试用服务
  1. 控制台打开后,点击左上角的导航菜单 (“导航菜单”图标) 即可查看服务列表。

导航菜单

激活 Cloud Shell

Cloud Shell 是一种包含开发工具的虚拟机。它提供了一个 5 GB 的永久性主目录,并且在 Google Cloud 上运行。Cloud Shell 可让您通过命令行访问 Google Cloud 资源。gcloud 是 Google Cloud 的命令行工具。它会预先安装在 Cloud Shell 上,且支持 Tab 键自动补全功能。

  1. 在 Google Cloud Console 的导航窗格中,点击激活 Cloud Shell (Cloud Shell 图标)。

  2. 点击继续
    预配和连接到环境需要一些时间。若连接成功,也就表明您已通过身份验证,且相关项目的 ID 会被设为您的 PROJECT_ID。例如:

Cloud Shell 终端

命令示例

  • 列出有效的帐号名称:

gcloud auth list

(输出)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(输出示例)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 列出项目 ID:

gcloud config list project

(输出)

[core] project = <project_ID>

(输出示例)

[core] project = qwiklabs-gcp-44776a13dea667a6

任务 1. 项目权限

检查项目权限

在开始在 Google Cloud 中工作之前,您必须确保您的项目在 Identity and Access Management (IAM) 中拥有正确的权限。

  1. 在 Google Cloud 控制台的导航菜单 (“导航菜单”图标) 中,点击 IAM 和管理 > IAM

  2. 确认默认计算服务账号 {project-number}-compute@developer.gserviceaccount.com 已存在且被授予了 editor 角色。账号前缀是项目编号,您可以在导航菜单 > Cloud 概览中找到此编号。

默认计算服务账号

如果该账号在 IAM 中不存在或不具有 editor 角色,请按照以下步骤向其分配所需的角色。

  1. 在 Google Cloud 控制台的导航菜单中,点击 Cloud 概览

  2. 项目信息卡片中复制项目编号

  3. 导航菜单中,点击 IAM 和管理 > IAM

  4. IAM 页面顶部,点击添加

  5. 新的主账号字段中,输入:

{project-number}-compute@developer.gserviceaccount.com

{project-number} 替换为您的项目编号。

  1. 选择角色部分,依次选择基本(或“项目”)> Editor

  2. 点击保存

任务 2. 确保 Dataflow API 已成功启用

为了确保能访问这个必要的 API,请重新启动与 Dataflow API 的连接。

  1. 在 Cloud 控制台的顶部搜索栏中输入“Dataflow API”。点击 Dataflow API 的搜索结果。

  2. 点击管理

  3. 点击停用 API

如果系统要求您确认,请点击停用

  1. 点击启用

任务 3. 加载数据

  1. 首先,您需要将示例推文下载到计算机上。稍后,您将使用 Wrangler 上传这些示例推文,以定义转换步骤。

您还需要将同一份示例推文文件暂存在 Cloud Storage 存储桶中。在本实验的最后,您将把存储桶中的数据流式传输到 Pub/Sub 主题。

  1. 在 Cloud Shell 中,执行以下命令以创建新存储桶:
export BUCKET=$GOOGLE_CLOUD_PROJECT gsutil mb gs://$BUCKET

新建存储桶的名称与您的项目 ID 相同。

  1. 运行以下命令,将推文文件复制到存储桶中:
gsutil cp gs://cloud-training/OCBL164/pubnub_tweets_2019-06-09-05-50_part-r-00000 gs://$BUCKET
  1. 验证该文件是否已复制到您的 Cloud Storage 存储桶中。

点击“检查我的进度”以验证是否完成了以下目标: 加载数据

任务 4. 设置 Pub/Sub 主题

要使用 Pub/Sub,您需创建一个用于保存数据的主题,然后创建订阅来访问发布到该主题的数据。

  1. 在 Cloud 控制台的导航菜单中,点击查看所有产品,然后在“Analytics”部分下点击 Pub/Sub,再选择主题

  2. 点击创建主题

“创建主题”按钮

  1. 主题名称必须唯一。在本实验中,请将主题命名为 cdf_lab_topic,然后点击创建

点击“检查我的进度”以验证是否完成了以下目标: 设置 Pub/Sub 主题

任务 5. 添加 Pub/Sub 订阅

还是在主题页面上。现在您要创建订阅以访问该主题。

  1. 点击创建订阅

“创建订阅”链接

  1. 为该订阅输入名称(例如 cdf_lab_subscription),将传送类型设置为拉取,然后点击创建

“为主题添加订阅”页面

点击“检查我的进度”以验证是否完成了以下目标: 添加 Pub/Sub 订阅

任务 6. 为 Cloud Data Fusion 实例添加必要权限

  1. 在 Google Cloud 控制台中,依次点击导航菜单 > 查看所有产品。在“分析”部分下,依次点击 Data Fusion > 实例
注意:创建实例大约需要 20 分钟。请等待其准备就绪。

接下来,您将按照以下步骤,向与实例关联的服务账号授予权限。

  1. 在 Google Cloud 控制台中,找到 IAM 和管理 > IAM

  2. 确认 Compute Engine 默认服务账号 {project-number}-compute@developer.gserviceaccount.com 已存在,并将服务账号复制到剪贴板。

  3. 在 IAM 权限页面,点击 +授予访问权限

  4. 在“新的主账号”字段中,粘贴刚刚复制的服务账号。

  5. 点击选择角色字段,输入 Cloud Data Fusion API Service Agent,然后选择该角色。

  6. 点击添加其他角色

  7. 添加 Dataproc Administrator 角色。

  8. 点击保存

点击“检查我的进度”以验证是否完成了以下目标: 为服务账号添加 Cloud Data Fusion API Service Agent 角色

授予服务账号用户权限

  1. 在控制台中,点击导航菜单下的 IAM 和管理 > IAM

  2. 选中包括 Google 提供的角色授权复选框。

  3. 在列表中向下滚动,找到由 Google 管理的 Cloud Data Fusion 服务账号(其形式类似于 service-{项目编号}@gcp-sa-datafusion.iam.gserviceaccount.com),随后将该服务账号名称复制到剪贴板。

Google 管理的 Cloud Data Fusion 服务账号列表

  1. 然后前往 IAM 和管理 > 服务账号

  2. 点击默认 Compute Engine 账号(其形式类似于 {项目编号}-compute@developer.gserviceaccount.com),然后选择顶部导航栏中的具有访问权限的主账号标签页。

  3. 点击授予访问权限按钮。

  4. 将您先前复制的服务账号粘贴到新的主账号字段中。

  5. 角色下拉菜单中,选择 Service Account User

  6. 点击保存

任务 7. 浏览 Cloud Data Fusion 界面

使用 Cloud Data Fusion 时,您将同时使用 Cloud 控制台和单独的 Cloud Data Fusion 界面。在 Cloud 控制台中,您可以创建 Cloud 控制台项目,以及创建和删除 Cloud Data Fusion 实例。在 Cloud Data Fusion 界面中,您可以通过各种页面(例如 Pipeline StudioWrangler)来使用 Cloud Data Fusion 功能。

若要浏览 Cloud Data Fusion 界面,请按以下步骤操作:

  1. 在 Cloud 控制台中,返回 Data Fusion,然后点击 Data Fusion 实例旁边的查看实例链接。选择您的实验凭证进行登录。如果系统提示您浏览该服务,请点击不用了。现在,您应该已进入 Cloud Data Fusion 界面。

“查看实例”链接

  1. 在 Cloud Data Fusion 控制中心,使用导航菜单显示左侧菜单,然后依次选择 Pipeline > Studio

  2. 在左上角,使用下拉菜单选择 Data Pipeline - Realtime

任务 8. 构建实时流水线

在处理数据时,如果能直观地查看原始数据的样貌会非常方便,这能为您后续的转换工作提供基础。为此,您将使用 Wrangler 来准备和清理数据。这种以数据为先的方法可让您快速直观地了解转换情况,而实时反馈则可确保您走在正确的轨道上。

  1. 在插件面板的转换部分中,选择 Wrangler。Wrangler 节点将显示在画布上,点击“属性”按钮即可打开。

  2. 点击指令部分下的 WRANGLE 按钮。

  3. 加载完成后,点击左侧侧边菜单中的上传。接下来,点击上传图标,上传您之前下载到计算机中的示例推文文件。

Wrangler“从您的计算机上传数据”页面

  1. 数据会以行/列的形式加载到 Wrangler 界面中。这需要几分钟时间。
注意:请将这些数据视为您最终将在 Pub/Sub 中收到的事件的样本。这也符合实际情况:在开发流水线时,您通常无法访问生产数据。

不过,您的管理员可能会授予您访问少量样本的权限,或者您可能正在处理符合 API 合约的模拟数据。在本部分中,您将对这个样本进行迭代式转换,并在每个步骤中获得反馈。然后,您将学习如何对真实数据复现转换。
  1. 第一项操作是将 JSON 数据解析成划分为行和列的表格形式。为此,您需要从第一列 (body) 标题中选择下拉菜单图标,然后选择解析菜单项,再从子菜单中选择 JSON。在弹出式窗口中,将深度设置为 1,然后点击应用

    指向 JSON 选项的导航路径。

  2. 重复上一步,以查看更易于理解的数据结构,方便后续转换。点击 body 列下拉菜单图标,依次选择解析 > JSON,并将深度设置为 1,然后点击应用

    body_payload 数据

    除了使用界面之外,您还可以在 Wrangler 界面底部的指令命令行框(查找带有绿色 $ 提示的命令控制台)中编写转换步骤。 在下一步中,您将使用命令控制台粘贴一组转换步骤。

  3. 将以下指令全部复制粘贴至 Wrangler 指令命令行框中,以添加转换步骤:

columns-replace s/^body_payload_//g drop id_str parse-as-simple-date :created_at EEE MMM dd HH:mm:ss Z yyyy drop display_text_range drop truncated drop in_reply_to_status_id_str drop in_reply_to_user_id_str parse-as-json :user 1 drop coordinates set-type :place string drop geo,place,contributors,is_quote_status,favorited,retweeted,filter_level,user_id_str,user_url,user_description,user_translator_type,user_protected,user_verified,user_followers_count,user_friends_count,user_statuses_count,user_favourites_count,user_listed_count,user_is_translator,user_contributors_enabled,user_lang,user_geo_enabled,user_time_zone,user_utc_offset,user_created_at,user_profile_background_color,user_profile_background_image_url,user_profile_background_image_url_https,user_profile_background_tile,user_profile_link_color,user_profile_sidebar_border_color,user_profile_sidebar_fill_color,user_profile_text_color,user_profile_use_background_image drop user_following,user_default_profile_image,user_follow_request_sent,user_notifications,extended_tweet,quoted_status_id,quoted_status_id_str,quoted_status,quoted_status_permalink drop user_profile_image_url,user_profile_image_url_https,user_profile_banner_url,user_default_profile,extended_entities fill-null-or-empty :possibly_sensitive 'false' set-type :possibly_sensitive boolean drop :entities drop :user_location 注意:如果显示的消息类似于“No data. Try removing some transformation steps.”(无数据,请尝试移除一些转换步骤。),请点击任意转换步骤旁的 X 移除该步骤,待数据出现后,您就可以继续操作。
  1. 点击右上角的应用按钮。然后,点击右上角的 X 关闭属性框。

可以看到,您回到了 Pipeline Studio,画布上放置了一个节点,代表您刚刚在 Wrangler 中定义的转换。不过,如前所述,由于您在笔记本电脑上对数据的代表性样本应用了这些转换,而不是对实际生产位置的数据应用这些转换,因此没有将任何来源连接到此流水线。

在下一步中,我们来指定数据的实际位置。

  1. 在插件面板的来源部分,选择 PubSub。PubSub 来源节点将显示在画布上,点击属性按钮即可打开。

  2. 指定 PubSub 来源的各种属性,如下所示:

a. 在参考名称下,输入 Twitter_Input_Stream

b. 在订阅下,输入 cdf_lab_subscription(这是您之前创建的 PubSub 订阅的名称)

注意:PubSub 来源不接受完全限定的订阅名称,而只接受 .../subscriptions/ 之后的部分。

c. 点击验证,确保没有错误。

Pub/Sub 属性页面

d. 点击右上角的 X 关闭属性框。

  1. 现在,将 PubSub 来源节点连接到您之前添加的 Wrangler 节点。

流水线;Pub/Sub 到 Wrangler

请注意,由于您之前在 Wrangler 中使用了数据样本,因此来源列在 Wrangler 中显示为“body”。不过,PubSub 来源会在名为“message”的字段中发出数据。在下一步中,您将修复字段名称不一致的问题。

  1. 打开 Wrangler 节点的属性,并在现有转换步骤的顶部添加以下指令:
keep :message set-charset :message 'utf-8' rename :message :body

Wrangler 属性页面

点击右上角的 X 关闭属性框。

  1. 现在,您已将来源和转换连接到流水线,接下来添加一个接收器来完成流水线。在左侧边栏的接收器部分,选择 BigQuery。画布上会显示一个 BigQuery 接收器节点。

  2. 将 Wrangler 节点连接到 BigQuery 节点,方法是将箭头从 Wrangler 节点拖动到 BigQuery 节点。接下来,您将配置 BigQuery 节点属性。

    流水线

  3. 将鼠标悬停在 BigQuery 节点上,然后点击属性

    a. 在参考名称下,输入 realtime_pipeline

    b. 在数据集下,输入 realtime

    c. 在下,输入 tweets

    d. 点击验证,确保没有错误。

  4. 点击右上角的 X 关闭属性框。

BigQuery 属性窗口

  1. 点击 Name your pipeline(为流水线命名),添加 Realtime_Pipeline 作为名称,然后点击保存

  2. 点击部署图标,然后启动流水线。

  3. 部署完成后,点击运行。等待流水线的状态变为“正在运行”。这需要几分钟时间。

任务 9. 将消息发送到 Cloud Pub/Sub

使用 Dataflow 模板将事件批量加载到订阅中,以发送事件。

现在,您将基于模板创建一个 Dataflow 作业,以处理推文文件中的多条消息,并将这些消息发布到之前创建的 Pub/Sub 主题。在 Dataflow 的作业创建页面中,使用连续处理数据(流处理)下的 Text Files on Cloud Storage to Pub/Sub 模板。

  1. 返回 Cloud 控制台,依次点击导航菜单 > 查看所有产品,然后点击“分析”部分下的 Dataflow

  2. 在顶部菜单栏中,点击基于模板创建作业

  3. 输入 streaming-pipeline 作为 Cloud Dataflow 作业的名称。

  4. 在 Cloud Dataflow 模板下,选择 Text Files on Cloud Storage to Pub/Sub 模板。

  5. Input Cloud Storage File(s)(输入 Cloud Storage 文件)下,输入 gs://<YOUR-BUCKET-NAME>/<FILE-NAME> 请务必将 <YOUR-BUCKET-NAME> 替换为您的存储桶名称,并将 <FILE-NAME> 替换为您之前下载到计算机中的文件名。

例如:gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000

  1. Output Pub/Sub Topic(输出 Pub/Sub 主题)下,输入 projects/<PROJECT-ID>/topics/cdf_lab_topic

请务必将 PROJECT-ID 替换为您的实际项目 ID。

  1. 临时位置下,输入 <YOUR-BUCKET-NAME>/tmp/

请务必将 <YOUR-BUCKET-NAME> 替换为您的存储桶名称。

  1. 点击运行作业按钮。

  2. 执行 Dataflow 作业并等待几分钟。您可以查看 Pub/Sub 订阅中的消息,然后查看这些消息通过实时 CDF 流水线进行处理的过程。

    “基于模板创建作业”对话框

点击“检查我的进度”以验证是否完成了以下目标: 构建并执行运行时流水线

任务 10. 查看流水线指标

事件加载到 Pub/Sub 主题后,您应该会看到流水线开始处理这些事件 - 请注意每个节点上的指标是否更新。

  • 在 Data Fusion 控制台中,等待流水线指标发生变化

    流水线指标

恭喜!

在本实验中,您学习了如何在 Data Fusion 中设置实时流水线,该流水线可以从 Cloud Pub/Sub 读取传入的流式消息、处理数据并将其写入 BigQuery。

本手册的最后更新时间:2025 年 2 月 6 日

本实验的最后测试时间:2025 年 2 月 6 日

版权所有 2026 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名称和产品名称可能是其各自相关公司的商标。

准备工作

  1. 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
  2. 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
  3. 在屏幕左上角,点击开始实验即可开始

使用无痕浏览模式

  1. 复制系统为实验提供的用户名密码
  2. 在无痕浏览模式下,点击打开控制台

登录控制台

  1. 使用您的实验凭证登录。使用其他凭证可能会导致错误或产生费用。
  2. 接受条款,并跳过恢复资源页面
  3. 除非您已完成此实验或想要重新开始,否则请勿点击结束实验,因为点击后系统会清除您的工作并移除该项目

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您

一次一个实验

确认结束所有现有实验并开始此实验

使用无痕浏览模式运行实验

使用无痕模式或无痕浏览器窗口是运行此实验的最佳方式。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。