Visão geral
Neste laboratório, você vai fazer o seguinte:
- Lidar com dados atrasados
- Corrigir dados mal formatados:
- escrevendo uma transformação composta para criar um código mais modular;
- escrevendo uma transformação que gere respostas de vários tipos;
- coletando dados com erro de formatação e gravando-os em um local onde poderão ser examinados.
Ao final laboratório anterior, vimos um desafio comum para os pipelines em tempo real, que é a lacuna entre a ocorrência e o processamento dos eventos, também conhecida como atraso. Este laboratório apresenta conceitos do Apache Beam que os designers de pipelines podem usar para definir formalmente como lidar com esse atraso.
Além do atraso, existem outros problemas que podem afetar os pipelines em um contexto de streaming, como erros de formatação em entradas de fora do sistema. Também vamos ver técnicas para processar essas entradas.
Quando este laboratório terminar, o pipeline final vai ficar parecido com o da imagem abaixo. Repare que ele tem uma ramificação.

Configuração e requisitos
Antes de clicar no botão "Começar o laboratório"
Importante: leia estas instruções.
Os laboratórios são cronometrados e não podem ser pausados. O timer é iniciado quando você clica em Começar o laboratório e mostra por quanto tempo os recursos do Google Cloud vão ficar disponíveis.
Este laboratório prático do Google Skills permite que você realize as atividades em um ambiente real de nuvem, não em uma simulação ou demonstração. Você vai receber novas credenciais temporárias para fazer login e acessar o Google Cloud durante o laboratório.
O que é necessário
Veja os requisitos para concluir o laboratório:
- Acesso a um navegador de Internet padrão (recomendamos o Chrome).
- Tempo disponível para concluir as atividades
Observação: não use seu projeto ou conta pessoal do Google Cloud neste laboratório.
Observação: se você estiver usando um Pixelbook, faça o laboratório em uma janela anônima.
Como começar o laboratório e fazer login no console
-
Clique no botão Começar o laboratório. Se for preciso pagar pelo laboratório, você verá um pop-up para selecionar a forma de pagamento.
Um painel aparece à esquerda contendo as credenciais temporárias que você precisa usar no laboratório.

-
Copie o nome de usuário e clique em Abrir console do Google.
O laboratório ativa os recursos e depois abre a página Escolha uma conta em outra guia.
Observação: abra as guias em janelas separadas, lado a lado.
-
Na página "Escolha uma conta", clique em Usar outra conta. A página de login abre.

-
Cole o nome de usuário que foi copiado do painel "Detalhes da conexão". Em seguida, copie e cole a senha.
Observação: é necessário usar as credenciais do painel "Detalhes da conexão". Não use suas credenciais do Google Skills. Caso tenha sua própria conta do Google Cloud, não a use para este laboratório (isso evita cobranças).
- Acesse as próximas páginas:
- Aceite os Termos e Condições.
- Não adicione opções de recuperação nem autenticação de dois fatores (porque essa é uma conta temporária).
- Não se inscreva em testes sem custo financeiro.
Depois de alguns instantes, o console do Cloud abre nesta guia.
Observação: para acessar a lista dos produtos e serviços do Google Cloud, clique no Menu de navegação no canto superior esquerdo.
Configuração do ambiente de desenvolvimento das instâncias do Workbench
Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.
-
No console do Google Cloud, no menu de navegação (
), clique em Vertex AI.
-
Selecione Ativar todas as APIs recomendadas.
-
No menu de navegação, clique em Workbench.
Verifique se você está na visualização Instâncias do topo da página do Workbench.
-
Clique em
Criar.
-
Configure a instância:
-
Nome: lab-workbench
-
Região: configure a região como
-
Zona: configure a zona como
-
Opções avançadas (opcional): se necessário, clique em "Opções avançadas" para personalizar mais (ex.: tipo de máquina, tamanho do disco).

- Clique em Criar.
O processo vai levar alguns minutos, e uma marca de confirmação verde vai aparecer ao lado do nome da instância quando ela for criada.
- Clique em ABRIR O JUPYTERLAB ao lado do nome da instância para iniciar a interface do ambiente. Uma nova guia será aberta no navegador.

- Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.
Faça o download do repositório de código
Agora você precisa dele para usar neste laboratório.
- Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
No painel à esquerda do ambiente do notebook no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.
-
Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.
Clique em Verificar meu progresso para conferir o objetivo.
Crie uma instância de notebook e clone o repositório do curso
Parte 1 do laboratório: como lidar com dados atrasados
Nos laboratórios anteriores, você escreveu um código que dividia elementos por horário de evento em janelas de largura fixa, usando um código semelhante a este:
parsed_msgs
| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(window_duration))
| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()
No entanto, como vimos no final do último laboratório não SQL, os fluxos de dados costumam sofrer atrasos. Isso pode ser um problema quando os janelamentos são criados com base na hora do evento, e não no tempo de processamento dele, porque não é possível afirmar que os eventos chegaram naquele momento específico.
Para gerar resultados, seu pipeline precisou tomar uma decisão e fez isso usando um conceito chamado marca-d'água. Uma marca d'água é a noção do sistema baseada em heurística de quando todos os dados, até um certo ponto no horário do evento, podem ter chegado ao pipeline. Quando a marca d'água passar do fim de uma janela, qualquer outro elemento que chegar com um carimbo de data/hora dessa janela é considerado um dado atrasado, sendo simplesmente descartado. Portanto, o comportamento padrão do janelamento é emitir um único resultado completo quando o sistema tiver certeza de que recebeu todos os dados.
O Apache Beam usa várias heurísticas para tomar uma decisão embasada sobre o que é a marca d'água. De toda forma, ainda são heurísticas. Em outras palavras, essas heurísticas são de uso geral e podem não ser relevantes para alguns casos. Em vez de empregar heurísticas de uso geral, os designers de pipelines precisam considerar as seguintes perguntas para determinar quais compensações são apropriadas:
- Totalidade: qual é a importância de ter todos os dados antes de calcular o resultado?
- Latência: quanto tempo você quer esperar pelos dados? Por exemplo, você vai esperar por todos eles ou prefere processar os dados assim que chegarem?
- Custo: quanto você se dispõe a gastar em dinheiro e capacidade de computação para reduzir a latência?
Com base nas respostas, é possível usar os formalismos do Apache Beam para escrever um código que faça a compensação certa.
Atraso permitido
O atraso permitido define por quanto tempo uma janela deve manter um estado. Quando a marca d'água atinge o fim do período de atraso permitido, o estado é descartado. Seria ótimo poder manter o estado de uma janela indefinidamente, mas não é prático fazer isso com uma fonte de dados ilimitada porque o espaço em disco um dia acaba.
Como resultado, qualquer sistema de processamento real e fora de ordem precisa oferecer uma forma de definir os limites dos ciclos de vida das janelas processadas. Uma forma simples e concisa de fazer isso é definir um horizonte no atraso permitido no sistema, ou seja, estabelecer um limite de atraso para o registro do sistema, em relação à marca d'água, para que ele seja processado. Todos os dados que chegarem depois desse horizonte são simplesmente descartados. Ao definir o limite de atraso de dados individuais, você também estabelece com precisão por quanto tempo o estado das janelas precisa ser mantido: até que a marca d'água exceda o horizonte de atraso para o fim da janela.
Tarefa 1: preparar o ambiente
Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes:
Abrir o laboratório apropriado
- No terminal do seu ambiente de desenvolvimento integrado, execute estes comandos, que farão a mudança para o diretório que você vai usar neste laboratório:
# Mude o diretório para o laboratório
cd 7_Advanced_Streaming_Analytics/lab
export BASE_DIR=$(pwd)
Como configurar o ambiente virtual e as dependências
Antes de começar a editar o código do pipeline, você precisa confirmar se instalou as dependências necessárias.
- Execute o código a seguir e crie um ambiente virtual para usar no laboratório:
sudo apt-get update && sudo apt-get install -y python3-venv
## Crie e ative o ambiente virtual
python3 -m venv df-env
source df-env/bin/activate
- Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Confira se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com
Configure o ambiente de dados
# Crie buckets do GCS e um conjunto de dados do BQ
cd $BASE_DIR/../../
source create_streaming_sinks.sh
# Mude para o diretório que contém a versão prática do código
cd $BASE_DIR
Clique em Verificar meu progresso para conferir o objetivo.
Prepare o ambiente
Tarefa 2: definir o atraso permitido
- No explorador de arquivos, acesse
training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab e abra o arquivo streaming_minute_traffic_pipeline.py.
No Apache Beam, o atraso permitido é definido usando o argumento de palavra-chave allowed_lateness com o gatilho AfterWatermark() na PTransform WindowInto. Por exemplo:
items = p | ...
Windowed_items = items | beam.WindowInto(beam.window.FixedWindows(60), # 1 minute
trigger=AfterWatermark(),
allowed_lateness=60*60*24) # 1 day
- Para concluir esta tarefa, examine a transformação de janelamento e o atraso permitido definido pelo argumento de linha de comando
allowed_lateness. Use o bom senso ao escolher um valor razoável e atualize a linha de comando para refletir as unidades certas.
Gatilhos
Os designers de pipeline também podem decidir quando emitir resultados preliminares. Na etapa anterior, usamos o gatilho AfterWatermark() com um atraso permitido especificado. Por exemplo, digamos que a marca d'água no final de uma janela ainda não tenha sido alcançada, mas 75% dos dados esperados já chegaram. Em muitos casos, essa amostra pode ser considerada representativa, o que vale a pena mostrar a usuários finais.
Um Trigger (gatilho) determina em que ponto do tempo de processamento os resultados são mostrados. Cada saída específica de uma janela é chamada de painel da janela. Os gatilhos disparam painéis quando as condições do gatilho são atendidas. No Apache Beam, essas condições incluem o progresso da marca d'água, o progresso do tempo de processamento (que vai avançar de maneira uniforme, independentemente da quantidade de dados realmente recebidos), o número de elementos (como quando chega uma determinada quantidade de dados novos) e os gatilhos dependentes de dados, como quando se chega ao fim de um arquivo.
Dependendo das condições definidas, um gatilho pode disparar um painel várias vezes. Portanto, também é necessário especificar como os resultados serão acumulados. Atualmente, o Apache Beam oferece suporte a dois modos de acumulação. O primeiro acumula resultados, e o outro retorna apenas as partes do resultado que são novas desde que o último painel foi disparado.
Tarefa 3: defina um gatilho
Quando você define uma função de janelamento para uma PCollection usando a transformação Window, também pode especificar um gatilho.
Para definir os gatilhos de uma PCollection, configure o argumento de palavra-chave trigger da sua PTransform WindowInto. O Apache Beam oferece uma série de gatilhos:
-
AfterWatermark: para disparar quando a marca-d'água passar um determinado carimbo de data/hora no final da janela ou na chegada do primeiro elemento a um painel.
-
AfterProcessingTime: para disparar depois de um certo tempo de processamento, normalmente a partir do primeiro elemento em um painel.
-
AfterCount: dispara quando o número de elementos na janela atinge uma determinada contagem.
Este exemplo de código define um gatilho baseado em tempo para uma PCollection que emite resultados um minuto após o primeiro elemento da janela ter sido processado. Na última linha do exemplo de código, definimos o modo de acumulação da janela definindo o argumento de palavra-chave accumulation_mode como AccumulationMode.DISCARDING:
items = p | ...
windowed_items = items | beam.WindowInto(FixedWindows(60), # 1 minute
trigger=AfterProcessingTime(60),
accumulation_mode=AccumulationMode.DISCARDING)
-
Para concluir essa tarefa, adicione o argumento de palavra-chave trigger à WindowInto transmitindo o gatilho AfterWatermark. Ao criar o gatilho, lembre-se desse caso de uso, em que os dados são colocados em janelas de um minuto e os dados podem chegar com atraso. Além disso, como argumento para AfterWatermark, adicione um gatilho de atraso para cada elemento atrasado (dentro do limite permitido).
Se você não souber o que fazer, confira a solução.
-
Preencha o seguinte #TODO, próximo à linha 113, para definir o gatilho e o modo de acumulação:
trigger=AfterProcessingTime(120),
accumulation_mode=AccumulationMode.DISCARDING)
- Preencha o seguinte
#TODO, próximo à linha 119, para definir o atraso permitido, o gatilho e o modo de acumulação:
trigger=AfterWatermark(late=AfterCount(1)),
allowed_lateness=int(allowed_lateness),
accumulation_mode=AccumulationMode.ACCUMULATING)
Laboratório 2: como lidar com dados malformados
Dependendo da configuração do Trigger (gatilho), se você executar o pipeline agora e comparar com o do laboratório anterior, vai perceber que o novo pipeline apresenta os resultados com mais rapidez. Também é possível que os resultados sejam mais precisos, caso a heurística não tenha feito uma boa previsão sobre o comportamento do streaming e o atraso permitido seja melhor.
Embora o pipeline atual esteja mais preparado para os atrasos, ele ainda precisa lidar com dados mal formatados. Se você executasse o pipeline e publicasse uma mensagem só com uma string JSON correta e que pudesse ser analisada por CommonLog, o pipeline retornaria um erro. Ferramentas como o Cloud Logging facilitam a leitura desses erros, mas um pipeline bem projetado armazenaria os erros em um local predefinido para análise posterior.
Nesta seção, você vai adicionar componentes que deixam o pipeline mais modular e preparado para lidar com problemas.
Tarefa 1: colete dados mal formatados
Para lidar melhor com dados malformados, o pipeline precisa ter uma forma de filtrar e encaminhar esses dados para outro tipo de processamento. Já vimos uma maneira de introduzir uma ramificação em um pipeline: fazer com que uma PCollection seja a entrada de várias transformações.
Essa forma de ramificação é muito eficiente. mas não funciona em alguns casos. Por exemplo, se você quiser criar dois subconjuntos com a mesma PCollection. Usando o método de várias transformações, você criaria uma transformação de filtro para cada subconjunto e as aplicaria à PCollection original. O problema é que os elementos seriam processados duas vezes.
Outra forma de criar um pipeline com ramificações é configurar uma única transformação para gerar várias saídas processando a PCollection de entrada uma só vez. Nesta tarefa, você vai escrever uma transformação que produz várias saídas. A primeira é o resultado dos dados bem formados, e a segunda é dos elementos malformados do stream de entrada original.
Para emitir vários resultados e criar apenas uma PCollection, o Apache Beam usa uma classe chamada TaggedOutput para criar chaves para as saídas do DoFn com várias saídas (possivelmente heterogêneas).
Veja um exemplo da TaggedOutput sendo usada para marcar diferentes saídas de um DoFn. Depois, essas PCollections são recuperadas com o método with_outputs() e são referenciadas com o nome da tag especificado na TaggedOutput.
class ConvertToCommonLogFn(beam.DoFn):
def process(self, element):
try:
row = json.loads(element.decode('utf-8'))
yield beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row))
except:
yield beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8'))
…
rows = (p | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(input_topic)
| 'ParseJson' >> beam.ParDo(ConvertToCommonLogFn()).with_outputs('parsed_row', 'unparsed_row')
.with_output_types(CommonLog))
(rows.unparsed_row | …
(rows.parsed_row | …
Para concluir a tarefa, declare dois retornos de TaggedOutput na classe ConvertToCommonLogFn, conforme mostrado acima. Na instrução try, retorne a linha analisada como uma instância da classe CommonLog e, na instrução catch, retorne a linha não analisada (decodificada) de dados.
- Preencha o primeiro
#TODO na classe ConvertToCommonLogFn:
beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row))
- Preencha o segundo
#TODO na classe ConvertToCommonLogFn:
beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8'))
Tarefa 2: grave os dados malformados para análise futura
Para corrigir o problema de upstream que está produzindo dados malformados, é importante saber analisar esses dados. Para isso, é necessário armazenar os dados em algum lugar. Nesta tarefa, você vai gravar dados com erros de formatação no Google Cloud Storage. Esse padrão é chamado usando o armazenamento de mensagens inativas.
Nos laboratórios anteriores, você gravou diretamente de uma fonte limitada (lote) no Cloud Storage usando beam.io.WriteToText(). No entanto, ao gravar de uma fonte ilimitada (streaming), essa abordagem precisa ser ligeiramente modificada.
Primeiro, no upstream da transformação de gravação, use um Trigger (gatilho) para especificar quando, no tempo de processamento, a gravação deve ocorrer. Se você mantiver os padrões, a gravação nunca será feita. Por padrão, todos os eventos pertencem à janela global. No processamento em lote, isso não é um problema porque o conjunto completo dos dados é descoberto no momento da execução. No entanto, com fontes ilimitadas, o tamanho total do conjunto de dados é desconhecido. Sendo assim, os painéis da janela global não são disparados porque jamais são concluídos.
Como você está usando um gatilho (Trigger), também precisa de uma janela (Window). No entanto, não é necessário alterar a janela. Nos laboratórios e tarefas anteriores, você usou transformações de janelamento para substituir a janela global por uma janela de duração fixa na hora do evento. Nesse caso, é mais importante receber os resultados de maneira útil e a uma taxa eficiente do que saber quais elementos foram agrupados.
No exemplo abaixo, a janela dispara o painel da janela global a cada 10 segundos durante o processamento, mas só grava eventos novos:
pcollection | “FireEvery10s” >> WindowInto(FixedWindows(10)
trigger=AfterProcessingTime(10))
accumulation_mode=AccumulationMode.DISCARDING
Depois de definir um gatilho, será necessário alterar o coletor de beam.io.WriteToText(), que não é compatível com streaming, para beam.io.fileio.WriteToFiles() a fim de realizar as gravações. Ao gravar no downstream de uma transformação de janelamento, especificamos uma série de fragmentos para que a gravação possa ser feita em paralelo:
windowed_items = p | 'WriteWindowedPCollection' >> fileio.WriteToFiles("gs://path/to/somewhere",
shards=int(num_shards),
max_writers_per_bundle=0)
-
Para concluir a tarefa, crie uma nova transformação usando rows.unparsed_row como a entrada para recuperar os dados malformados. Use um gatilho de tempo de processamento de 120 segundos para sua janela fixa com duração de 120 segundos com o modo de acumulação definido como AccumulationMode.DISCARDING.
-
Preencha o #TODO para usar beam.fileio.WriteToFiles a fim de gravar no GCS:
fileio.WriteToFiles(output_path,shards=1,max_writers_per_bundle=0)
Tarefa 3: execute o pipeline
Para executar o pipeline, crie um comando semelhante ao exemplo abaixo. Faça as alterações necessárias para refletir os nomes das opções de linha de comando que você adicionou:
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{project_0.startup_script.lab_region|Region}}}
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export ALLOWED_LATENESS=1
export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic
export DEADLETTER_BUCKET=${BUCKET}
cd $BASE_DIR
python3 streaming_minute_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_topic=${PUBSUB_TOPIC} \
--window_duration=${WINDOW_DURATION} \
--allowed_lateness=${ALLOWED_LATENESS} \
--table_name=${OUTPUT_TABLE_NAME} \
--dead_letter_bucket=${DEADLETTER_BUCKET} \
--allow_unsafe_triggers
O código desta Quest inclui um script para publicar eventos JSON usando o Pub/Sub. Para concluir a tarefa e começar a publicar mensagens, abra um novo terminal ao lado do atual e execute o script a seguir. Ele vai continuar publicando mensagens até você encerrar o script. Você precisa estar na pasta training-data-analyst/quests/dataflow_python.
Observação: a flag true adiciona eventos atrasados ao stream.
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
bash generate_streaming_events.sh true
Clique em Verificar meu progresso para conferir o objetivo.
Execute o pipeline
Tarefa 4: testar o pipeline
-
Na barra de título do console do Google Cloud, digite Pub/Sub no campo Pesquisar e clique em Pub/Sub na seção Produtos e páginas.
-
Clique em Tópicos e depois em my_topic.
-
Clique em Mensagens.
-
Clique em Selecione uma assinatura do Cloud Pub/Sub para receber mensagens dela e escolha "Minha assinatura de tópico" no menu suspenso.
Observação: talvez seja necessário clicar em "Atualizar" para ver a assinatura.
-
Clique no botão Publicar mensagem.
-
Na página a seguir, digite uma mensagem a ser publicada e clique em Publicar.
A mensagem deve chegar logo ao bucket do Cloud Storage, desde que não esteja em conformidade perfeita com a especificação JSON de CommonLog. Para acompanhar o trajeto da mensagem, volte à janela de monitoramento do pipeline e clique em um nó da ramificação que processa mensagens não analisadas. Ao ver um elemento adicionado a essa ramificação, acesse o Cloud Storage e verifique se a mensagem foi gravada em disco:
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{project_0.startup_script.lab_region|Region}}}
export BUCKET=gs://${PROJECT_ID}/deadletter
gcloud storage ls $BUCKET
gcloud storage cat $BUCKET/*
Clique em Verificar meu progresso para conferir o objetivo.
Teste o pipeline
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2026 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de empresas e produtos podem ser marcas registradas das empresas a que estão associados.