Este conteúdo ainda não foi otimizado para dispositivos móveis.
Para aproveitar a melhor experiência, acesse nosso site em um computador desktop usando o link enviado a você por e-mail.
Visão geral
Neste laboratório, você vai:
implementar um pipeline com ramificações;
filtrar os dados antes de serem gravados;
adicionar parâmetros de linha de comando personalizados a um pipeline;
converter um pipeline personalizado em um modelo Flex personalizado do Dataflow;
executar um modelo Flex do Dataflow.
Pré-requisitos:
Noções básicas sobre o Python.
No laboratório anterior, você criou um pipeline básico sequencial de extração, transformação e carga e usou um modelo equivalente do Dataflow para ingerir armazenamento de dados em lote no Google Cloud Storage. Esse pipeline se baseia em uma sequência de transformações:
No entanto, muitos pipelines não vão exibir uma estrutura tão simples. Neste laboratório, você vai criar um pipeline mais sofisticado e não sequencial.
O caso de uso aqui é otimizar o consumo de recursos. Os produtos variam de acordo com a maneira que consomem recursos. Além disso, nem todos os dados são usados do mesmo modo em uma empresa. Algumas informações serão consultadas regularmente, por exemplo, em cargas de trabalho analíticas, e outras serão usadas apenas para recuperação. Neste laboratório, você vai otimizar o pipeline do primeiro laboratório para o consumo de recursos ao armazenar apenas dados que os analistas usarão no BigQuery e, ao mesmo tempo, arquivar outros dados em um serviço de armazenamento altamente durável e de baixo custo, o Coldline Storage do Google Cloud Storage.
Configuração e requisitos
Configuração do Qwiklabs
Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.
Faça login no Qwiklabs em uma janela anônima.
Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.
Quando tudo estiver pronto, clique em Começar o laboratório.
Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.
Clique em Abrir Console do Google.
Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.
Aceite os termos e pule a página de recursos de recuperação.
Verifique as permissões do projeto
Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).
No console do Google Cloud, em Menu de navegação (), selecione IAM e administrador > IAM.
Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.
Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
Copie o número do projeto, por exemplo, 729328892908.
Em Menu de navegação, clique em IAM e administrador > IAM.
Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
Substitua {project-number} pelo número do seu projeto.
Em Papel, selecione Projeto (ou Básico) > Editor.
Clique em Save.
Configuração do ambiente de desenvolvimento baseado em notebook do Jupyter
Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.
No Menu de navegação do console do Google Cloud, clique em Vertex AI > Workbench.
Selecione Ativar a API Notebooks.
Na página "Workbench", selecione NOTEBOOKS GERENCIADOS PELO USUÁRIO e clique em CRIAR NOVO.
Na caixa de diálogo Nova instância, defina a região como e a zona como .
Em "Ambiente", selecione Apache Beam.
Clique em CRIAR na parte de baixo da caixa de diálogo.
Observação: pode levar de três a cinco minutos para que o ambiente seja totalmente provisionado. Aguarde até a conclusão dessa etapa.Observação: clique em Ativar API Notebooks para fazer isso.
Depois, clique no link ABRIR O JUPYTERLAB ao lado do nome do seu notebook para abrir o ambiente em uma nova guia do seu navegador.
Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.
Faça o download do repositório de código
Agora você precisa dele para usar neste laboratório.
Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
No painel à esquerda do ambiente do notebook no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.
Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.
Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.
Clique em Verificar meu progresso para conferir o objetivo.
Crie uma instância de notebook e clone o repositório do curso
Parte 1 do laboratório: como escrever pipelines com ramificações
Nesta etapa do laboratório, você vai escrever um pipeline com ramificações que grava dados no Google Cloud Storage e no BigQuery.
Várias transformações processam a mesma PCollection
Uma maneira de escrever um pipeline com ramificações é aplicar duas transformações diferentes à mesma PCollection, resultando em duas PCollections diferentes.
[PCollection1] = [PCollection de entrada inicial] | [Uma transformação]
[PCollection2] = [PCollection de entrada inicial] | [Uma transformação diferente]
Como implementar um pipeline com ramificações
Se você tiver dúvidas nessa seção ou nas próximas, a solução estará disponível aqui.
Tarefa 1: adicionar uma ramificação para fazer gravações no Cloud Storage
Para concluir esta tarefa, modifique um pipeline atual adicionando uma ramificação que faça gravações no Cloud Storage.
Abrir o laboratório correto
Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso, e copie e cole o seguinte comando:
# Altere o diretório no laboratório
cd 2_Branching_Pipelines/lab
export BASE_DIR=$(pwd)
Como configurar o ambiente virtual e as dependências
Antes de começar a editar o código real do pipeline, você precisa garantir que as dependências necessárias estão instaladas.
Volte ao terminal aberto no ambiente de desenvolvimento integrado e crie um ambiente virtual para este laboratório:
Por fim, verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com
Configurar o ambiente de dados
# Crie buckets do GCS e um conjunto de dados BQ
cd $BASE_DIR/../..
source create_batch_Sinks.sh
# Gere um evento de origem do dataflow
generate_batch_events.sh
# Altere para o diretório em que está a versão prática do código
cd $BASE_DIR
Abra o arquivo my_pipeline.py no ambiente de desenvolvimento integrado, que pode ser encontrado em 2_Branching_Pipelines/labs/. Role para baixo até o método run(), em que o corpo do pipeline está definido. Atualmente, o formato é o seguinte:
Modifique esse código adicionando uma nova transformação com ramificação que grava no Cloud Storage. Para isso, use textio.WriteToText antes de cada elemento que será convertido de json para dict.
Se você tiver dúvidas nessa seção ou nas próximas, consulte a solução aqui.
Clique em Verificar meu progresso para conferir o objetivo.
Configurar o ambiente de dados
Tarefa 2: filtrar dados por campo
No momento, o novo pipeline não consome menos recursos, já que todos os dados são armazenados duas vezes. Para começar a otimizar o consumo de recursos, precisamos reduzir a quantidade de informações duplicadas. O bucket do Google Cloud Storage funciona como um recurso de arquivamento e backup. Por isso, é importante que todos os dados sejam armazenados nele. No entanto, nem todas as informações precisam ser enviadas ao BigQuery.
Vamos supor que as pessoas responsáveis pela análise de dados geralmente analisam quais recursos os usuários acessam no site e como esses padrões de acesso diferem em função da região geográfica e do tempo. Somente um subconjunto dos campos seria necessário. Como analisamos os elementos json em dicionários, podemos usar facilmente o método pop para soltar um campo de dentro de uma chamável do Python:
def drop_field(element):
element.pop('field_name')
return element
Para concluir esta tarefa, use uma chamável do Python com beam.Map para soltar o campo user_agent, que não será usado pelos nossos analistas no BigQuery.
Tarefa 3: filtrar dados por elemento
Há muitas maneiras de filtrar no Apache Beam. Como estamos trabalhando com uma PCollection de dicionários Python, a maneira mais fácil é usar uma função lambda (anônima) como filtro, uma função que retorna um valor booleano, com beam.Filter. Exemplo:
purchases | beam.Filter(lambda element : element['cost_cents'] > 20*100)
Para concluir esta tarefa, adicione uma transformação beam.Filter ao pipeline. É possível filtrar por qualquer critério, mas recomendamos tentar eliminar linhas em que num_bytes seja maior ou igual a 120.
Tarefa 4: adicionar parâmetros personalizados de linha de comando
No momento, o pipeline tem vários parâmetros codificados, incluindo o caminho para a entrada e o local da tabela no BigQuery. No entanto, o pipeline seria mais útil se pudesse ler qualquer arquivo json no Cloud Storage. A adição desse recurso exige a adição de um conjunto de parâmetros de linha de comando.
No momento, usamos um ArgumentParser para ler e analisar argumentos de linha de comando. Em seguida, transmitimos esses argumentos para o objeto PipelineOptions() que especificamos ao criar nosso pipeline.
parser = argparse.ArgumentParser(description='...')
# Definir e analisar argumentos
options = PipelineOptions()
# Definir valores de opções
p = beam.Pipeline(options=options)
O PipelineOptions é usado para interpretar as opções lidas pelo ArgumentParser. Para adicionar um novo argumento de linha de comando ao analisador, podemos usar a sintaxe:
Para concluir esta tarefa, adicione parâmetros de linha de comando para o caminho de entrada, o caminho de saída do Google Cloud Storage e o nome da tabela do BigQuery. Em seguida, atualize o código do pipeline para acessar esses parâmetros em vez de constantes.
Tarefa 5: adicionar campos NULLABLE ao pipeline
Talvez você tenha notado que a tabela do BigQuery criada no último laboratório tinha um esquema com todos os campos REQUIRED como este:
Talvez seja melhor criar um esquema do Apache Beam com campos NULLABLE em que os dados estejam ausentes, tanto para a execução do pipeline quanto para uma tabela resultante do BigQuery com um esquema que reflita isso.
Podemos atualizar o esquema JSON do BigQuery adicionando um novo modo de propriedade para um campo que queremos anular:
Para concluir esta tarefa, marque os campos lat e lon como anuláveis no esquema do BigQuery.
Tarefa 6: executar o pipeline na linha de comando
Para concluir esta tarefa, execute o pipeline na linha de comando e transmita os parâmetros adequados. Além disso, anote o esquema resultante do BigQuery para campos NULLABLE. O código vai ficar assim:
# Configure as variáveis de ambiente
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{project_0.startup_script.lab_region|Region}}}
export BUCKET=gs://${PROJECT_ID}
export COLDLINE_BUCKET=${BUCKET}-coldline
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export OUTPUT_PATH=${PIPELINE_FOLDER}-coldline/pipeline_output
export TABLE_NAME=${PROJECT_ID}:logs.logs_filtered
cd $BASE_DIR
python3 my_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--stagingLocation=${PIPELINE_FOLDER}/staging \
--tempLocation=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--inputPath=${INPUT_PATH} \
--outputPath=${OUTPUT_PATH} \
--tableName=${TABLE_NAME}
Se o pipeline estiver sendo criado com sucesso, mas você perceber muitos erros devido a problemas no código ou configuração incorreta no serviço Dataflow, defina RUNNER novamente como "DirectRunner", o que faz com que o pipeline seja executado localmente e receba feedback mais rápido. Nesse caso, essa abordagem funciona porque o conjunto de dados é pequeno e você não está usando recursos incompatíveis com o DirectRunner.
Tarefa 7: verificar os resultados do pipeline
Acesse a página Jobs do Cloud Dataflow e veja o job enquanto ele está em execução. O gráfico será semelhante a este:
Clique no nó que representa a função Filter, que na imagem acima é chamada de FilterFn. Ao olhar o painel do lado direito, você vai perceber que mais elementos foram adicionados como entradas do que gravados como saídas.
Agora clique no nó que representa a gravação no Cloud Storage. Como todos os elementos foram gravados, esse número precisa estar de acordo com o número de elementos na entrada para a função "Filter".
Quando o processo do pipeline terminar, examine os resultados no BigQuery consultando sua tabela. O número de registros na tabela precisa estar de acordo com o número de elementos gerados pela função de filtro.
Clique em Verificar meu progresso para conferir o objetivo.
Executar o pipeline na linha de comando
Laboratório parte 2: modelos personalizados do Dataflow
Um pipeline que aceita parâmetros de linha de comando é muito mais útil do que um com esses parâmetros codificados. No entanto, executar esse tipo de pipeline exige a criação de um ambiente de desenvolvimento. Usar um modelo do Dataflow é uma opção ainda melhor para pipelines que serão executados novamente por vários usuários ou em contextos distintos.
Existem muitos modelos do Dataflow que já foram criados como parte do Google Cloud Platform. Para conferir, é só consultar este link. No entanto, nenhum deles executa a mesma função que o pipeline deste laboratório. Em vez disso, nesta parte do laboratório, você converterá o pipeline em um modelo Flex personalizado do Dataflow e mais novo (em vez de um modelo tradicional personalizado).
Converter um pipeline em um modelo Flex personalizado do Dataflow requer o uso de um contêiner do Docker para empacotar não apenas o código, mas as dependências, um Dockerfile para descrever qual código criar, o Cloud Build para criar o contêiner subjacente que será executado no ambiente de execução a fim de criar o job real e um arquivo de metadados para descrever os parâmetros do job.
Tarefa 1: criar uma imagem de contêiner personalizada para o modelo Flex do Dataflow
Primeiro, ative como padrão o uso do cache do Kaniko. O Kaniko armazena em cache os artefatos do build de contêiner. Portanto, o uso dessa opção acelera a criação dos builds subsequentes. Também vamos usar o pip3 freeze para registrar os pacotes e as versões usadas no ambiente.
gcloud config set builds/use_kaniko True
Em seguida, vamos criar o Dockerfile. Isso vai especificar o código e as dependências que precisamos usar.
a. Para concluir esta tarefa, crie um Novo arquivo na pasta dataflow_python/2_Branching_Pipelines/lab no explorador de arquivos do ambiente de desenvolvimento integrado.
b. Para criar um arquivo, clique em Arquivo >> Novo >> Arquivo de texto
c. Clique com o botão direito do mouse para renomear o arquivo como Dockerfile.
d. Para abrir o Dockerfile no painel de edição, clique no arquivo.
e. Copie o código abaixo para o arquivo Dockerfile e salve:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
COPY my_pipeline.py .
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/my_pipeline.py"
RUN python3 -m pip install apache-beam[gcp]==2.25.0
Por fim, use o Cloud Build para criar a imagem do contêiner.
O processo de criação e envio do contêiner vai levar alguns minutos.
Clique em Verificar meu progresso para conferir o objetivo.
Crie uma imagem de contêiner para o modelo Flex personalizado do Dataflow
Tarefa 2: criar e organizar o modelo Flex
Para executar um modelo, é preciso criar um arquivo de especificação no Cloud Storage com todas as informações necessárias para executar o job, como metadados e informações do SDK.
a. Crie um Novo arquivo na pasta dataflow_python/2_Branching_Pipelines/lab do explorador de arquivos do ambiente de desenvolvimento integrado.
b. Para criar um Novo arquivo, clique em Arquivo >> Novo >> Arquivo de texto.
c. Clique com o botão direito do mouse para renomear o arquivo como metadata.json.
d. Abra o arquivo metadata.json no painel de edição. Para abrir o arquivo, clique com o botão direito do mouse no metadata.json e selecione Abrir com >> Editor.
e. Para concluir esta tarefa, crie um arquivo metadata.json no formato abaixo, que considera todos os parâmetros de entrada esperados pelo pipeline. Se precisar, consulte a solução aqui. Para isso, é necessário escrever seus próprios parâmetros da verificação de regex. Embora não seja uma prática recomendada, ".*" será uma correspondência em qualquer entrada.
{
"name": "My Branching Pipeline",
"description": "A branching pipeline that writes raw to GCS Coldline, and filtered data to BQ",
"parameters": [
{
"name": "inputPath",
"label": "Input file path.",
"helpText": "Path to events.json file.",
"regexes": [
".*\\.json"
]
},
{
"name": "outputPath",
"label": "Output file location",
"helpText": "GCS Coldline Bucket location for raw data",
"regexes": [
"gs:\\/\\/[a-zA-z0-9\\-\\_\\/]+"
]
},
{
"name": "tableName",
"label": "BigQuery output table",
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
}
Em seguida, crie e organize o modelo real:
export TEMPLATE_PATH="gs://${PROJECT_ID}/templates/mytemplate.json"
# Crie e faça upload para o modelo do GCS
# Talvez seja necessário ativar os recursos Beta do gcloud
gcloud beta dataflow flex-template build $TEMPLATE_PATH \
--image "$TEMPLATE_IMAGE" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
Verifique se o arquivo foi enviado para o local do modelo no Cloud Storage.
Clique em Verificar meu progresso para conferir o objetivo.
Crie e organize o modelo Flex
Tarefa 3: executar o modelo usando a IU
Para concluir a tarefa, siga as instruções abaixo.
Selecione Modelo personalizado no menu suspenso Modelo do Cloud Dataflow.
Digite o caminho do Cloud Storage para o arquivo de modelo no campo de caminho do Cloud Storage referente ao modelo.
Insira os itens relevantes em Parâmetros obrigatórios
a. No Caminho do arquivo de entrada, insira
b. No Local do arquivo de saída, insira
c. Em Tabela de saída do BigQuery, insira
Clique em Executar job.
Observação: não é necessário especificar um bucket de preparo. O DataFlow vai criar um bucket privado no seu projeto usando o número dele, parecido com
Analise o console do Compute Engine. Você verá uma VM inicializadora temporária que foi criada para executar o contêiner e iniciar o pipeline com os parâmetros fornecidos.
Tarefa 4: executar o modelo usando a gcloud
Um dos benefícios de usar modelos do Dataflow é a capacidade de executá-los em vários contextos diferentes do ambiente de desenvolvimento. Para demonstrar isso, use a gcloud para executar um modelo do Dataflow na linha de comando.
Para concluir esta tarefa, execute este comando no seu terminal, modificando os parâmetros conforme apropriado:
Clique em Verificar meu progresso para conferir o objetivo.
Execute o modelo pela IU e usando a gcloud
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
1 estrela = muito insatisfeito
2 estrelas = insatisfeito
3 estrelas = neutro
4 estrelas = satisfeito
5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2026 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de empresas e produtos podem ser marcas registradas das empresas a que estão associados.
Os laboratórios criam um projeto e recursos do Google Cloud por um período fixo
Os laboratórios têm um limite de tempo e não têm o recurso de pausa. Se você encerrar o laboratório, vai precisar recomeçar do início.
No canto superior esquerdo da tela, clique em Começar o laboratório
Usar a navegação anônima
Copie o nome de usuário e a senha fornecidos para o laboratório
Clique em Abrir console no modo anônimo
Fazer login no console
Faça login usando suas credenciais do laboratório. Usar outras credenciais pode causar erros ou gerar cobranças.
Aceite os termos e pule a página de recursos de recuperação
Não clique em Terminar o laboratório a menos que você tenha concluído ou queira recomeçar, porque isso vai apagar seu trabalho e remover o projeto
Este conteúdo não está disponível no momento
Você vai receber uma notificação por e-mail quando ele estiver disponível
Ótimo!
Vamos entrar em contato por e-mail se ele ficar disponível
Um laboratório por vez
Confirme para encerrar todos os laboratórios atuais e iniciar este
Use a navegação anônima para executar o laboratório
A melhor maneira de executar este laboratório é usando uma janela de navegação anônima
ou privada. Isso evita conflitos entre sua conta pessoal
e a conta de estudante, o que poderia causar cobranças extras
na sua conta pessoal.
Neste laboratório, você vai: a) implementar um pipeline com ramificações; b) filtrar dados antes da gravação; c) adicionar parâmetros de linha de comando personalizados a um pipeline; d) converter um pipeline personalizado em um modelo Flex do Dataflow; e e) executar um modelo Flex do Dataflow.
Duração:
Configuração: 1 minutos
·
Tempo de acesso: 120 minutos
·
Tempo para conclusão: 120 minutos