GSP903

Visão geral
O Google Cloud Pub/Sub é um serviço de mensagens para trocar dados de eventos entre aplicativos e serviços. Um produtor de dados publica mensagens em um tópico do Cloud Pub/Sub. Um consumidor cria uma assinatura nesse tópico. Os assinantes recebem mensagens de uma assinatura ou são configurados como webhooks para assinaturas de push. Os assinantes precisam confirmar cada mensagem em um período configurável.
O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em stream (em tempo real) e modos de lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor.
O Pub/Sub é um sistema de processamento e entrega de eventos durável e escalonável. O Dataflow complementa o modelo de entrega escalonável pelo menos uma vez do Pub/Sub com eliminação de duplicação de mensagem e processamento sequencial exatamente uma vez, se você usar janelas e armazenamento em buffer.
Atividades deste laboratório
- ler mensagens publicadas em um tópico do Pub/Sub;
- organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
- gravar as mensagens no Cloud Storage.
Configuração
Antes de clicar no botão Começar o Laboratório
Leia estas instruções. Os laboratórios são cronometrados e não podem ser pausados. O timer é ativado quando você clica em Iniciar laboratório e mostra por quanto tempo os recursos do Google Cloud vão ficar disponíveis.
Este laboratório prático permite que você realize as atividades em um ambiente real de nuvem, e não em uma simulação ou demonstração. Você vai receber novas credenciais temporárias para fazer login e acessar o Google Cloud durante o laboratório.
Confira os requisitos para concluir o laboratório:
- Acesso a um navegador de Internet padrão (recomendamos o Chrome).
Observação: para executar este laboratório, use o modo de navegação anônima (recomendado) ou uma janela anônima do navegador. Isso evita conflitos entre sua conta pessoal e de estudante, o que poderia causar cobranças extras na sua conta pessoal.
- Tempo para concluir o laboratório: não se esqueça que, depois de começar, não será possível pausar o laboratório.
Observação: use apenas a conta de estudante neste laboratório. Se usar outra conta do Google Cloud, você poderá receber cobranças nela.
Como iniciar seu laboratório e fazer login no console do Google Cloud
-
Clique no botão Começar o laboratório. Se for preciso pagar por ele, uma caixa de diálogo vai aparecer para você selecionar a forma de pagamento.
No painel Detalhes do Laboratório, à esquerda, você vai encontrar o seguinte:
- O botão Abrir Console do Google Cloud
- O tempo restante
- As credenciais temporárias que você vai usar neste laboratório
- Outras informações, se forem necessárias
-
Se você estiver usando o navegador Chrome, clique em Abrir console do Google Cloud ou clique com o botão direito do mouse e selecione Abrir link em uma janela anônima.
O laboratório ativa os recursos e depois abre a página Fazer Login em outra guia.
Dica: coloque as guias em janelas separadas lado a lado.
Observação: se aparecer a caixa de diálogo Escolher uma conta, clique em Usar outra conta.
-
Se necessário, copie o Nome de usuário abaixo e cole na caixa de diálogo Fazer login.
{{{user_0.username | "Username"}}}
Você também encontra o nome de usuário no painel Detalhes do Laboratório.
-
Clique em Próxima.
-
Copie a Senha abaixo e cole na caixa de diálogo de Olá.
{{{user_0.password | "Password"}}}
Você também encontra a senha no painel Detalhes do Laboratório.
-
Clique em Próxima.
Importante: você precisa usar as credenciais fornecidas no laboratório, e não as da sua conta do Google Cloud.
Observação: se você usar sua própria conta do Google Cloud neste laboratório, é possível que receba cobranças adicionais.
-
Acesse as próximas páginas:
- Aceite os Termos e Condições.
- Não adicione opções de recuperação nem autenticação de dois fatores (porque essa é uma conta temporária).
- Não se inscreva em testes gratuitos.
Depois de alguns instantes, o console do Google Cloud será aberto nesta guia.
Observação: para acessar os produtos e serviços do Google Cloud, clique no Menu de navegação ou digite o nome do serviço ou produto no campo Pesquisar.
Ativar o Cloud Shell
O Cloud Shell é uma máquina virtual com várias ferramentas de desenvolvimento. Ele tem um diretório principal permanente de 5 GB e é executado no Google Cloud. O Cloud Shell oferece acesso de linha de comando aos recursos do Google Cloud.
-
Clique em Ativar o Cloud Shell
na parte de cima do console do Google Cloud.
-
Clique nas seguintes janelas:
- Continue na janela de informações do Cloud Shell.
- Autorize o Cloud Shell a usar suas credenciais para fazer chamadas de APIs do Google Cloud.
Depois de se conectar, você verá que sua conta já está autenticada e que o projeto está configurado com seu Project_ID, . A saída contém uma linha que declara o projeto PROJECT_ID para esta sessão:
Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}
A gcloud é a ferramenta de linha de comando do Google Cloud. Ela vem pré-instalada no Cloud Shell e aceita preenchimento com tabulação.
- (Opcional) É possível listar o nome da conta ativa usando este comando:
gcloud auth list
- Clique em Autorizar.
Saída:
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- (Opcional) É possível listar o ID do projeto usando este comando:
gcloud config list project
Saída:
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
Observação: consulte a documentação completa da gcloud no Google Cloud no guia de visão geral da gcloud CLI.
Configurar a região
- No Cloud Shell, execute o seguinte comando para definir a região do projeto deste laboratório:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}
Ativar a API Dataflow
Para ter acesso à API Dataflow, reinicie a conexão.
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
Clique em Verificar meu progresso para conferir o objetivo.
Desativar e reativar a API Dataflow.
Tarefa 1: Criar recursos do projeto
- No Cloud Shell, crie variáveis para o bucket, o projeto e a região.
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
- Defina a região do App Engine.
Observação: para regiões diferentes de us-central1 e europe-west1, defina a variável de região do App Engine para ser igual à região atribuída. Se você estiver atribuído a us-central1, defina a variável de região do App Engine como us-central. Se você estiver atribuído a europe-west1, defina a variável de região do App Engine como europe-west.
Consulte Locais do App Engine para mais informações.
AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
- Crie um bucket do Cloud Storage que pertença a este projeto:
gsutil mb gs://$BUCKET_NAME
Observação: os nomes dos buckets do Cloud Storage precisam ser globalmente exclusivos. O ID do projeto do Qwiklabs é sempre exclusivo, por isso é usado no nome do bucket neste laboratório.
- Crie um tópico do Pub/Sub neste projeto:
gcloud pubsub topics create $TOPIC_ID
- Crie um app do App Engine para seu projeto:
gcloud app create --region=$AE_REGION
- Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
--topic=$TOPIC_ID --message-body="Hello!"
- Se for solicitado que você ative a API Cloud Scheduler, pressione
y e Enter.
Clique em Verificar meu progresso para conferir o objetivo.
Criar recursos do projeto
- Inicie o job:
gcloud scheduler jobs run publisher-job
Observação: se você encontrar um erro de RESOURCE_EXHAUSTED, tente executar o comando novamente.
- Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar até o diretório do exemplo de código:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt # Install Apache Beam dependencies
Observação: se você estiver usando a opção Python, execute os comandos do Python individualmente.
Clique em Verificar meu progresso para conferir o objetivo.
Inicie o job do Cloud Scheduler
Tarefa 2: Analisar o código para transmitir mensagens do Pub/Sub para o Cloud Storage
Exemplo de código
Analise o exemplo de código a seguir, que usa o Dataflow para:
- Ler as mensagens do Pub/Sub.
- organizar em janelas (ou agrupar) mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
- Grave as mensagens em cada janela nos arquivos no Cloud Storage.
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGcs {
/*
* Define your own configuration options. Adicione seus próprios argumentos para serem processados
* pelo analisador de linha de comando e especifique valores padrão para eles.
*/
public interface PubSubToGcsOptions extends StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("Output file's window size in number of minutes.")
@Default.Integer(1)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("Path of the output file including its filename prefix.")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) throws IOException {
// The maximum number of shards when writing output.
int numShards = 1;
PubSubToGcsOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
// 1) Read string messages from a Pub/Sub topic.
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) Group the messages into fixed-sized minute intervals.
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) Write one file to GCS for every window of messages.
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
// Execute the pipeline and wait until it finishes running.
pipeline.run().waitUntilFinish();
}
}
import argparse
from datetime import datetime
import logging
import random
from apache_beam import (
DoFn,
GroupByKey,
io,
ParDo,
Pipeline,
PTransform,
WindowInto,
WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""
def __init__(self, window_size, num_shards=5):
# Set window size to 60 seconds.
self.window_size = int(window_size * 60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| "Window into fixed intervals"
>> WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# Assign a random key to each windowed element based on the number of shards.
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# Group windowed elements by key. Todos os elementos na mesma janela precisam caber na
# memória para isso. Caso contrário, use `beam.util.BatchElements`.
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body},{publish_time}\n".encode())
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects//topics/".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)
Observação: para saber mais sobre o exemplo de código, acesse as páginas do GitHub java-docs-samples e python-docs-samples.
Tarefa 3: Iniciar o pipeline
- Para iniciar o pipeline, execute o seguinte comando:
mvn compile exec:java \
-Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=$PROJECT_ID \
--region=$REGION \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--windowSize=2 \
--tempLocation=gs://$BUCKET_NAME/temp"
python PubSubToGCS.py \
--project=project_id \
--region=region \
--input_topic=projects/project_id/topics/my-id \
--output_path=gs://bucket_name/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://bucket_name/temp
Observação: ao executar o comando Python, substitua project_id, bucket_name e region pelo ID do projeto, nome do bucket e região do laboratório atribuída.
O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem.
Observação: talvez seja necessário esperar cerca de 10 minutos para que o código seja totalmente executado e o job do pipeline apareça no console do Dataflow na próxima tarefa.
Observação: se você receber um aviso sobre o StaticLoggerBinder, ignore-o e continue o laboratório.
Clique em Verificar meu progresso para conferir o objetivo.
Iniciar o pipeline e o job do Dataflow
Tarefa 4: Observar o andamento do job e do pipeline
-
Acesse o console do Dataflow para observar o progresso do job.
-
Clique em Atualizar para ver o job e as atualizações de status mais recentes.

- Clique no nome do job para abrir os detalhes e analisar o seguinte:
- a estrutura do job;
- os registros da tarefa;
- as métricas do cenário.

Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.
- Você pode ver os arquivos de saída acessando o menu de navegação > Cloud Storage, clicando no nome do bucket e depois em Exemplos.

- Como alternativa, você pode sair do aplicativo no Cloud Shell usando CTRL+C (e, para a opção Python, digite
exit) e executar o comando abaixo para listar os arquivos que foram gravados no Cloud Storage:
gsutil ls gs://${BUCKET_NAME}/samples/
A saída será semelhante a esta:
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Tarefa 5: Limpeza
- Se ainda não fez isso, saia do aplicativo no Cloud Shell usando CTRL+C.
Para a opção Python, digite exit para sair do ambiente Python.
- No Cloud Shell, exclua o job do Cloud Scheduler:
gcloud scheduler jobs delete publisher-job
Se a mensagem "Quer continuar?" for exibida, pressione Y e Enter.
- No console do Dataflow, interrompa o job selecionando o nome dele e clicando em Interromper.
Quando solicitado, clique em Interromper job > Cancelar para cancelar o pipeline sem esvaziá-lo.
- No Cloud Shell, exclua o tópico:
gcloud pubsub topics delete $TOPIC_ID
- No Cloud Shell, exclua os arquivos criados pelo pipeline:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
- No Cloud Shell, exclua o bucket do Cloud Storage:
gsutil rb gs://${BUCKET_NAME}
Parabéns!
Você criou um pipeline do Dataflow que lê mensagens do seu tópico do Pub/Sub, as organiza em janelas por carimbo de data/hora e as grava no seu bucket do Cloud Storage.
Próximas etapas/Saiba mais
Treinamento e certificação do Google Cloud
Esses treinamentos ajudam você a aproveitar as tecnologias do Google Cloud ao máximo. Nossas aulas incluem habilidades técnicas e práticas recomendadas para ajudar você a alcançar rapidamente o nível esperado e continuar sua jornada de aprendizado. Oferecemos treinamentos que vão do nível básico ao avançado, com opções de aulas virtuais, sob demanda e por meio de transmissões ao vivo para que você possa encaixá-las na correria do seu dia a dia. As certificações validam sua experiência e comprovam suas habilidades com as tecnologias do Google Cloud.
Manual atualizado em 20 de agosto de 2025
Laboratório testado em 20 de agosto de 2025
Copyright 2025 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.