In this lab, you take many of the concepts introduced in a batch context and apply them in a streaming context to create a pipeline similar to BatchMinuteTrafficPipeline, but which operates in real time. The finished pipeline will first read JSON messages from Pub/Sub and parse those messages before branching. One branch writes some raw data to BigQuery and takes note of event and processing time. The other branch windows and aggregates the data and then writes the results to BigQuery.
Objectives
Read data from a streaming source.
Write data to a streaming sink.
Window data in a streaming context.
Experimentally verify the effects of lag.
You will build the following pipeline:
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Activate Google Cloud Shell
Google Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5GB home directory and runs on the Google Cloud.
Google Cloud Shell provides command-line access to your Google Cloud resources.
In Cloud console, on the top right toolbar, click the Open Cloud Shell button.
Click Continue.
It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated, and the project is set to your PROJECT_ID.
gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.
You can list the active account name with this command:
gcloud auth list
You can list the project ID with this command:
gcloud config list project
Example output:
[core]
project = qwiklabs-gcp-44776a13dea667a6
Note: Full documentation of gcloud is available in the gcloud CLI overview guide.
Check project permissions
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
In the Google Cloud console, on the Navigation menu, select IAM & Admin > IAM.
Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
Note: If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
Copy the project number (e.g. 729328892908).
On the Navigation menu, select IAM & Admin > IAM.
At the top of the roles table, below View by Principals, click Grant Access.
For New principals, type {project-number}-compute@developer.gserviceaccount.com, replacing {project-number} with your project number.
For Role, select Project (or Basic) > Editor.
Click Save.
Setting up your IDE
For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is Java language server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud command-line tool, similar to Cloud Shell.
To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost to a new tab.
Note: You may need to wait 3-5 minutes for the environment to be fully provisioned, even after the URL appears. Until then, you will see an error in the browser.
The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.
Click on the File Explorer button to browse the repo:
You can also create multiple terminals in this environment, just as you would with Cloud Shell:
By running gcloud auth list in the terminal, you can see that you're logged in as a provided service account, which has the exact same permissions as your lab user account:
If at any point your environment stops working, you can try resetting the VM hosting your IDE from the Compute Engine console like this:
Open the appropriate lab
Create a new terminal in your IDE environment, if you haven't already, and copy and paste the following commands:
# Change directory into the lab
cd 5_Streaming_Analytics/labs
# Download dependencies
mvn clean dependency:resolve
export BASE_DIR=$(pwd)
Set up the data environment
# Create GCS buckets, BQ dataset, and Pubsub Topic
cd $BASE_DIR/../..
source create_streaming_sinks.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR
Click Check my progress to verify the objective.
Set up the data environment
Task 1. Reading from a streaming source
In previous labs, you used TextIO.read() to read from Google Cloud Storage. In this lab, instead of Google Cloud Storage, you use Pub/Sub. Pub/Sub is a fully managed real-time messaging service that allows publishers to send messages to a "topic," to which subscribers can subscribe via a "subscription."
The pipeline you create subscribes to a topic called my_topic, which you just created via the create_streaming_sinks.sh script. In a production environment, this topic is often created by the publishing team. You can view it in the Pub/Sub section of the console.
To read from Pub/Sub using Apache Beam's IO connectors, open the file StreamingMinuteTrafficPipeline.java and add a transform to the pipeline that uses the PubsubIO.readStrings() function. This function returns an instance of PubsubIO.Read, which has methods for specifying the source topic as well as the timestamp attribute. Note that there is already a command-line option for the Pub/Sub topic name. Set the timestamp attribute to "timestamp", which corresponds to an attribute that is added to each Pub/Sub message. If message publication time were sufficient for your use case, this step would be unnecessary.
Note: Publication time is the time when the Pub/Sub service first receives the message. In systems where there may be a delay between the actual event time and the publish time (i.e. late data) that you would like to take into account, the client code publishing the message needs to set a 'timestamp' metadata attribute on the message and provide the actual event timestamp, since Pub/Sub will not natively know how to extract the event timestamp embedded in the payload.
You can see the client code that generates the messages you'll use on GitHub.
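For illustration only (this is not the actual generator code, and projectId and jsonPayload are hypothetical placeholders), publishing a message with such a "timestamp" attribute using the Pub/Sub Java client might look roughly like this sketch:

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.joda.time.Instant;

// Hypothetical sketch: attach the true event time as a "timestamp" attribute
// so the pipeline can use it instead of the Pub/Sub publish time.
static void publishWithEventTimestamp(String projectId, String jsonPayload) throws Exception {
    Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, "my_topic")).build();
    PubsubMessage message = PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(jsonPayload))           // the JSON event payload
        .putAttributes("timestamp", Instant.now().toString())    // RFC 3339 event timestamp
        .build();
    publisher.publish(message).get();  // block until the publish completes
    publisher.shutdown();
}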
To complete this task, add a transform that reads from the Pub/Sub topic specified by the inputTopic command-line parameter. Then, use the provided DoFn, JsonToCommonLog, to convert each JSON string into a CommonLog instance. Collect the results from this transform into a PCollection of CommonLog instances.
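A minimal sketch of these two steps, assuming the provided JsonToCommonLog DoFn and an inputTopic option on your pipeline options class (transform names are illustrative):

import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<CommonLog> commonLogs = pipeline
    .apply("ReadFromPubSub", PubsubIO.readStrings()
        .withTimestampAttribute("timestamp")        // event-time attribute set by the publisher
        .fromTopic(options.getInputTopic()))
    .apply("ParseJson", ParDo.of(new JsonToCommonLog()));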
Task 2. Window the data
In the previous non-SQL lab, you implemented fixed-time windowing in order to group events by event time into mutually-exclusive windows of fixed size. Do the same thing here with the streaming inputs. Feel free to reference the previous lab's code or the solution if you get stuck.
Window into one-minute windows
Implement a fixed-time window with a one-minute duration as follows:
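A minimal sketch, assuming the commonLogs PCollection from the previous task:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Assign each element to a fixed one-minute window based on its event timestamp
PCollection<CommonLog> windowed = commonLogs.apply("WindowByMinute",
    Window.<CommonLog>into(FixedWindows.of(Duration.standardMinutes(1))));

For the task itself, you would replace the hard-coded minute with Duration.standardSeconds(options.getWindowDuration()).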
To complete this task, add a transform to your pipeline that accepts the PCollection of CommonLog data and windows elements into windows of windowDuration seconds long, with windowDuration as another command-line parameter.
Task 3. Aggregate the data
In the previous lab, you used the Count transform to count the number of events per window. Do the same here.
Count events per window
To count the number of events within a non-global window, you can write code like the following:
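For example (a sketch with illustrative names), you can combine globally within each window; .withoutDefaults() is required because the windows are non-global:

import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;

// Count the elements in each window, emitting one Long per window
PCollection<Long> counts = windowed.apply("CountPerWindow",
    Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults());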
As before, you will need to convert the resulting <Long> values back into <Row> values and extract the window-end timestamp, as follows:
@ProcessElement
public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalWindow window) {
    // Use the end of the window as the event timestamp for the aggregate row
    Instant i = Instant.ofEpochMilli(window.end().getMillis());
    Row row = Row.withSchema(appSchema)
        .addValues(.......)
        .build();
    ...
    r.output(row);
}
Remember to set the schema on the resulting PCollection for serialization purposes, like so:
apply().setRowSchema(appSchema)
To complete this task, pass the windowed PCollection as input to a transform that counts the number of events per window.
Add an additional transform to convert the results back to a PCollection of Rows with the schema pageviewsSchema that has been provided for you.
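Putting the two steps together, the aggregation branch might look roughly like this sketch, where CountToRowFn is a hypothetical name for a DoFn following the pattern shown above:

import org.apache.beam.sdk.values.Row;

PCollection<Row> pageviews = windowed
    .apply("CountPerWindow", Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults())
    .apply("LongToRow", ParDo.of(new CountToRowFn()))
    .setRowSchema(pageviewsSchema);   // required so Beam knows how to serialize the Rows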
Task 4. Write to BigQuery
This pipeline writes to BigQuery in two separate branches. The first branch writes the aggregated data to BigQuery. The second branch, which has already been authored for you, writes out some metadata regarding each raw event, including the event timestamp and the actual processing timestamp. Both write directly to BigQuery via streaming inserts.
Write aggregated data to BigQuery
Writing to BigQuery has been covered extensively in previous labs, so the basic mechanics will not be covered in depth here.
This task makes use of code that has been provided.
To complete this task:
Create a new command-line parameter called aggregateTableName for the table intended to house aggregated data.
Add a transform, as before, that writes to BigQuery and uses .useBeamSchema(); a sketch follows the note below.
Note: In a streaming context, BigQueryIO.write() does not support the WRITE_TRUNCATE WriteDisposition, in which the table is dropped and recreated. In this example, use WRITE_APPEND.
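A rough sketch of this write, assuming the pageviews PCollection of Rows from the previous task and an aggregateTableName option (names are illustrative):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

pageviews.apply("WriteAggregateToBQ",
    BigQueryIO.<Row>write()
        .to(options.getAggregateTableName())
        .useBeamSchema()                                        // derive the BigQuery schema from the Row schema
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));  // WRITE_TRUNCATE is not supported when streaming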
BigQuery insertion method
BigQueryIO.Write defaults to streaming inserts for unbounded PCollections and to batch file load jobs for bounded PCollections. Streaming inserts are particularly useful when you want data to show up in aggregations immediately, but they incur extra charges.
In streaming use cases where you are OK with periodic batch uploads, on the order of every couple of minutes, you can specify this behavior via .withMethod() and set the frequency with .withTriggeringFrequency(org.joda.time.Duration).
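For illustration, such a configuration might look roughly like the following sketch; the five-minute frequency and single file shard are arbitrary choices, not recommendations:

// Periodic batch loads instead of streaming inserts (requires org.joda.time.Duration)
BigQueryIO.Write<Row> periodicWrite = BigQueryIO.<Row>write()
    .to(options.getAggregateTableName())
    .useBeamSchema()
    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)         // batch file load jobs
    .withTriggeringFrequency(Duration.standardMinutes(5))   // start a load job roughly every five minutes
    .withNumFileShards(1);                                   // required when a triggering frequency is set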
Find the command-line parameter for the name of the table intended to house the raw data.
Examine the pipeline branch that has already been authored for you. It captures processing time in addition to event time.
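As an illustration of what to look for (a sketch under assumed field and schema names, not necessarily the provided implementation), a DoFn that records both timestamps could look like this:

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

// Illustrative only: emit the element's event time alongside the wall-clock processing time.
// rawSchema and the CommonLog field name used here are assumptions.
static class AddProcessingTimeFn extends DoFn<CommonLog, Row> {
    @ProcessElement
    public void processElement(@Element CommonLog log, @Timestamp Instant eventTimestamp,
                               OutputReceiver<Row> r) {
        r.output(Row.withSchema(rawSchema)
            .addValues(log.user_id, eventTimestamp, Instant.now())
            .build());
    }
}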
Task 5. Run your pipeline
To run your pipeline, construct a command resembling the example below:
Note: It may need to be modified to reflect the names of any command-line options that you have included.
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{ project_0.default_region | "REGION" }}}
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic
export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw
cd $BASE_DIR
mvn compile exec:java \
-Dexec.mainClass=${MAIN_CLASS_NAME} \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=${PROJECT_ID} \
--region=${REGION} \
--stagingLocation=${PIPELINE_FOLDER}/staging \
--tempLocation=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--inputTopic=${PUBSUB_TOPIC} \
--windowDuration=${WINDOW_DURATION} \
--aggregateTableName=${AGGREGATE_TABLE_NAME} \
--rawTableName=${RAW_TABLE_NAME}"
Ensure in the Dataflow UI that it executes successfully without errors.
Note: No data is being created or ingested by the pipeline yet, so it will run without processing anything. You will introduce data in the next step.
Click Check my progress to verify the objective.
Run your pipeline
Task 6. Generate lag-less streaming input
Because this is a streaming pipeline, it subscribes to the streaming source and awaits input, of which there is currently none. In this section, you generate data with no lag. Real data will almost invariably contain lag; however, it is instructive to understand lag-less streaming inputs first.
The code for this quest includes a script for publishing JSON events using Pub/Sub.
To complete this task and start publishing messages, open a new terminal side-by-side with your current one and run the following script. It will keep publishing messages until you kill the script. Make sure you are in the training-data-analyst/quests/dataflow folder.
bash generate_streaming_events.sh
Click Check my progress to verify the objective.
Generate lag-less streaming input
Examine the results
Wait a couple of minutes for data to start to populate, then navigate to BigQuery and query the logs.windowed_traffic table with something like the following query:
SELECT minute, pageviews
FROM `logs.windowed_traffic`
ORDER BY minute ASC
You should see that the number of pageviews hovers around 100 views per minute.
Alternatively, you can use the BigQuery command-line tool as a quick way to confirm results are being written:
bq head logs.raw
bq head logs.windowed_traffic
Now, enter the following query:
SELECT
UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS event_millis,
UNIX_MILLIS(processing_timestamp) - min_millis.min_event_millis AS processing_millis,
user_id,
-- added as unique label so we see all the points
CAST(UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS STRING) AS label
FROM
`logs.raw`
CROSS JOIN (
SELECT
MIN(UNIX_MILLIS(event_timestamp)) AS min_event_millis
FROM
`logs.raw`) min_millis
WHERE
event_timestamp IS NOT NULL
ORDER BY
event_millis ASC
This query illustrates the gap between event time and processing time. However, it can be hard to see the big picture by looking at just the raw tabular data. We will use Looker Studio, a lightweight data visualization and BI engine.
In the Account Setup screen, enter Country and Company name.
Accept the Looker Studio Terms of Service and click Continue.
Select No for receiving Tips and recommendations, Product announcements and Market research.
Click Continue.
Return to the BigQuery UI.
In the BigQuery UI, click on the Open in button and choose Looker Studio.
This will open a new window.
Select the existing bar chart. In the right-hand panel, change the chart type to scatter chart.
In the Setup column of the panel on the right-hand side, set the following values:
Dimension: label
Metric X: event_millis
Metric Y: processing_millis
The chart will transform into a scatterplot where all points lie on the diagonal. This is because, in the streaming data currently being generated, events are processed almost immediately after they are generated, so there is no lag. If you started the data generation script quickly, i.e. before the Dataflow job was fully up and running, you may see a hockey-stick shape, because messages that queued up in Pub/Sub were all processed more or less at once.
But in the real world, lag is something that pipelines need to cope with.
Task 7. Introduce lag to streaming input
The streaming event script is capable of generating events with simulated lag.
This represents scenarios where there is a time delay between when events are generated and when they are published to Pub/Sub, for example, when a mobile client goes into offline mode because the user has no service: events are collected on the device and then published all at once when the device comes back online.
Generate streaming input with lag
First, close the Looker Studio window.
Then, to turn on lag, return to the window containing IDE Terminal.
Next, stop the running script using CTRL+C in the Terminal.
Then, run the following:
bash generate_streaming_events.sh true
Examine the results
Return to the BigQuery UI, rerun the query, and then recreate the Looker Studio view as before.
The new data that arrive, which should appear on the right side of the chart, should no longer fall perfectly on the diagonal; instead, some points will appear above it, indicating that they were processed some time after the events occurred.
Chart Type: Scatter
Dimension: label
Metric X: event_millis
Metric Y: processing_millis
End your lab
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
1 star = Very dissatisfied
2 stars = Dissatisfied
3 stars = Neutral
4 stars = Satisfied
5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.