Creating a Streaming Data Pipeline With Apache Kafka

Lab: 45 minutes, 1 credit, Introductory
Note: This lab may incorporate AI tools to support your learning.

This lab was developed with our partner, Confluent. Your personal information may be shared with Confluent, the lab sponsor, if you have opted-in to receive product updates, announcements, and offers in your Account Profile.

GSP730

Overview

In this lab, you create a streaming data pipeline with Kafka, which gives you a hands-on look at the Kafka Streams API. You run a Java application that uses the Kafka Streams library to showcase a simple end-to-end data pipeline powered by Apache Kafka®.

Objectives

In this lab, you will:

  • Start a Kafka cluster on a single Compute Engine machine
  • Write example input data to a Kafka topic, using the console producer included in Kafka
  • Process the input data with a Java application called WordCount that uses the Kafka Streams library
  • Inspect the output data of the application, using the console consumer included in Kafka

Setup and requirements

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources are made available to you.

This hands-on lab lets you do the lab activities in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials you use to sign in and access Google Cloud for the duration of the lab.

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
Note: Use an Incognito (recommended) or private browser window to run this lab. This prevents conflicts between your personal account and the student account, which could otherwise cause extra charges to your personal account.
  • Time to complete the lab—remember, once you start, you cannot pause a lab.
Note: Use only the student account for this lab. If you use a different Google Cloud account, you may incur charges to that account.

How to start your lab and sign in to the Google Cloud console

  1. Click the Start Lab button. If you need to pay for the lab, a dialog opens for you to select your payment method. On the left is the Lab Details pane with the following:

    • The Open Google Cloud console button
    • Time remaining
    • The temporary credentials that you must use for this lab
    • Other information, if needed, to step through this lab
  2. Click Open Google Cloud console (or right-click and select Open Link in Incognito Window if you are running the Chrome browser).

    The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Arrange the tabs in separate windows, side-by-side.

    Note: If you see the Choose an account dialog, click Use Another Account.
  3. If necessary, copy the Username below and paste it into the Sign in dialog.

    {{{user_0.username | "Username"}}}

    You can also find the Username in the Lab Details pane.

  4. Click Next.

  5. Copy the Password below and paste it into the Welcome dialog.

    {{{user_0.password | "Password"}}}

    You can also find the Password in the Lab Details pane.

  6. Click Next.

    Important: You must use the credentials the lab provides you. Do not use your Google Cloud account credentials.
    Note: Using your own Google Cloud account for this lab may incur extra charges.
  7. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Google Cloud console opens in this tab.

Note: To access Google Cloud products and services, click the Navigation menu or type the service or product name in the Search field.

Task 1. Set up Kafka

  1. Execute the following command in Cloud Shell:
gcloud compute firewall-rules create allow-iap-ssh \
  --direction=INGRESS --priority=1000 --network=default \
  --action=ALLOW --rules=tcp:22 --source-ranges=35.235.240.0/20 \
  --target-tags=iap-gce
  1. In the Cloud Console, open the Navigation menu and click Marketplace.

  2. Locate the Apache Kafka® deployment by searching for Apache Kafka.

  3. Click Apache Kafka Server on Ubuntu Server 24.04. The listing page that opens includes the Launch and View Past Deployment buttons.

Note: Make sure you select the listing published by Cloud Infrastructure Services.
  1. Click Get Started, select the Terms and Conditions checkbox, and then click Agree.

  2. Click Deploy and click on Enable the Required APIs.

  3. On the New Apache Kafka Server on Ubuntu Server 24.04 deployment page, configure the deployment as described in the following steps.

  4. For Deployment Service Account, select New account, and enter apache-kafka as the Service account name.

  5. Select a zone.

  6. Under Machine Type, change the Series to E2, and select the e2-medium Machine type.

  7. Leave all the other values as default, and click Deploy.

Once the deployment completes, the VM instance is available in the zone you selected.

  1. Run the following command in Cloud Shell:
gcloud compute instances add-tags kafka-1-vm --tags=iap-gce --zone={{{project_0.default_zone | ZONE}}}

Click Check my progress to verify the objective. Deploy an Apache Kafka VM

While you're waiting for deployment, you can check out this quick start which shows how to run the WordCount demo application that is included in Kafka.

Here's the gist of the code, converted to use Java 8 lambda expressions so that it is easier to read (taken from the variant WordCountLambdaExample):

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines =
    builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde));

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words. The text lines are the message
    // values, i.e. we can ignore whatever data is in the message keys and thus invoke
    // `flatMapValues` instead of the more generic `flatMap`.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // We use `groupBy` to ensure the words are available as message keys
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();

// Convert the `KTable<String, Long>` into a `KStream<String, Long>` and write to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));
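
To make the `flatMapValues` step in the gist more concrete, the following is a minimal plain-Java sketch (no Kafka dependency) that applies the same expression, `value.toLowerCase().split("\\W+")`, to a few lines of text. The class name `TokenizeSketch` and the methods `tokenize` and `tokenizeAll` are hypothetical names introduced for illustration; they are not part of Kafka or the WordCount demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration only; not part of Kafka.
final class TokenizeSketch {

    // Applies the same expression that the gist passes to flatMapValues.
    static List<String> tokenize(String line) {
        return Arrays.asList(line.toLowerCase().split("\\W+"));
    }

    // Flattens several lines into one list of words, one line at a time,
    // similar in spirit to what flatMapValues does for each message value.
    static List<String> tokenizeAll(List<String> lines) {
        return lines.stream()
                .flatMap(line -> tokenize(line).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [hello, kafka, streams]
        System.out.println(tokenize("Hello Kafka Streams"));
        // prints [all, streams, lead, to, kafka, join, kafka, summit]
        System.out.println(tokenizeAll(Arrays.asList("all streams lead to kafka", "join kafka summit")));
    }
}
```

One detail of this expression worth knowing: a line that starts with a non-word character, such as " hello", produces a leading empty string, so this sketch would pass that empty string on as a word of its own.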

Start the Kafka environment

  1. In the Console, open the Navigation Menu and select Compute Engine > VM Instances.

  2. Next to the VM name kafka-1-vm, click the SSH button to connect to the Kafka VM.

For reference, the installation of Apache Kafka is in the following directory: /opt/kafka/.

In the SSH window, you will run the following commands to start all services in the correct order.

  1. First, start by changing your current path to the Kafka installation directory:
cd /opt/kafka/
  1. Run the following commands to generate a cluster UUID and format the Kafka storage directory:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
sudo bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
Note: Leave this SSH connection to the Kafka VM instance open, so you can finish the configuration and run the application later.

Start the Kafka broker service

  1. Run the following commands to change your current path to the Kafka installation directory and start the Kafka broker service:
cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties

Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

Note: The broker runs in the foreground of this terminal. Leave this SSH connection to the Kafka VM instance open so that the broker keeps running for the rest of the lab.

You will now need to open another terminal session to complete the next steps.

  1. From the VM Instances page, click the SSH button next to the VM to open a new SSH connection.

Task 2. Prepare the topics and the input data

You will now send some input data to a Kafka topic, which will be subsequently processed by a Kafka Streams application.

  1. First change your current path to the Kafka installation directory:
cd /opt/kafka/

Now create the input topic streams-plaintext-input:

sudo bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic streams-plaintext-input
  1. Next, create the output topic streams-wordcount-output:
sudo bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic streams-wordcount-output

Click Check my progress to verify the objective. Create topics in Kafka

  1. Next, generate some input data and store it in a local file at /tmp/file-input.txt:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt

The resulting file will have the following contents:

  • all streams lead to kafka

  • hello kafka streams

  • join kafka summit

  1. Lastly, you will send this input data to the input topic:
cat /tmp/file-input.txt | sudo bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

The Kafka console producer reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message to the topic streams-plaintext-input, where the message key is null and the message value is the respective line such as all streams lead to kafka, encoded as a string.
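
The line-to-message mapping described above can be modeled with a short plain-Java sketch. It does not talk to Kafka; it only shows how each input line becomes one message with a null key and the line as its value. The class `ConsoleProducerSketch`, the nested `Message` type, and the method `toMessages` are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

// Hypothetical model of the console producer's behavior; it does not connect to Kafka.
final class ConsoleProducerSketch {

    // Hypothetical stand-in for a Kafka message: a topic, a key, and a value.
    static final class Message {
        final String topic;
        final String key;
        final String value;

        Message(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Turns each line of the input text into one message with a null key.
    static List<Message> toMessages(String topic, String stdinContents) {
        List<Message> messages = new ArrayList<>();
        try (Scanner scanner = new Scanner(stdinContents)) {
            while (scanner.hasNextLine()) {
                messages.add(new Message(topic, null, scanner.nextLine()));
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        // Same contents as /tmp/file-input.txt in this lab.
        String fileContents = "all streams lead to kafka\nhello kafka streams\njoin kafka summit\n";
        for (Message m : toMessages("streams-plaintext-input", fileContents)) {
            System.out.println(m.topic + " key=" + m.key + " value=" + m.value);
        }
    }
}
```

Because every key is null here, the WordCount topology cannot rely on the incoming keys, which is why it later regroups the data by word.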

Note: In practice, these steps typically look somewhat different and happen in parallel.

For example, input data might not be sourced originally from a local file but sent directly from distributed devices, and the data would be flowing continuously into Kafka. Similarly, the stream processing application (as you'll see in the next section) might already be up and running before the first input data is being sent.

Task 3. Process the input data with Kafka Streams

Now that you have generated some input data, you can run your first Kafka Streams-based Java application.

You will run the WordCount demo application, which is included in Kafka. It implements the WordCount algorithm, which computes a word occurrence histogram from an input text.

However, unlike other WordCount examples you might have seen before that operate on finite, bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of input data.

Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.

This is a typical difference between the class of algorithms that operate on unbounded streams of data and, say, batch processing algorithms such as Hadoop MapReduce. It will be easier to understand this difference once you inspect the actual output data later on.
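
The difference described above can be sketched in plain Java, without any Kafka dependency. The batch-style method reads all input before reporting one final count per word, while the streaming-style method keeps the same state but emits an updated count after every word it processes. The class and method names (`StreamingVsBatchWordCount`, `batchCount`, `streamingUpdates`) are hypothetical, and the sketch is a simplified model: a real Kafka Streams application may emit fewer intermediate updates depending on its caching and commit settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of batch versus streaming word counting.
final class StreamingVsBatchWordCount {

    // Batch style: consume the whole bounded input, then return the final counts once.
    static Map<String, Long> batchCount(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Streaming style: same state, but every update is emitted as soon as it happens,
    // because the algorithm cannot know whether more input will arrive.
    static List<String> streamingUpdates(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> updates = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                long updated = counts.merge(word, 1L, Long::sum);
                updates.add(word + " " + updated);
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit");
        System.out.println("Batch result: " + batchCount(lines));
        System.out.println("Streaming updates:");
        streamingUpdates(lines).forEach(System.out::println);
    }
}
```

With the three input lines from Task 2, the streaming variant reports kafka three times (with counts 1, 2, and 3), whereas the batch variant reports it once with the final count of 3.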

The WordCount demo application is bundled with the Kafka installation on the VM, which means you can run it directly without compiling any Java sources.

  • Now, execute the following command to run the WordCount demo application:
sudo bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Note: The warning messages in the log are generic alerts. You can safely ignore them for the purposes of this lab.

The WordCount demo application will read from the input topic streams-plaintext-input, perform the computations of the WordCount algorithm on the input data, and continuously write its current results to the output topic streams-wordcount-output (the names of its input and output topics are hardcoded). You can terminate the demo at any point by entering Ctrl+C from the keyboard.

Task 4. Inspect the output data

  1. On the VM Instances page, click the SSH button next to the VM name kafka-1-vm to start a new connection to the instance.

  2. You can now inspect the output of the WordCount demo application by reading from its output topic streams-wordcount-output:

cd /opt/kafka
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic streams-wordcount-output \
  --from-beginning \
  --property print.key=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

The following output data should be printed to the console:

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

Here, the first column is the Kafka message key in java.lang.String format, and the second column is the message value in java.lang.Long format. You can terminate the console consumer at any point by entering Ctrl+C from the keyboard.
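
To illustrate why the consumer command specifies a Long deserializer for the values, here is a minimal plain-Java sketch. It assumes that each count is stored as an 8-byte big-endian value, which is how Kafka's Long serde is generally documented to encode longs; the class `LongValueSketch` and its `encode` and `decode` methods are hypothetical stand-ins and not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a Long serializer/deserializer pair; not a Kafka class.
final class LongValueSketch {

    // Encodes a count as 8 bytes in big-endian order (ByteBuffer's default byte order).
    static byte[] encode(long count) {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Decodes the 8 bytes back into the original count.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3L);
        System.out.println("bytes: " + raw.length);        // prints bytes: 8
        System.out.println("as long: " + decode(raw));     // prints as long: 3

        // Reading the same bytes as UTF-8 text does not give the string "3".
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println("equals \"3\": " + asText.equals("3")); // prints equals "3": false
    }
}
```

Under that assumption, a consumer that treated the values as strings would print control characters instead of readable counts, which is what the value.deserializer property in the command avoids.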

As discussed above, a streaming word count algorithm continuously computes the latest word counts from the input data, and, in this specific demo application, continuously writes the latest counts of words as its output.

You can check out the Confluent documentation to learn more about the duality between streams and tables. In fact, the output you have seen above is actually the changelog stream of a KTable, with the KTable being the result of the aggregation operation performed by the WordCount demo application.
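
The relationship between the changelog stream and the table can be sketched in plain Java: replaying the update records in order, and keeping only the latest value for each key, rebuilds the current state of the table. This is a simplified model rather than the Kafka Streams implementation, and the class `ChangelogReplaySketch` and method `materialize` are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the stream-table duality; not a Kafka API.
final class ChangelogReplaySketch {

    // Replays "key value" changelog entries in order; the latest value for each key wins.
    static Map<String, Long> materialize(List<String> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String entry : changelog) {
            String[] parts = entry.split(" ");
            table.put(parts[0], Long.parseLong(parts[1]));
        }
        return table;
    }

    public static void main(String[] args) {
        // The update records printed by the console consumer in this lab.
        List<String> changelog = Arrays.asList(
                "all 1", "streams 1", "lead 1", "to 1", "kafka 1",
                "hello 1", "kafka 2", "streams 2",
                "join 1", "kafka 3", "summit 1");
        // prints {all=1, streams=2, lead=1, to=1, kafka=3, hello=1, join=1, summit=1}
        System.out.println(materialize(changelog));
    }
}
```

The eleven update records collapse into eight table rows, one per distinct word, each holding its most recent count.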

Click Check my progress to verify the objective. Process the input data with Kafka Streams

Task 5. Stop the Kafka cluster

Once you are done with the previous steps, you can shut down the Kafka cluster:

  1. First, stop the Kafka broker by entering Ctrl+C in the terminal it is running in. Alternatively, you can kill the broker process.

Congratulations!

You have now run your first Kafka Streams application against data stored in a single-node Kafka cluster. In this lab you started a Kafka cluster, used the console producer to write example input data to a Kafka topic, and processed the data with WordCount using the Kafka Streams library. You then inspected the output data using the console consumer and stopped the Kafka cluster.

Next steps / Learn more

Google Cloud training and certification

...helps you make the most of Google Cloud technologies. Our classes include technical skills and best practices to help you get up to speed quickly and continue your learning journey. We offer fundamental to advanced level training, with on-demand, live, and virtual options to suit your busy schedule. Certifications help you validate and prove your skill and expertise in Google Cloud technologies.

Manual last updated November 05, 2024

Lab last tested November 05, 2024

Copyright 2025 Google LLC. All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.

