
Building Realtime Pipelines in Cloud Data Fusion

Lab · 1 hour 30 minutes · Credits: 5 · Introductory

GSP808


Overview

In addition to batch pipelines, Data Fusion also allows you to create realtime pipelines that can process events as they are generated. Currently, realtime pipelines execute using Apache Spark Streaming on Cloud Dataproc clusters. This lab will teach you how to build a streaming pipeline using Data Fusion.

You will create a pipeline that reads from a Cloud Pub/Sub topic and processes the events, runs some transformations, and writes the output to BigQuery.

Objectives

  1. Learn how to create a realtime pipeline
  2. Learn how to configure the Pub/Sub source plugin in Data Fusion
  3. Learn how to use Wrangler to define transformations for data located in unsupported connections

Setup and requirements

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Google Cloud Skills Boost using an incognito window.

  2. Note the lab's access time (for example, 02:00:00), and make sure you can finish within that time.
    There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

    Note: Once you click Start lab, it will take about 15 - 20 minutes for the lab to provision necessary resources and create a Data Fusion instance. During that time, you can read through the steps below to get familiar with the goals of the lab.

    When you see lab credentials (Username and Password) in the left panel, the instance is created and you can continue logging into the console.
  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud console.

  5. Click Open Google console.

  6. Click Use another account and copy/paste credentials for this lab into the prompts.
    If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Note: Do not click End lab unless you have finished the lab or want to restart it. This clears your work and removes the project.

Log in to Google Cloud Console

  1. Using the browser tab or window you are using for this lab session, copy the Username from the Connection Details panel and click the Open Google Console button.

    Note: If you are asked to choose an account, click Use another account.

  2. Paste in the Username, and then the Password as prompted.
  3. Click Next.
  4. Accept the terms and conditions.

    Since this is a temporary account, which will last only as long as this lab:

    • Do not add recovery options
    • Do not sign up for free trials

  5. Once the console opens, view the list of services by clicking the Navigation menu (Navigation menu icon) at the top-left.

Activate Cloud Shell

Cloud Shell is a virtual machine that contains development tools. It offers a persistent 5-GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources. gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab completion.

  1. Click the Activate Cloud Shell button (Activate Cloud Shell icon) at the top right of the console.

  2. Click Continue.
    It takes a few moments to provision and connect to the environment. When you are connected, you are also authenticated, and the project is set to your PROJECT_ID.

Sample commands

  • List the active account name:

gcloud auth list

(Output)

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)

(Example output)

Credentialed accounts:
 - google1623327_student@qwiklabs.net

  • List the project ID:

gcloud config list project

(Output)

[core]
project = <project_ID>

(Example output)

[core]
project = qwiklabs-gcp-44776a13dea667a6

Note: Full documentation of gcloud is available in the gcloud CLI overview guide.

Task 1. Project permissions

Check project permissions

Before you begin working on Google Cloud, you must ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu (Navigation menu icon), click IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud overview.


If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.

  1. In the Google Cloud console, on the Navigation menu, click Cloud overview.

  2. From the Project info card, copy the Project number.

  3. On the Navigation menu, click IAM & Admin > IAM.

  4. At the top of the IAM page, click Add.

  5. For New principals, type:

{project-number}-compute@developer.gserviceaccount.com

Replace {project-number} with your project number.

  6. For Select a role, select Basic (or Project) > Editor.

  7. Click Save.

Task 2. Ensure that the Dataflow API is successfully enabled

To ensure access to the necessary API, restart the connection to the Dataflow API.

  1. In the Cloud Console, enter "Dataflow API" in the top search bar. Click on the result for Dataflow API.

  2. Click Manage.

  3. Click Disable API.

If asked to confirm, click Disable.

  4. Click Enable.

Task 3. Load the data

  1. Download the sample tweets file to your computer. You will later upload it in Wrangler to create transformation steps.

You also need to stage the same sample tweets file in your Cloud Storage bucket. Towards the end of this lab, you will stream the data from your bucket into a Pub/Sub topic.

  2. In Cloud Shell, run the following commands to create a new bucket:

export BUCKET=$GOOGLE_CLOUD_PROJECT
gsutil mb gs://$BUCKET

The created bucket has the same name as your Project ID.

  3. Run the following command to copy the tweets file into the bucket:

gsutil cp gs://cloud-training/OCBL164/pubnub_tweets_2019-06-09-05-50_part-r-00000 gs://$BUCKET

  4. Verify that the file has been copied into your Cloud Storage bucket.

Click Check my progress to verify the objective. Load the data

Task 4. Setting up Pub/Sub Topic

To use Pub/Sub, you create a topic to hold data and a subscription to access data published to the topic.

  1. In the Cloud Console, on the Navigation menu, click View All Products. Under the Analytics section, click Pub/Sub, then select Topics.

  2. Click Create topic.

  3. The topic must have a unique name. For this lab, name your topic cdf_lab_topic, then click CREATE.

Click Check my progress to verify the objective. Setting up Pub/Sub Topic

Task 5. Add a Pub/Sub subscription

While still on the topic page, create a subscription to access the topic.

  1. Click Create subscription.

  2. Name the subscription cdf_lab_subscription (the pipeline refers to this name later), set the Delivery Type to Pull, then click Create.

Click Check my progress to verify the objective. Add a Pub/Sub subscription

Task 6. Add necessary permissions for your Cloud Data Fusion instance

  1. In the Google Cloud console, on the Navigation menu, click View All Products. Under the Analytics section, click Data Fusion > Instances.

Note: Creation of the instance takes around 20 minutes. Please wait for it to be ready.

Next, you will grant permissions to the service account associated with the instance, using the following steps.

  1. From the Google Cloud console, navigate to IAM & Admin > IAM.

  2. Confirm that the Compute Engine default service account {project-number}-compute@developer.gserviceaccount.com is present, then copy the service account to your clipboard.

  3. On the IAM Permissions page, click +Grant Access.

  4. In the New principals field paste the service account.

  5. Click into the Select a role field and start typing Cloud Data Fusion API Service Agent, then select it.

  6. Click ADD ANOTHER ROLE.

  7. Add the Dataproc Administrator role.

  8. Click Save.

Click Check my progress to verify the objective. Add Cloud Data Fusion API Service Agent role to service account

Grant service account user permission

  1. In the console, on the Navigation menu, click IAM & admin > IAM.

  2. Select the Include Google-provided role grants checkbox.

  3. Scroll down the list to find the Google-managed Cloud Data Fusion service account that looks like service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com and then copy the service account name to your clipboard.

  4. Next, navigate to IAM & admin > Service Accounts.

  5. Click on the default compute engine account that looks like {project-number}-compute@developer.gserviceaccount.com, and select the Principals with access tab on the top navigation.

  6. Click on the Grant Access button.

  7. In the New Principals field, paste the service account you copied earlier.

  8. In the Role dropdown menu, select Service Account User.

  9. Click Save.
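
Two service account names appear in this task, and both are derived from the project number. The following Python sketch only builds the two email addresses in the formats quoted in the steps, which can help when checking that the right principal was pasted into each field. The function names and the project number in the usage lines are hypothetical.

```python
def default_compute_service_account(project_number: int) -> str:
    """Compute Engine default service account, in the format quoted in this lab."""
    return f"{project_number}-compute@developer.gserviceaccount.com"


def data_fusion_service_agent(project_number: int) -> str:
    """Google-managed Cloud Data Fusion service account, in the format quoted in this lab."""
    return f"service-{project_number}@gcp-sa-datafusion.iam.gserviceaccount.com"


if __name__ == "__main__":
    # 123456789012 is a made-up project number used only for illustration.
    print(default_compute_service_account(123456789012))
    print(data_fusion_service_agent(123456789012))
```

In the steps above, the first address is the account that receives the Cloud Data Fusion API Service Agent and Dataproc Administrator roles, and the second is the principal that is granted Service Account User on it.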

Task 7. Navigate the Cloud Data Fusion UI

When using Cloud Data Fusion, you use both the Cloud Console and the separate Cloud Data Fusion UI. In the Cloud Console, you can create a Cloud Console project, and create and delete Cloud Data Fusion instances. In the Cloud Data Fusion UI, you can use the various pages, such as Pipeline Studio or Wrangler, to use Cloud Data Fusion features.

To navigate the Cloud Data Fusion UI, follow these steps:

  1. In the Cloud Console return to Data Fusion, then click the View Instance link next to your Data Fusion instance. Select your lab credentials to sign in. If prompted to take a tour of the service click on No, Thanks. You should now be in the Cloud Data Fusion UI.

  2. On the Cloud Data Fusion Control Center, use the Navigation menu to expose the left menu, then choose Pipeline > Studio.

  3. On the top left, use the dropdown menu to select Data Pipeline - Realtime.

Task 8. Build a realtime pipeline

When working with data it’s handy to be able to see what the raw data looks like so that you can use it as a starting point for your transformation. For this purpose you’ll be using Wrangler for preparing and cleaning data. This data-first approach will allow you to quickly visualize your transformations and the real-time feedback ensures that you’re on the right track.

  1. From the Transform section of the plugin palette, select Wrangler. The Wrangler node will appear on the canvas. Open it by clicking the Properties button.

  2. Click on the WRANGLE button under the Directives section.

  3. When it loads, click Upload on the left side menu. Next, click the upload icon to upload the sample tweets file you downloaded to your computer earlier.

  4. The data is loaded into the Wrangler screen in row/column form. It will take a couple of minutes.
Note: Treat this as a sample of the events you will eventually receive in Pub/Sub. This is representative of real-world scenarios, where typically you don’t have access to production data while you develop your pipeline.

However, your administrator might give you access to a small sample, or you may be working on mock data that adheres to the contract of an API. In this section, you will apply transformations on this sample iteratively, with feedback at each step. Then you will learn how to replay the transformations on real data.
  1. The first operation is to parse the JSON data into a tabular representation that is split into rows and columns. To do this, click the dropdown icon in the first column (body) heading, select the Parse menu item, and then select JSON from the submenu. In the popup, set the Depth to 1, then click Apply.

  2. Repeat the previous step to see a more meaningful data structure for further transformation. Click on the body column dropdown icon, then select Parse > JSON and set the Depth to 1, then click Apply.

    Besides using the UI, you can also write Transformation Steps into the Wrangler directive command line box. This box appears at the lower section of your wrangler UI (look for the command console with the $ prompt in green). You will use the command console to paste a set of Transformation Steps in the next step.

  3. Add the Transformation Steps below by copying all of them and pasting them into the Wrangler directive command line box:

columns-replace s/^body_payload_//g
drop id_str
parse-as-simple-date :created_at EEE MMM dd HH:mm:ss Z yyyy
drop display_text_range
drop truncated
drop in_reply_to_status_id_str
drop in_reply_to_user_id_str
parse-as-json :user 1
drop coordinates
set-type :place string
drop geo,place,contributors,is_quote_status,favorited,retweeted,filter_level,user_id_str,user_url,user_description,user_translator_type,user_protected,user_verified,user_followers_count,user_friends_count,user_statuses_count,user_favourites_count,user_listed_count,user_is_translator,user_contributors_enabled,user_lang,user_geo_enabled,user_time_zone,user_utc_offset,user_created_at,user_profile_background_color,user_profile_background_image_url,user_profile_background_image_url_https,user_profile_background_tile,user_profile_link_color,user_profile_sidebar_border_color,user_profile_sidebar_fill_color,user_profile_text_color,user_profile_use_background_image
drop user_following,user_default_profile_image,user_follow_request_sent,user_notifications,extended_tweet,quoted_status_id,quoted_status_id_str,quoted_status,quoted_status_permalink
drop user_profile_image_url,user_profile_image_url_https,user_profile_banner_url,user_default_profile,extended_entities
fill-null-or-empty :possibly_sensitive 'false'
set-type :possibly_sensitive boolean
drop :entities
drop :user_location

Note: If you see the message 'No data. Try removing some transformation steps.', remove any one of the transformation steps by clicking its X. Once the data appears, you can proceed.

  4. Click on the Apply button on the top right. Next, click on the X on the top right to close the properties box.
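
The Transformation Steps pasted into Wrangler can be read as a small per-record transformation. The following Python sketch is not Wrangler code and does not show how Data Fusion executes the recipe; it mirrors a handful of the directives on an invented sample event. The nesting of the tweet under a payload key is assumed from the body_payload_ column prefix, and all field values are made up.

```python
import json
import re
from datetime import datetime

# Hypothetical sample event: structure assumed from the body_payload_ prefix,
# values invented for illustration.
RAW_EVENT = json.dumps({
    "payload": {
        "id_str": "1",
        "created_at": "Sun Jun 09 05:50:00 +0000 2019",
        "text": "hello",
        "truncated": False,
        "possibly_sensitive": None,
    }
})


def flatten_one_level(column, value):
    """Roughly what Parse > JSON at depth 1 does: one object becomes prefixed columns."""
    return {f"{column}_{key}": item for key, item in value.items()}


def transform(raw_body):
    row = flatten_one_level("body", json.loads(raw_body))         # -> body_payload
    row = flatten_one_level("body_payload", row["body_payload"])  # -> body_payload_*
    # columns-replace s/^body_payload_//g
    row = {re.sub(r"^body_payload_", "", name): v for name, v in row.items()}
    # drop id_str / drop truncated (only two of the many drop directives are mirrored)
    for name in ("id_str", "truncated"):
        row.pop(name, None)
    # parse-as-simple-date :created_at EEE MMM dd HH:mm:ss Z yyyy
    row["created_at"] = datetime.strptime(row["created_at"], "%a %b %d %H:%M:%S %z %Y")
    # fill-null-or-empty :possibly_sensitive 'false'
    if row.get("possibly_sensitive") in (None, ""):
        row["possibly_sensitive"] = "false"
    # set-type :possibly_sensitive boolean
    row["possibly_sensitive"] = str(row["possibly_sensitive"]).lower() == "true"
    return row


if __name__ == "__main__":
    print(transform(RAW_EVENT))
```

The point of the sketch is the order of operations: flatten first, rename the columns, then drop, parse, and fix types column by column.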

As you can see, you’re back inside the Pipeline Studio, where a single node has been placed on the canvas, representing the transformations you just defined in Wrangler. However, no source is connected to this pipeline, since as explained above, you applied these transformations on a representative sample of the data on your laptop, and not on data in its actual production location.

In the next step, let’s specify where the data will actually be located.

  1. From the Source section of the plugin palette, select PubSub. The PubSub source node will appear on the canvas. Open it by clicking the Properties button.

  2. Specify the various properties of the PubSub source as below:

a. Under Reference Name, enter Twitter_Input_Stream

b. Under Subscription enter cdf_lab_subscription (which is the name of your PubSub subscription you created earlier)

Note: The PubSub source does not accept the fully qualified subscription name, but only its last component after the .../subscriptions/ portion.

c. Click Validate to ensure that no errors will be found.


d. Click on the X on the top right to close the properties box.
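
As the note above says, the PubSub source expects only the last component of the subscription name. If you have the fully qualified name on hand (for example, copied from the Pub/Sub page), a small helper like the following Python sketch can extract the part to enter. The function name and the project ID in the usage line are hypothetical.

```python
def subscription_id(name: str) -> str:
    """Return the component after .../subscriptions/, or the name unchanged if it is already short."""
    marker = "/subscriptions/"
    if marker in name:
        return name.rsplit(marker, 1)[1]
    return name


if __name__ == "__main__":
    # "my-project" is a placeholder project ID.
    print(subscription_id("projects/my-project/subscriptions/cdf_lab_subscription"))
```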

  1. Now connect the PubSub source node to the Wrangler node that you previously added.


Note that since you previously used a sample of the data in Wrangler, the source column appeared as body in Wrangler. However, the PubSub source emits it in a field with the name message. In the next step, you will fix this discrepancy.

  1. Open the properties for your Wrangler node and add the following directives at the top of the existing Transformation Steps:

keep :message
set-charset :message 'utf-8'
rename :message :body


Click on the X on the top right to close the properties box.
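
To make the effect of the three directives concrete, here is a minimal Python sketch of the same idea applied to one record: keep only the message field, decode it as UTF-8 text, and rename it to body so that the rest of the recipe sees the column name it was built against. This is an illustration, not Wrangler's implementation; the extra fields in the sample record (id, attributes) are assumptions included only to show that they are discarded.

```python
def normalize_pubsub_record(record: dict) -> dict:
    # keep :message  (discard every other field on the record)
    kept = {"message": record["message"]}
    # set-charset :message 'utf-8'  (bytes -> text)
    kept["message"] = kept["message"].decode("utf-8")
    # rename :message :body
    kept["body"] = kept.pop("message")
    return kept


if __name__ == "__main__":
    # Hypothetical record; only the message field is confirmed by the lab text.
    sample = {"message": b'{"payload": {"text": "hello"}}', "id": "42", "attributes": {}}
    print(normalize_pubsub_record(sample))
```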

  1. Now that you have connected a source and a transform to the pipeline, complete it by adding a sink. From the Sink section of the left side panel, choose BigQuery. A BigQuery sink node appears on the canvas.

  2. Connect the Wrangler node to the BigQuery node by dragging the arrow from the Wrangler node to the BigQuery node. Next, you will configure the BigQuery node properties.


  3. Hover over the BigQuery node and click on Properties.

    a. Under Reference Name, enter realtime_pipeline

    b. Under Dataset, enter realtime

    c. Under Table, enter tweets

    d. Click Validate to ensure that no errors will be found.

  4. Click on the X on the top right to close the properties box.


  1. Click Name your pipeline, enter Realtime_Pipeline as the name, and click Save.

  2. Click the Deploy icon.

  3. Once deployed, click Run. Wait for the pipeline Status to change to Running. It will take a couple of minutes.

Task 9. Send messages into Cloud Pub/Sub

Send events by bulk loading them into the Pub/Sub topic using a Dataflow template.

You will now create a Dataflow job from a template that reads the messages in the tweets file and publishes them to the Pub/Sub topic you created earlier. On the Dataflow Create job page, use the Text Files on Cloud Storage to Pub/Sub template, listed under Process Data Continuously (Stream).

  1. Back in the Cloud Console, on the Navigation menu, click View All Products. Under the Analytics section, click Dataflow.

  2. In the top menu bar, click CREATE JOB FROM TEMPLATE.

  3. Enter streaming-pipeline as the Job name for your Cloud Dataflow job.

  4. Under Cloud Dataflow template, select Text Files on Cloud Storage to Pub/Sub template.

  5. Under Input Cloud Storage File(s), enter gs://<YOUR-BUCKET-NAME>/<FILE-NAME>. Be sure to replace <YOUR-BUCKET-NAME> with the name of your bucket and <FILE-NAME> with the name of the tweets file you staged in the bucket earlier.

For example: gs://qwiklabs-gcp-01-dfdf34926367/pubnub_tweets_2019-06-09-05-50_part-r-00000

  6. Under Output Pub/Sub Topic, enter projects/<PROJECT-ID>/topics/cdf_lab_topic

Be sure to replace <PROJECT-ID> with your actual Project ID.

  7. Under Temporary Location, enter <YOUR-BUCKET-NAME>/tmp/

Be sure to replace <YOUR-BUCKET-NAME> with the name of your bucket.

  8. Click the Run job button.

  9. Wait a couple of minutes while the Dataflow job runs. You can view the messages on the Pub/Sub subscription and then watch them being processed by the realtime Cloud Data Fusion pipeline.
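
The three values entered in the template form all follow from the Project ID, because the bucket created earlier has the same name as the project. The following Python sketch assembles them in the formats given in the steps, so you can compare against what you typed. The function name is hypothetical, and the dictionary keys are simply the form field labels, not API parameter names.

```python
def dataflow_template_inputs(project_id, file_name, bucket=None, topic="cdf_lab_topic"):
    """Build the form values for the Text Files on Cloud Storage to Pub/Sub job."""
    # In this lab the bucket is named after the project, so that is the default.
    bucket = bucket or project_id
    return {
        "Input Cloud Storage File(s)": f"gs://{bucket}/{file_name}",
        "Output Pub/Sub Topic": f"projects/{project_id}/topics/{topic}",
        "Temporary Location": f"{bucket}/tmp/",
    }


if __name__ == "__main__":
    values = dataflow_template_inputs(
        "qwiklabs-gcp-01-dfdf34926367",  # example project ID taken from the lab text
        "pubnub_tweets_2019-06-09-05-50_part-r-00000",
    )
    for field, value in values.items():
        print(f"{field}: {value}")
```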


Click Check my progress to verify the objective. Build and execute runtime pipeline

Task 10. Viewing your pipeline metrics

As soon as events are loaded into the Pub/Sub topic, you should start seeing them being consumed by the pipeline. Watch for the metrics on each node getting updated.

  • In the Data Fusion Console, wait for your pipeline metrics to change.

Congratulations!

In this lab, you learned how to set up a realtime pipeline in Data Fusion that reads incoming streaming messages from Cloud Pub/Sub, processes the data, and writes it out to BigQuery.

Manual Last Updated February 6, 2025

Lab Last Tested February 6, 2025

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
