Visão geral
Neste laboratório, você vai usar muitos dos conceitos apresentados em um contexto de lote. Eles vão ser aplicados no contexto de streaming para criar um pipeline semelhante ao batch_minute_traffic_pipeline, mas que opera em tempo real. O pipeline final primeiro vai ler as mensagens JSON do Pub/Sub e analisá-las antes da ramificação. Uma ramificação grava alguns dados brutos no BigQuery e registra o tempo de processamento e de eventos. As outras ramificações agrupam em janelas e agregam os dados, além de gravar os resultados no BigQuery.
Objetivos
- Ler os dados de uma fonte de streaming
- Gravar os dados em um coletor de streaming.
- Dividir os dados em janelas em um contexto de streaming
- Verificar os efeitos de atrasos de modo experimental
Você vai criar o seguinte pipeline:

Configuração e requisitos
Antes de clicar no botão "Começar o laboratório"
Importante: leia estas instruções.
Os laboratórios são cronometrados e não podem ser pausados. O timer é iniciado quando você clica em Começar o laboratório e mostra por quanto tempo os recursos do Google Cloud vão ficar disponíveis.
Este laboratório prático do Qwiklabs permite que você realize as atividades em um ambiente real de nuvem, não em uma simulação ou demonstração. Você receberá novas credenciais temporárias para fazer login e acessar o Google Cloud durante o laboratório.
O que é necessário
Veja os requisitos para concluir o laboratório:
- Acesso a um navegador de Internet padrão (recomendamos o Chrome)
- Tempo disponível para concluir as atividades
Observação: não use seu projeto ou conta pessoal do Google Cloud neste laboratório.
Observação: se você estiver usando um Pixelbook, faça o laboratório em uma janela anônima.
Como começar o laboratório e fazer login no console
-
Clique no botão Começar o laboratório. Se for preciso pagar pelo laboratório, você verá um pop-up para selecionar a forma de pagamento.
Um painel aparece à esquerda contendo as credenciais temporárias que você precisa usar no laboratório.

-
Copie o nome de usuário e clique em Abrir console do Google.
O laboratório ativa os recursos e depois abre a página Escolha uma conta em outra guia.
Observação: abra as guias em janelas separadas, lado a lado.
-
Na página "Escolha uma conta", clique em Usar outra conta. A página de login abre.

-
Cole o nome de usuário que foi copiado do painel "Detalhes da conexão". Em seguida, copie e cole a senha.
Observação: é necessário usar as credenciais do painel "Detalhes da conexão". Não use suas credenciais do Google Cloud Ensina. Não use sua conta pessoal do Google Cloud, caso tenha uma neste laboratório (isso evita cobranças).
- Acesse as próximas páginas:
- Aceite os Termos e Condições.
- Não adicione opções de recuperação nem autenticação de dois fatores (porque essa é uma conta temporária).
- Não se inscreva em testes gratuitos.
Depois de alguns instantes, o console do Cloud abre nesta guia.
Observação: para acessar a lista dos produtos e serviços do Google Cloud, clique no Menu de navegação no canto superior esquerdo.
Configuração do ambiente de desenvolvimento baseado em notebook do Jupyter
Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.
-
No Menu de navegação do console do Google Cloud, clique em Vertex AI > Workbench.
-
Selecione Ativar a API Notebooks.
-
Na página "Workbench", selecione NOTEBOOKS GERENCIADOS PELO USUÁRIO e clique em CRIAR NOVO.
-
Na caixa de diálogo Nova instância, defina a região como e a zona como .
-
Em "Ambiente", selecione Apache Beam.
-
Clique em CRIAR na parte de baixo da caixa de diálogo.
Observação: pode levar de três a cinco minutos para que o ambiente seja totalmente provisionado. Aguarde até a conclusão dessa etapa.
Observação: clique em Ativar API Notebooks para fazer isso.
- Depois, clique no link ABRIR O JUPYTERLAB ao lado do nome do seu notebook para abrir o ambiente em uma nova guia do seu navegador.

- Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.

Faça o download do repositório de código
Agora você precisa dele para usar neste laboratório.
- Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
No painel à esquerda do ambiente do notebook no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.
-
Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/
. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab
, que contém o código que precisa ser concluído, e solution
, que inclui um exemplo prático caso você precise de ajuda.

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.
Abra o laboratório apropriado
- No seu terminal, execute os comandos a seguir para mudar para o diretório que você vai usar neste laboratório:
# Mude o diretório para o laboratório
cd 5_Streaming_Analytics/lab
export BASE_DIR=$(pwd)
Como configurar o ambiente virtual e as dependências
Antes de começar a editar o código do pipeline real, você precisa verificar se instalou as dependências necessárias.
- Execute o código a seguir e crie um ambiente virtual para o trabalho neste laboratório.
sudo apt-get install -y python3-venv
## Crie e ative o ambiente virtual
python3 -m venv df-env
source df-env/bin/activate
- Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com
- Por fim, conceda o papel
dataflow.worker
à conta de serviço padrão do Compute Engine:
PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)")
export serviceAccount=""$PROJECT_NUMBER"-compute@developer.gserviceaccount.com"
-
No console do Cloud, acesse IAM e ADMIN > IAM, clique no ícone Editar principal na conta de serviço padrão do Compute Engine
.
-
Adicione Dataflow Worker como outro papel e clique em Salvar.
Configure o ambiente de dados
# Crie buckets do GCS e conjunto de dados do BQ
cd $BASE_DIR/../..
source create_streaming_sinks.sh
# Mude para o diretório que contém a versão prática do código
cd $BASE_DIR
Clique em Verificar meu progresso para ver o objetivo.
Configure o ambiente de dados
Tarefa 1: como ler uma fonte de streaming
Nos laboratórios anteriores, você usou o beam.io.ReadFromText
para ler no Google Cloud Storage. Neste laboratório, em vez do Google Cloud Storage, você usa o Pub/Sub. O Pub/Sub é um serviço de mensagens em tempo real totalmente gerenciado. Ele permite que os editores enviem mensagens a um "tópico", em que os assinantes podem se inscrever com uma "assinatura".

O pipeline que você criou se inscreve em um tópico chamado my_topic
, recém-criado via script create_streaming_sinks.sh
. Em uma situação de produção, é normal que a equipe de publicação crie esse tópico. Confira na parte do Pub/Sub do console.
- No explorador de arquivos, acesse
training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/
e abra o arquivo streaming_minute_traffic_pipeline.py
.
- Para ler no Pub/Sub usando os conectores de E/S do Apache Beam, adicione ao pipeline uma transformação que use a classe
beam.io.ReadFromPubSub()
. Essa classe tem atributos para especificar o tópico da fonte, bem como o timestamp_attribute
. Por padrão, esse atributo é definido como o horário de publicação da mensagem.
Observação:
o horário da publicação é o momento em que o serviço Pub/Sub recebe a mensagem pela primeira vez. Em alguns sistemas, pode haver uma diferença entre o horário real do evento e o horário da publicação (ou seja, dados atrasados). Se você quiser levar isso em conta, o código do cliente que publica a mensagem precisa definir um atributo de metadados "timestamp" na mensagem e fornecer o carimbo de data/hora do evento real, porque o Pub/Sub não saberá de forma nativa como extrair o carimbo de data/hora do evento incorporado ao payload. Confira o código do cliente que gera as mensagens que você vai usar.
Para concluir essa tarefa, siga estas etapas:
- Adicione uma transformação que leia o tópico do Pub/Sub especificado pelo parâmetro de linha de comando
input_topic
.
- Em seguida, use a função fornecida
parse_json
com beam.Map
para converter cada string JSON em uma instância CommonLog
.
- Colete os resultados dessa transformação em uma
PCollection
de instâncias do CommonLog
usando with_output_types()
.
- No primeiro
#TODO
, adicione este código:
beam.io.ReadFromPubSub(input_topic)
Tarefa 2: agrupar os dados
No laboratório anterior que não é SQL, você implementou janelas de tempo fixo para agrupar eventos por tempo do evento em janelas de tamanho fixo de exclusão mútua. Faça o mesmo com as entradas de streaming aqui. Se tiver dificuldades, consulte o código do laboratório anterior ou a solução.
Agrupar em janelas de um minuto
Para concluir essa tarefa, siga estas etapas:
- Adicione uma transformação ao pipeline que aceite a
PCollection
de dados CommonLog
e agrupe os elementos em janelas de segundos de duração window_duration
, com window_duration
como outro parâmetro de linha de comando.
- Use o código a seguir para adicionar uma transformação ao pipeline que agrupa os elementos em janelas de um minuto:
"WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60))
Tarefa 3: agregar os dados
No laboratório anterior, você usou o combinador CountCombineFn()
na contagem do número de eventos por janela. Faça o mesmo aqui.
Contar eventos por janela
Para concluir essa tarefa, siga estas etapas:
- Transmita a
PCollection
em janela como entrada para uma transformação que faça a contagem do número de eventos por janela.
- Depois disso, use o
DoFn
fornecido, GetTimestampFn
, com beam.ParDo
para incluir o carimbo de data/hora de início da janela.
- Use o código a seguir para adicionar uma transformação ao pipeline que faça a contagem do número de eventos por janela:
"CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()
Tarefa 4: gravar no BigQuery
Este pipeline faz gravações no BigQuery em duas ramificações separadas. A primeira ramificação grava os dados agregados no BigQuery. A segunda ramificação, que já foi criada para você, grava alguns metadados sobre cada evento bruto, incluindo os carimbos de data/hora do evento e do carimbo de data/hora do processamento. Ambos gravam dados direto no BigQuery via inserções por streaming.
Gravar dados agregados no BigQuery
Como a gravação no BigQuery foi abordada em laboratórios anteriores, o mecanismo básico não será detalhado aqui.
Para concluir essa tarefa, siga estas etapas:
- Crie um parâmetro de linha de comando chamado
agg_table_name
para a tabela que vai armazenar dados agregados.
- Adicione uma transformação como a anterior para gravação no BigQuery.
Observação:
em um contexto de streaming, beam.io.WriteToBigQuery()
não suporta write_disposition
de WRITE_TRUNCATE
, em que a tabela é descartada e recriada. Neste exemplo, use WRITE_APPEND
.
Método de inserção do BigQuery
O padrão de beam.io.WriteToBigQuery
será usar inserções por streaming em PCollections ilimitadas ou jobs de carregamento de arquivos em lote em PCollections limitadas. As inserções por streaming podem ser úteis quando você quiser agilizar a exibição dos dados em agregações, mas isso acarreta cobranças extras. Nos casos de uso de streaming em que os uploads em lote periódicos podem acontecer a cada dois minutos, é possível especificar esse comportamento com o argumento de palavra-chave method
e definir a frequência com o argumento de palavra-chave triggering_frequency
. Saiba mais na seção sobre gravação de dados no BigQuery da documentação do módulo apache_beam.io.gcp.bigquery.
- Use o código a seguir para adicionar ao pipeline uma transformação que grava dados agregados na tabela do BigQuery.
'WriteAggToBQ' >> beam.io.WriteToBigQuery(
agg_table_name,
schema=agg_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
Tarefa 5: executar o pipeline
- Volte ao terminal e execute o código a seguir para executar o pipeline:
export PROJECT_ID=$(gcloud config get-value project)
export REGION='us-central1'
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic
export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw
python3 streaming_minute_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_topic=${PUBSUB_TOPIC} \
--window_duration=${WINDOW_DURATION} \
--agg_table_name=${AGGREGATE_TABLE_NAME} \
--raw_table_name=${RAW_TABLE_NAME}
Observação: se você receber um erro de falha no pipeline do Dataflow com a mensagem de que não foi possível abrir o arquivo pipeline.pb
, execute o pipeline de novo. Dessa vez, ele não deve apresentar problemas.
Verifique se ele é executado na IU do Dataflow sem erros. Observe que ainda não há dados sendo criados e ingeridos pelo pipeline. Portanto, ele será executado, mas não processará nada. Você incluirá os dados na próxima etapa.
Clique em Verificar meu progresso para ver o objetivo.
Execute o pipeline
Tarefa 6: gerar uma entrada de streaming sem atraso
Como este é um pipeline de streaming, ele é inscrito na origem do streaming e aguarda a entrada. No momento, não há nada. Nesta seção, você vai gerar dados sem atraso. Os dados reais quase sempre têm atraso. No entanto, esta seção é instrutiva, e o objetivo é que você entenda as entradas de streaming sem atraso.
O código desta Quest inclui um script para publicar eventos JSON usando o Pub/Sub.
- Para concluir a tarefa e começar a publicar mensagens, abra um novo terminal ao lado do atual e execute o script a seguir. Ele vai continuar publicando mensagens até você encerrar o script. Acesse a pasta
training-data-analyst/quests/dataflow_python
.
bash generate_streaming_events.sh
Clique em Verificar meu progresso para ver o objetivo.
Gere uma entrada de streaming sem atraso
Analisar os resultados
- Aguarde o preenchimento de dados por alguns minutos. Em seguida, navegue até o BigQuery e faça a consulta a seguir na tabela
logs.minute_traffic
:
SELECT timestamp, page_views
FROM `logs.windowed_traffic`
ORDER BY timestamp ASC
Você verá que número de visualizações de páginas passou de 100 visualizações por minuto.
- Também é possível usar a ferramenta de linha de comando do BigQuery para confirmar rapidamente que os resultados estão sendo gravados:
bq head logs.raw
bq head logs.windowed_traffic
- Agora insira a seguinte consulta:
SELECT
UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS event_millis,
UNIX_MILLIS(TIMESTAMP(processing_timestamp)) - min_millis.min_event_millis AS processing_millis,
user_id,
-- added as unique label so we see all the points
CAST(UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS STRING) AS label
FROM
`logs.raw`
CROSS JOIN (
SELECT
MIN(UNIX_MILLIS(TIMESTAMP(event_timestamp))) AS min_event_millis
FROM
`logs.raw`) min_millis
WHERE
event_timestamp IS NOT NULL
ORDER BY
event_millis ASC
Esta consulta ilustra a lacuna entre o tempo de evento e o tempo de processamento. No entanto, pode ser difícil ver o panorama geral se analisarmos apenas os dados tabulares brutos. Usaremos o Data Studio, uma visualização leve de dados e um mecanismo de BI.
- Para ativar o Data Studio:
- Acesse https://datastudio.google.com.
- Clique em Criar no canto superior esquerdo.
- Clique em Relatório.
- Clique nos Termos de Serviço e em Concluído.
- Volte para a IU do BigQuery.
- Na IU do BigQuery, clique no botão Explorar dados e selecione Explorar com o Data Studio.
Uma nova janela é aberta.
-
No painel à direita dessa janela, selecione o tipo de gráfico de dispersão.
-
Na coluna Dados do painel à direita, defina os seguintes valores:
- Dimensão: identificador
- Hierarquia: desativada
- Métrica X: event_millis
- Métrica Y: processing_millis
O gráfico será transformado em um gráfico de dispersão, em que todos os pontos estão na diagonal. Isso ocorre porque, nos dados de streaming que estão sendo gerados, os eventos são processados logo após a geração, sem atraso. Se você iniciou o script de geração de dados rapidamente, ou seja, antes da ativação do job do Dataflow, verá algo parecido com um taco de hóquei, porque algumas mensagens na fila do Pub/Sub foram processadas quase de uma vez.
Mas, no mundo real, o atraso é algo com que os pipelines precisam lidar.

Tarefa 7: incluir o atraso na entrada de streaming
O script de evento de streaming é capaz de gerar eventos com atraso simulado.
Isso representa cenários em que há um atraso entre o momento em que os eventos são gerados e publicados no Pub/Sub. Por exemplo, quando um cliente de dispositivo móvel entra no modo off-line porque um usuário não tem o serviço, mas todos os eventos são coletados no dispositivo e publicados de uma vez quando o aparelho retorna ao modo on-line.
Gerar entrada de streaming com atraso
-
Primeiro, feche a janela do Data Studio.
-
Em seguida, para ativar o atraso, volte para o terminal e pare o script em execução usando CTRL+C
.
-
Por último, execute este comando:
bash generate_streaming_events.sh true
Analisar os resultados
- Volte à interface do BigQuery, execute a consulta outra vez e recrie a visualização do Data Studio. Os novos dados que chegam e são mostrados do lado direito do gráfico não são mais perfeitos. Alguns vão aparecer acima da diagonal, indicando que foram processados após a ocorrência dos eventos.
Tipo de gráfico: dispersão
- Dimensão: identificador
- Hierarquia: desativada
- Métrica X: event_millis
- Métrica Y: processing_millis

Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.