In this lab, you:
Write a pipeline that aggregates site traffic by user.
Write a pipeline that aggregates site traffic by minute.
Implement windowing on time series data.
Prerequisites
Basic familiarity with Java.
Setup and requirements
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Check project permissions
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
In the Google Cloud console, on the Navigation menu (), select IAM & Admin > IAM.
Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
Note: If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
Copy the project number (e.g. 729328892908).
On the Navigation menu, select IAM & Admin > IAM.
At the top of the roles table, below View by Principals, click Grant Access.
For New principals, enter {project-number}-compute@developer.gserviceaccount.com, replacing {project-number} with your project number.
For Role, select Project (or Basic) > Editor.
Click Save.
Activate Google Cloud Shell
Google Cloud Shell is a virtual machine loaded with development tools. It offers a persistent 5 GB home directory and runs on Google Cloud.
Google Cloud Shell provides command-line access to your Google Cloud resources.
In Cloud console, on the top right toolbar, click the Open Cloud Shell button.
Click Continue.
It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated, and the project is set to your PROJECT_ID. For example:
gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.
You can list the active account name with this command:
gcloud auth list
You can list the project ID with this command:
gcloud config list project
Example output:
[core]
project = qwiklabs-gcp-44776a13dea667a6
Note: Full documentation of gcloud is available in the gcloud CLI overview guide.
Setting up your IDE
For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned, Java language server support, and a terminal for programmatic access to Google Cloud APIs via the gcloud command-line tool, similar to Cloud Shell.
To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost to a new tab.
Note: You may need to wait 3-5 minutes for the environment to be fully provisioned, even after the URL appears. Until then, you will see an error in the browser.
The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.
Click the File Explorer button to view the folder structure:
You can also create multiple terminals in this environment, just as you would with cloud shell:
By running gcloud auth list in the terminal, you can see that you're logged in as a provided service account, which has the exact same permissions as your lab user account:
If at any point your environment stops working, you can try resetting the VM hosting your IDE from the Compute Engine console, like this:
Part 1: Aggregating site traffic by user
In this part of the lab, you write a pipeline that:
Reads the day’s traffic from a file in Cloud Storage.
Converts each event into a CommonLog object.
Sums the number of hits for each unique user by grouping each object by user ID and combining the values to get the total number of hits for that particular user.
Performs additional aggregations on each user.
Writes the resulting data to BigQuery.
Task 1. Generate synthetic data
As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:
Open the appropriate lab
Create a new terminal in your IDE environment, if you haven't already, and copy and paste the following command:
# Change directory into the lab
cd 3_Batch_Analytics/labs
# Download dependencies
mvn clean dependency:resolve
export BASE_DIR=$(pwd)
Set up the data environment
# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_batch_sinks.sh
# Generate event dataflow
source generate_batch_events.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR
The script creates a file called events.json containing lines resembling the following:
It then automatically copies this file to your Google Cloud Storage bucket at gs://my-project-id/events.json.
Navigate to Google Cloud Storage and confirm that your storage bucket contains a file called events.json.
Click Check my progress to verify the objective.
Generate synthetic data
Task 2. Sum pageviews per user
Open up BatchUserTrafficPipeline.java in your IDE, which can be found in 3_Batch_Analytics/labs/src/main/java/com/mypackage/pipeline.
This pipeline already contains the necessary code to accept command-line options for the input path and the output table name, as well as code to read in events from Google Cloud Storage, parse those events, and write results to BigQuery. However, some important parts are missing.
The next step in the pipeline is to aggregate the events by each unique user_id and count the pageviews for each. An easy way to do this on Rows or objects with a Beam schema is to use the Group.byFieldNames() transform and then perform some aggregations on the resulting group. For example:
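A minimal sketch, assuming a schema-aware PCollection<Purchase> named purchases (the field names are illustrative):

purchases.apply(Group.<Purchase>byFieldNames("userId", "streetAddress"))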
This will return a PCollection of rows with two fields, "key" and "values". The "key" field is itself a Row with schema {userId:STRING, streetAddress:STRING}, representing every unique combination of userId and streetAddress. The "values" field is of type ITERABLE[ROW[Purchase]] and contains all of the objects in that unique group.
FieldName | FieldType
key       | ROW{userId:STRING, streetAddress:STRING}
values    | ITERABLE[ROW[Purchase]]
This is most useful when you can perform aggregate calculations on this grouping and name the resulting fields, like so:
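A sketch under the same assumptions (the input and output field names are illustrative):

purchases.apply(Group.<Purchase>byFieldNames("userId")
        // Count the purchases in each group and name the result "numPurchases".
        .aggregateField("itemId", Count.combineFn(), "numPurchases")
        // Sum the costCents values in each group and name the result "totalCostCents".
        .aggregateField("costCents", Sum.ofLongs(), "totalCostCents"))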
The Sum and Count transforms are perfect for this use. Sum and Count are examples of Combine transforms that can act on groups of data.
Note: In this example you could aggregate on any of the fields for Count.combineFn(), or even on the wildcard field *, as this transform is simply counting how many elements are in the entire group.
The next step in the pipeline is to aggregate events by user_id, sum the pageviews, and also calculate some additional aggregations on num_bytes, for example total user bytes.
To complete this task, add another transform to the pipeline which groups the events by user_id and then performs the relevant aggregations. Keep in mind the input, the CombineFns to use, and how you name the output fields.
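If you get stuck, the transform will have roughly the following shape. This is only a sketch: the transform name, output field names, and choice of CombineFns are illustrative, and the exact version is in the solution.

.apply("PerUserAggregations", Group.<CommonLog>byFieldNames("user_id")
        // Count the events for each user.
        .aggregateField("user_id", Count.combineFn(), "pageviews")
        // Sum the bytes transferred for each user.
        .aggregateField("num_bytes", Sum.ofLongs(), "total_bytes"))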
Task 3. Flatten the schema
At this point, your new transform is returning a PCollection whose schema contains the nested "key" and "value" fields, as already mentioned. If you run your pipeline as is, the results will be written to BigQuery as two nested RECORDs, even though there is essentially only one row of values in each.
You can avoid this by adding a Select transform like the following:
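A sketch (the dotted field names must match the output field names you chose in the previous task; the ones below are illustrative):

.apply("SelectFields", Select.fieldNames("key.user_id", "value.pageviews", "value.total_bytes"))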
This will retain the relevant field names in the new flattened Schema, and remove "key" and "value".
To complete this task, add a Select Transform to flatten the schema of your new row.
Note: Remember to change the type parameter in BigQueryIO.<CommonLog>write() to <Row> if you haven't already.
Task 4. Run your pipeline
Return to the terminal in your IDE and execute the following command to run your pipeline using the Dataflow service. You can run it with DirectRunner if you're having trouble, or refer to the solution.
Click Check my progress to verify the objective.
Aggregating site traffic by user and running your pipeline
Task 5. Verify results in BigQuery
To complete this task, wait a few minutes for the pipeline to complete, then navigate to BigQuery and query the user_traffic table.
If you are curious, comment out your Select transform step and re-run the pipeline to see the resulting BigQuery schema.
Part 2: Aggregating site traffic by minute
In this part of the lab, you create a new pipeline called BatchMinuteTraffic. BatchMinuteTraffic expands on the basic batch analysis principles used in BatchUserTraffic and, instead of aggregating by users across the entire batch, aggregates by when events occurred.
In the IDE, open up the file BatchMinuteTrafficPipeline.java inside 3_Batch_Analytics/labs/src/main/java/com/mypackage/pipeline.
Task 1. Add timestamps to each element
An unbounded source provides a timestamp for each element. Depending on your unbounded source, you may need to configure how the timestamp is extracted from the raw data stream.
However, bounded sources (such as a file from TextIO, as is used in this pipeline) do not provide timestamps.
You can parse the timestamp field from each record and use the WithTimestamps transform to attach the timestamps to each element in your PCollection:
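A sketch, assuming the CommonLog timestamp field is an ISO 8601 string (as in the generated events); the transform name is illustrative:

// Attach an event timestamp to each element, parsed from the element itself.
.apply("AddEventTimestamps", WithTimestamps.of(
        (CommonLog commonLog) -> Instant.parse(commonLog.timestamp)))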
To complete this task, add a transform to the pipeline that adds timestamps to each element of the pipeline.
Task 2. Window into one-minute windows
Windowing subdivides a PCollection according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis — they process each PCollection as a succession of multiple, finite windows, though the entire collection itself may be of unbounded size.
You can define different kinds of windows to divide the elements of your PCollection. Beam provides several windowing functions, including:
Fixed-time windows
Sliding-time windows
Per-session windows
Single global window
Calendar-based windows (not supported by the Beam SDK for Python)
In this lab, you use fixed-time windows. A fixed-time window represents a non-overlapping time interval of consistent duration in the data stream. Consider windows with a five-minute duration: all of the elements in your unbounded PCollection with timestamp values from 0:00:00 up to (but not including) 0:05:00 belong to the first window, elements with timestamp values from 0:05:00 up to (but not including) 0:10:00 belong to the second window, and so on.
Implement a fixed-time window with a one-minute duration as follows:
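A sketch (the transform name is illustrative):

// Window elements into fixed one-minute windows based on their event timestamps.
.apply("WindowByMinute", Window.<CommonLog>into(
        FixedWindows.of(Duration.standardMinutes(1))))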
Task 3. Count events per window
Next, the pipeline needs to compute the number of events that occurred within each window. In the BatchUserTraffic pipeline, a Sum transform was used to sum per key. However, unlike in that pipeline, here the elements have been windowed and the desired computation needs to respect window boundaries.
Despite this new constraint, the Combine transform is still appropriate. That’s because Combine transforms automatically respect window boundaries.
Refer to the documentation for Count for how to add a new transform that counts the number of elements per window.
As of Beam 2.22, the best option for counting elements of rows while windowing is Combine.globally(Count.<T>combineFn()).withoutDefaults() (that is, without using full-on SQL, which is covered in more depth in the next lab). This transform outputs a PCollection<Long>, which, you'll notice, no longer uses Beam schemas.
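For example, a minimal sketch (the type parameter and transform name are illustrative):

// Count every element in each window; withoutDefaults() suppresses the default
// value Combine.globally would otherwise emit for empty windows.
.apply("CountPerMinute", Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults())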
To complete this task, add a transform that counts all the elements in each window. Remember to refer to the solution if you get stuck.
Task 4. Convert back to a row and add timestamp
In order to write to BigQuery, each element needs to be converted back to a Row object, with "pageviews" as one field and an additional field called "minute". The idea is to use the boundary of each window as one field and the combined number of pageviews as the other.
Thus far, the elements have always conformed to a Beam schema: they were converted from a JSON string to a CommonLog object, and sometimes back to a Row object. The original schema was inferred from the CommonLog POJO via the @DefaultSchema(JavaFieldSchema.class) annotation, and any subsequently added or deleted fields were specified in pipeline transforms. However, at this point in the pipeline, per the output of the Count transform, every element is of type Long. Therefore, a new Row object will need to be created from scratch.
Schemas can be created and registered manually as follows. This code would be added outside the main() method, similar to the CommonLog object definition:
// Define the schema for the records.
Schema appSchema =
Schema
.builder()
.addInt32Field("appId")
.addStringField("description")
.addDateTimeField("rowtime")
.build();
Subsequent Row objects of this schema could then be created in a PTransform, potentially based on inputs such as a Long, like:
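A sketch (the field values are illustrative):

// Build a Row that conforms to appSchema.
Row row = Row.withSchema(appSchema)
        .addValues(1, "example description", Instant.now())
        .build();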
Usually Beam will require an indication of the new schema on the PTransform if the transform is creating a new row as opposed to mutating a previous one:
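For example (ConstructRowFn is a hypothetical DoFn that outputs new Row objects):

.apply("ConstructRows", ParDo.of(new ConstructRowFn()))
        // Tell Beam the schema of the newly created rows.
        .setRowSchema(appSchema)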
One other issue, at this point, is that the Count transform is only providing elements of type Long that no longer bear any sort of timestamp information.
In fact, however, they do, though not in so obvious a way. Apache Beam runners know by default how to supply the value for a number of additional parameters, including event timestamps, windows, and pipeline options; see Apache's DoFn parameters documentation for a full list.
To complete this task, write a ParDo function that accepts elements of type Long and emits elements of type Row with schema pageViewsSchema, which has been provided for you, and that has an additional input parameter of type IntervalWindow. Use this additional parameter to create an instance of Instant, and use that Instant to derive a string representation for the minute field:
@ProcessElement
public void processElement(@Element Long views, OutputReceiver<Row> r, IntervalWindow window) {
    // The start of the window identifies the minute this count belongs to.
    Instant i = Instant.ofEpochMilli(window.start().getMillis());
    ...
    r.output(...);
}
Task 5. Run the pipeline
Once you’ve finished coding, run the pipeline using the command below. Keep in mind that, while testing your code, it will be much faster to change the RUNNER environment variable to DirectRunner, which will run the pipeline locally.
Click Check my progress to verify the objective.
Aggregating site traffic by minute and running the pipeline
Task 6. Verify the results
To complete this task, wait a few minutes for the pipeline to execute, then navigate to BigQuery and query the minute_traffic table.
End your lab
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
1 star = Very dissatisfied
2 stars = Dissatisfied
3 stars = Neutral
4 stars = Satisfied
5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.