
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
In this lab, you take many of the concepts introduced in a batch context and apply them in a streaming context to create a pipeline similar to batch_minute_traffic_pipeline, but which operates in real time. The finished pipeline will first read JSON messages from Pub/Sub and parse those messages before branching. One branch writes some raw data to BigQuery and takes note of event and processing time. The other branch windows and aggregates the data and then writes the results to BigQuery.
You will build the following pipeline:
This Qwiklabs hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.
To complete this lab, you need:
Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.
Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Choose an account page.
On the Choose an account page, click Use Another Account. The Sign in page opens.
Paste the username that you copied from the Connection Details panel. Then copy and paste the password.
After a few moments, the Cloud console opens in this tab.
For this lab, you will be running all commands in a terminal from your Instance notebook.
In the Google Cloud console, from the Navigation menu, select Vertex AI.
Click Enable All Recommended APIs.
In the Navigation menu, click Workbench.
At the top of the Workbench page, ensure you are in the Instances view.
Click Create New.
Configure the Instance:
This will take a few minutes to create the instance. A green checkmark will appear next to its name when it's ready.
Next you will download a code repository for use in this lab.
On the left panel of your notebook environment, in the file browser, you will notice the training-data-analyst repo added.
Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code to be completed by you, and a solution sub-folder with a fully workable example to reference if you get stuck.
Before you can begin editing the actual pipeline code, you need to ensure that you have installed the necessary dependencies.
Grant the dataflow.worker role to the Compute Engine default service account: in the Cloud Console, navigate to IAM & ADMIN > IAM and click the Edit principal icon for the Compute Engine default service account.
Add Dataflow Worker as another role and click Save.
Click Check my progress to verify the objective.
In the previous labs, you used beam.io.ReadFromText to read from Google Cloud Storage. In this lab, instead of Google Cloud Storage, you use Pub/Sub. Pub/Sub is a fully managed real-time messaging service that allows publishers to send messages to a "topic," to which subscribers can subscribe via a "subscription."
The pipeline you create subscribes to a topic called my_topic that you created earlier via the create_streaming_sinks.sh script. In a production situation, this topic will often be created by the publishing team. You can view it in the Pub/Sub portion of the console.
In the file explorer, navigate to training-data-analyst/quests/dataflow_python/5_Streaming_Analytics/lab/ and open the streaming_minute_traffic_pipeline.py file.
To read from Pub/Sub, use the beam.io.ReadFromPubSub() class. This class has attributes for specifying the source topic as well as the timestamp_attribute. By default, this attribute is set to the message publishing time.
To complete this task:
- Add a transform that reads from the Pub/Sub topic specified by the input_topic command-line parameter.
- Then, use the provided parse_json function with beam.Map to convert each JSON string into a CommonLog instance.
- Declare the output as a PCollection of CommonLog instances using with_output_types().
At the first #TODO, add code that performs these steps. One possible shape for this code is sketched below.
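A minimal sketch of this step, assuming p is the pipeline object, input_topic holds the value of the --input_topic parameter, and parse_json and CommonLog are the helper function and schema class already defined in the lab file (the variable names here are illustrative):

```python
import apache_beam as beam

# Sketch only: p, input_topic, parse_json, and CommonLog are assumed to be
# defined elsewhere in the lab file.
parsed_msgs = (
    p
    # Read raw messages from the Pub/Sub topic; by default each element's
    # timestamp is the message publishing time.
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(input_topic)
    # Parse each JSON payload into a CommonLog instance and declare the
    # output type.
    | 'ParseJson' >> beam.Map(parse_json).with_output_types(CommonLog)
)
```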
In the previous non-SQL lab, you implemented fixed-time windowing in order to group events by event time into mutually exclusive windows of fixed size. Do the same thing here with the streaming inputs. Feel free to reference the previous lab's code or the solution if you get stuck.
To complete this task:
- Add a transform to your pipeline that accepts the PCollection of CommonLog data and windows the elements into windows of window_duration seconds, with window_duration as another command-line parameter (see the sketch below).
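A minimal sketch of the windowing step, assuming parsed_msgs is the PCollection of CommonLog instances from the previous step and window_duration is the value of the --window_duration parameter (names are illustrative):

```python
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

# Sketch only: window_duration is assumed to arrive as a string from the
# command line, so it is cast to int here.
windowed_msgs = (
    parsed_msgs
    | 'WindowByDuration' >> beam.WindowInto(FixedWindows(int(window_duration)))
)
```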
In the previous lab, you used the CountCombineFn() combiner to count the number of events per window. Do the same here.
To complete this task:
- Pass the windowed PCollection as input to a transform that counts the number of events per window.
- Then, use a DoFn, GetTimestampFn, with beam.ParDo to include the window start timestamp (see the sketch below).
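A sketch of the counting and timestamping steps, assuming windowed_msgs is the windowed PCollection from the previous step; the output field names page_views and timestamp are illustrative and should match the table schema defined in the lab file:

```python
import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn


class GetTimestampFn(beam.DoFn):
    """Attaches the start of the enclosing window to each per-window count."""
    def process(self, element, window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime().strftime('%Y-%m-%dT%H:%M:%S')
        # Field names here are illustrative; match them to your table schema.
        yield {'page_views': element, 'timestamp': window_start}


per_window_counts = (
    windowed_msgs
    # Count all elements in each window; without_defaults() is required when
    # combining globally over windowed, non-global input.
    | 'CountPerWindow' >> beam.CombineGlobally(CountCombineFn()).without_defaults()
    # Add the window start timestamp to each count.
    | 'AddWindowTimestamp' >> beam.ParDo(GetTimestampFn())
)
```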
This pipeline writes to BigQuery in two separate branches. The first branch writes the aggregated data to BigQuery. The second branch, which has already been authored for you, writes out some metadata regarding each raw event, including the event timestamp and the actual processing timestamp. Both write directly to BigQuery via streaming inserts.
Writing to BigQuery has been covered extensively in previous labs, so the basic mechanics will not be covered in depth here.
To complete this task:
- Create a new command-line parameter called agg_table_name for the table intended to house the aggregated data.
- Add a transform that writes the aggregated data to this table (see the sketch below). Note that, for streaming pipelines, beam.io.WriteToBigQuery() does not support a write_disposition of WRITE_TRUNCATE, in which the table is dropped and recreated. In this example, use WRITE_APPEND.
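A sketch of the aggregated-data write, assuming per_window_counts is the output of the aggregation branch and agg_table_schema is the BigQuery schema defined in the lab file (both names are illustrative):

```python
import apache_beam as beam

# Sketch only: agg_table_name holds the --agg_table_name parameter value.
(per_window_counts
 | 'WriteAggToBQ' >> beam.io.WriteToBigQuery(
     agg_table_name,
     schema=agg_table_schema,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     # WRITE_TRUNCATE is not an option here; append to the table instead.
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```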
Note: beam.io.WriteToBigQuery will default to either streaming inserts for unbounded PCollections or batch file load jobs for bounded PCollections. Streaming inserts can be particularly useful when you want data to show up in aggregations immediately, but they incur extra charges. In streaming use cases where you are OK with periodic batch uploads on the order of every couple of minutes, you can specify this behavior via the method keyword argument and set the frequency with the triggering_frequency keyword argument. Learn more from the Write data to BigQuery section of the apache_beam.io.gcp.bigquery module documentation.
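For example, a variation of the write above that batches rows into periodic load jobs rather than streaming inserts might look like the following; the two-minute frequency is illustrative:

```python
import apache_beam as beam

# Sketch only: per_window_counts, agg_table_name, and agg_table_schema are the
# same illustrative names used in the previous sketch.
(per_window_counts
 | 'WriteAggToBQViaLoads' >> beam.io.WriteToBigQuery(
     agg_table_name,
     schema=agg_table_schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     # Batch rows into load jobs instead of using streaming inserts.
     method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
     # Run a load job roughly every two minutes.
     triggering_frequency=120))
```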
Once you have completed the code in the streaming_minute_traffic_pipeline.py file, run the pipeline again and it should run with no issues.
Ensure in the Dataflow UI that it executes successfully without errors. Note that there is no data yet being created and ingested by the pipeline, so it will be running but not processing anything. You will introduce data in the next step.
Click Check my progress to verify the objective.
Because this is a streaming pipeline, it subscribes to the streaming source and will await input; there is none currently. In this section, you generate data with no lag. Actual data will almost invariably contain lag. However, it is instructive to understand lag-less streaming inputs.
The code for this quest includes a script for publishing JSON events using Pub/Sub.
To start streaming data, run the event generation script in a terminal from the training-data-analyst/quests/dataflow_python folder.
Click Check my progress to verify the objective.
Once data are flowing, examine the logs.minute_traffic table with the following query:
You should see that the number of pageviews hovered around 100 views a minute.
This query illustrates the gap between event time and processing time. However, it can be hard to see the big picture by looking at just the raw tabular data. We will use Looker Studio, a lightweight data visualization and BI engine.
This will open a new window.
Notice that default visualizations are created for the data.
To remove the default visualizations, right-click on each, and select Delete.
Click Add a chart on the top menu bar.
Select the Scatter chart type.
In the Data column of the panel on the right hand side, set the following values:
The chart will transform into a scatterplot where all points lie on the diagonal. This is because in the streaming data currently being generated, events are processed immediately after they are generated; there is no lag. If you started the data generation script quickly, i.e. before the Dataflow job was fully up and running, you may see a hockey-stick shape, as messages queued in Pub/Sub and were then all processed more or less at once.
But in the real world, lag is something that pipelines need to cope with.
The streaming event script is capable of generating events with simulated lag.
This represents scenarios in which there is a delay between when events are generated and when they are published to Pub/Sub; for example, when a mobile client goes into offline mode because the user has no service, events are collected on the device and published all at once when the device is back online.
First, close the Looker Studio window.
Then, to turn on lag, return to the terminal and stop the running script using CTRL+C.
Then, run the following:
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.