Instruções e requisitos de configuração do laboratório
Proteja sua conta e seu progresso. Sempre use uma janela anônima do navegador e suas credenciais para realizar este laboratório.

Validar a qualidade de dados de um pipeline de dados em lote usando o Serverless para Apache Spark

Laboratório 1 hora universal_currency_alt 5 créditos show_chart Intermediário
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Este conteúdo ainda não foi otimizado para dispositivos móveis.
Para aproveitar a melhor experiência, acesse nosso site em um computador desktop usando o link enviado a você por e-mail.

Visão geral

O Serverless para Apache Spark do Google Cloud é um serviço totalmente gerenciado que simplifica a execução de cargas de trabalho em lote do Spark sem gerenciar a infraestrutura. Esse padrão oferece uma abordagem robusta para fluxos de trabalho de ETL (extração, transformação e carregamento), garantindo que apenas dados de alta qualidade alimentem seus sistemas analíticos.

O desafio

Os dados brutos ingeridos em um data lake geralmente contêm imprecisões, como formatos incorretos, entradas inválidas ou valores faltando. Carregá-los diretamente em um warehouse analítico pode corromper relatórios e levar a decisões de negócios ruins.

A solução

Criar um pipeline automatizado de qualidade de dados que intercepta os dados brutos, aplica um conjunto de regras de validação e depois roteia os dados de forma inteligente. Os registros limpos são enviados para o data warehouse de produção, e aqueles com falha na validação são enviados para uma "fila de mensagens inativas" (DLQ, na sigla em inglês) para inspeção e correção.

Neste laboratório, você vai criar essa solução executando um job personalizado do PySpark no Serverless para Apache Spark. O job vai:

  1. Ler um arquivo CSV bruto de um bucket do Cloud Storage.
  2. Aplicar regras de qualidade de dados para validar cada registro.
  3. Carregar os registros limpos e válidos para uma tabela do BigQuery.
  4. Gravar os registros inválidos em um bucket de DLQ separado no Cloud Storage.

Esse padrão garante que seu data warehouse fique sempre em um bom estado e oferece um processo claro e auditável para lidar com erros de dados.

Casos de uso corporativo

  • E-commerce: um pipeline valida os dados dos pedidos recebidos, garantindo que os IDs dos produtos sejam legítimos e que os e-mails dos clientes estejam no formato correto antes de serem carregados em uma tabela do BigQuery de análise de vendas. Pedidos inválidos são encaminhados para uma revisão manual de DLQ.
  • Serviços de saúde: um sistema processa registros de pacientes, validando se os códigos médicos existem e se as datas estão no formato correto. Os registros com erros são enviados para um bucket de DLQ seguro para revisão de gestão de dados, assim garantindo a conformidade.
  • Finanças: um pipeline diário ingere dados do mercado de ações, verificando se há valores nulos em campos críticos como close_price. Dados incompletos de mostrador são enviados para uma DLQ, evitando a corrupção de modelos de análise de séries temporais.

Objetivos

Neste laboratório, você vai aprender a:

  • Navegar pelo ambiente de laboratório pré-configurado provisionado pelo Terraform.
  • Concluir a configuração do ambiente criando um conjunto de dados do BigQuery.
  • Criar um script PySpark personalizado e comentado com lógica de roteamento e qualidade de dados.
  • Configurar e executar um job em lote do Spark em uma rede VPC personalizada e segura.
  • Verificar a saída de dados limpos em uma tabela do BigQuery.
  • Analisar os registros inválidos na DLQ do Cloud Storage.

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período sem custo financeiro.

  1. Faça login no Google Skills usando uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Ativar o Cloud Shell

O Cloud Shell é uma máquina virtual que contém ferramentas para desenvolvedores. Ele tem um diretório principal permanente de 5 GB e é executado no Google Cloud. O Cloud Shell oferece aos seus recursos do Google Cloud acesso às linhas de comando. A gcloud é a ferramenta ideal para esse tipo de operação no Google Cloud. Ela vem pré-instalada no Cloud Shell e aceita preenchimento com tabulação.

  1. No painel de navegação do Console do Google Cloud, clique em Ativar o Cloud Shell (Ícone do Cloud Shell).

  2. Clique em Continuar.
    O provisionamento e a conexão do ambiente podem demorar um pouco. Quando esses processos forem concluídos, você já vai ter uma autenticação, e o projeto estará definido com seu PROJECT_ID. Por exemplo:

Terminal do Cloud Shell

Exemplo de comandos

  • Liste o nome da conta ativa:

gcloud auth list

(Saída)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(Exemplo de saída)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Liste o ID do projeto:

gcloud config list project

(Saída)

[core] project = <project_ID>

(Exemplo de saída)

[core] project = qwiklabs-gcp-44776a13dea667a6

Seu ambiente de laboratório

Quando você começa este laboratório, um script do Terraform é executado para provisionar automaticamente a maioria dos recursos e da infraestrutura necessários. Estes itens foram criados para você:

  • ID do projeto =

  • Região =

  • Zona =

  • Uma rede VPC personalizada (spark-network) e uma sub-rede (spark-subnet) configuradas com o acesso de rede necessário para o Serverless para Apache Spark.

  • Dois buckets do Cloud Storage:

    1. Um bucket principal (gs://-main-bucket) para armazenar o script do PySpark (scripts/) e os dados brutos de entrada (source/) e servir como área de preparo temporária para o conector do BigQuery.
    2. Um bucket de DLQ (gs://-dlq-bucket) dedicado ao armazenamento de registros inválidos.
  • Um arquivo de dados brutos: um script do Python automaticamente gerou e carregou um arquivo CSV de 1.000 registros (source/customer_contacts_1000.csv) no bucket principal. Aproximadamente 20% desses registros têm imperfeições intencionais para testar seu pipeline, como e-mails inválidos e IDs ausentes.

Seu objetivo é criar um script capaz de identificar e separar os registros bons dos ruins e carregá-los nos destinos corretos.

Observação sobre a estratégia de bucket:

Para simplificar este laboratório, o bucket principal é usado para scripts, dados de origem e preparo temporário do BigQuery. Em ambientes de produção, é prática recomendada usar três buckets separados: um para dados brutos/de entrada, um para a fila de mensagens inativas e um dedicado para dados temporários de preparo usados por conectores, como do BigQuery. Fazer isso proporciona melhor isolamento, segurança e gerenciamento do ciclo de vida.

Tarefa 1: navegue pelo ambiente e prepare os serviços

Primeiro, você vai confirmar se os recursos do laboratório foram criados corretamente e visualizar os dados de origem que serão usados.

Confirme os buckets do Cloud Storage

  1. No console do Google Cloud, abra Menu de navegação (☰) e acesse Cloud Storage > Buckets.
  2. Confirme se dois buckets estão listados: um terminando em -main-bucket e o outro em -dlq-bucket.

Visualize os dados brutos e ative a API

Clique no ícone do Cloud Shell

  1. Ative o Cloud Shell.

  2. Execute o comando abaixo para visualizar o cabeçalho e os 10 primeiros registros do arquivo CSV bruto localizado no bucket principal.

    gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
  3. Você precisa ativar a API Dataproc antes de executar um job. Para fazer isso, execute este comando no Cloud Shell:

    gcloud services enable dataproc.googleapis.com

Clique em Verificar meu progresso para conferir a tarefa realizada.

Ativar a API Dataproc.

Tarefa 2: prepare o ambiente do BigQuery

O script do Terraform já configurou a rede e o armazenamento, mas você ainda tem que criar o conjunto de dados de destino do BigQuery onde os dados limpos serão carregados.

Crie o conjunto de dados

  1. Para criar um novo conjunto de dados do BigQuery chamado customer_data_clean, execute o comando abaixo no Cloud Shell.

    bq mk customer_data_clean
  2. Agora é possível validar se o conjunto de dados foi criado com sucesso no console. Acesse o BigQuery pelo Menu de navegação (☰). No painel Explorador, clique na seta ao lado do ID do projeto para expandir o conteúdo. O novo conjunto de dados customer_data_clean vai aparecer.

Clique em Verificar meu progresso para conferir a tarefa realizada.

Criar um conjunto de dados do BigQuery.

Tarefa 3: prepare o script de qualidade de dados do PySpark

Agora, crie o script do PySpark personalizado que contém a lógica para validar os dados. A lógica do script é bem direta:

  • ele lê o arquivo CSV de origem do Cloud Storage em um DataFrame,
  • aplica uma série de regras de validação para conferir se há IDs nulos e formatos de e-mail válidos,
  • divide o DataFrame em dois: um com registros limpos e outro com registros inválidos e, por fim,
  • grava os dados limpos no BigQuery e os inválidos no bucket de DLQ no Cloud Storage.

Escreva e faça upload do script

  1. No Cloud Shell, crie o arquivo de script do PySpark chamado customer_dq.py.

    nano customer_dq.py
  2. Cole o código Python comentado abaixo no editor nano.

    # Importar bibliotecas obrigatórias import sys from pyspark.sql import SparkSession from pyspark.sql.functions import col, when # Este script espera 1 argumento de linha de comando: # 1. O caminho da tabela de destino do BigQuery no formato "dataset.table" if len(sys.argv) != 2: print("Usage: customer_dq.py <bq_dataset_table>") sys.exit(-1) # Atribuir argumento de linha de comando à variável bq_dataset_table = sys.argv[1] # Variável do Qwiklabs são substituídas aqui quando o laboratório executa bq_project = "{{{project_0.project_id|Project_ID}}}" gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv" gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/" # Inicializar uma nova Sessão do Spark spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate() # Etapa 1: ler os dados do CSV de origem a partir do bucket do GCS df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path) # Etapa 2: definir as regras de qualidade de dados # Regra 1: A coluna "ID" não pode ser nula. dq_rule_id = col("id").isNotNull() # Regra 2: a coluna "e-mail" não pode ser nula e deve corresponder a uma expressão regular de formato de e-mail válida. email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex)) # Etapa 3: aplicar regras e dividir o DataFrame em registros limpos e com erro df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False)) clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed") error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed") # Etapa 4: gravar os registros limpos para a tabela do BigQuery especificada # O conector do BigQuery exige o NOME de um bucket temporário do GCS. temp_gcs_bucket_name = f"{bq_project}-main-bucket" clean_df.write \ .format("bigquery") \ .option("table", bq_dataset_table) \ .option("temporaryGcsBucket", temp_gcs_bucket_name) \ .option("project", bq_project) \ .mode("overwrite") \ .save() # Etapa 5: gravar os registros de erro no bucket de DLQ no GCS como um arquivo CSV único error_df.repartition(1).write \ .option("header", "true") \ .mode("overwrite") \ .csv(gcs_dlq_path) # Parar a sessão do Spark spark.stop()

Observação importante: a última linha do script precisa ser spark.stop(). Exclua tudo o que estiver abaixo dela, por exemplo, </bq_dataset_table>.

  1. Pressione CTRL+X, depois Y e Enter para salvar e sair do nano.

  2. Faça upload do novo script do PySpark no bucket principal do Cloud Storage.

    # O comando abaixo envia o script para uma pasta "scripts" no bucket de dados principal gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/

Clique em Verificar meu progresso para conferir a tarefa realizada.

Preparar o script de qualidade de dados do PySpark.

Tarefa 4: configure e execute o pipeline em lote

Com o script carregado, é possível configurar e enviar o job para o Serverless para Apache Spark.

Execute o job do Spark

1.  Defina as variáveis de ambiente a seguir no Cloud Shell. Elas criam atalhos para os recursos provisionados pelo Terraform.

# O nome para a tabela final no BigQuery export BQ_TABLE="valid_customers" # O caminho da tabela do BigQuery no formato 'dataset.table' export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}" # O caminho para o arquivo CSV de origem com 1.000 registros export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv" # O caminho do GCS onde os registros de erro serão gravados export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/" # O caminho do GCS para o script do PySpark que você fez upload export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py" # O URI completo da subrede presonalizada criada pelo Terraform export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
  1. Revise o comando abaixo antes de executá-lo. Ele envia seu script como um job em lote e transmite suas variáveis de ambiente como argumentos.

    • --subnet: essa flag é essencial. Ela informa que o job deve ser executado na spark-subnet personalizada e segura criada pelo Terraform como prática recomendada de segurança.
    • --deps-bucket: essa flag especifica um bucket do GCS para preparação de dependências do job.
    • --: esse traço duplo separa as flags do comando gcloud dos argumentos que serão transmitidos diretamente ao seu script do PySpark.
  2. Execute este comando para enviar o job:

    gcloud dataproc batches submit pyspark $PYSPARK_SCRIPT_PATH \ --version=2.1 \ --batch="customer-dq-job-$(date +%s)" \ --region={{{project_0.default_region |REGION}}} \ --subnet=$SUBNET_URI \ --deps-bucket=gs://{{{project_0.project_id |PROJECT_ID}}}-main-bucket \ -- \ $BQ_DATASET_TABLE
Observação: o job leva de 3 a 5 minutos para ser concluído. É possível monitorar o progresso no console do Google Cloud acessando Dataproc > Sem servidor > Lotes.

Clique em Verificar meu progresso para conferir a tarefa realizada.

Executar o pipeline em lote.

Tarefa 5: verifique os dados limpos no BigQuery

Agora que o pipeline foi executado, verifique se apenas os registros limpos foram carregados no BigQuery.

Faça consultas na tabela de resultados

  1. No Cloud Shell, execute uma consulta para contar os registros limpos na tabela do BigQuery. A contagem deve ser aproximadamente 800.

    bq query \ --use_legacy_sql=false \ 'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
  2. Para visualizaruma amostra dos dados limpos, execute o comando a seguir. A saída vai mostrar todos os registros com IDs e formatos de e-mail válidos.

    bq query \ --use_legacy_sql=false \ 'SELECT * FROM `customer_data_clean.valid_customers` LIMIT 10;'

Clique em Verificar meu progresso para conferir a tarefa realizada.

Verificar os dados no BigQuery.

Tarefa 6: analise os registros inválidos na DLQ

Por fim, confira se os registros que não passaram nas verificações de qualidade de dados foram roteados corretamente para o bucket de DLQ para análise posterior.

Inspecione os arquivos com erro usando o Cloud Shell

  1. No Cloud Shell, veja uma amostra dos registros inválidos no bucket de DLQ. O comando head -n 11 vai mostrar a linha de cabeçalho e os 10 primeiros registros de erro.

    gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
  2. O comando vai retornar uma amostra de aproximadamente 200 registros que falharam na validação. Algumas linhas terão IDs ausentes ou e-mails com erro de formatação.

    Exemplo de saída:

    id,first_name,last_name,email ,Isabella,Smith,<REDACTED_EMAIL> 12,Michael,Johnson, 21,Sophia,Williams,sophia.williams@example

(Opcional) Inspecione os arquivos de erro pelo console

Os arquivos de erros também podem ser visualizados diretamente no console do Google Cloud:

  1. No Menu de navegação (☰), acesse Cloud Storage > Buckets.
  2. Clique no nome do bucket que termina em -dlq-bucket.
  3. Navegue até a pasta errors/.
  4. Clique no nome do arquivo .csv para abrir e visualizar o conteúdo no navegador.

Parabéns!

Você criou e testou um pipeline de qualidade de dados em lote de nível de produção.

Neste laboratório, você criou um job personalizado do PySpark para validar e processar um arquivo do Cloud Storage, carregou os resultados limpos em uma tabela do BigQuery e encaminhou os registros inválidos para um bucket de DLQ, tudo em um ambiente de rede seguro e pré-provisionado. Esse padrão é um componente fundamental das plataformas de dados modernas e confiáveis.

Próximas etapas / Saiba mais

Copyright 2026 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de empresas e produtos podem ser marcas registradas das empresas a que estão associados.

Antes de começar

  1. Os laboratórios criam um projeto e recursos do Google Cloud por um período fixo
  2. Os laboratórios têm um limite de tempo e não têm o recurso de pausa. Se você encerrar o laboratório, vai precisar recomeçar do início.
  3. No canto superior esquerdo da tela, clique em Começar o laboratório

Usar a navegação anônima

  1. Copie o nome de usuário e a senha fornecidos para o laboratório
  2. Clique em Abrir console no modo anônimo

Fazer login no console

  1. Faça login usando suas credenciais do laboratório. Usar outras credenciais pode causar erros ou gerar cobranças.
  2. Aceite os termos e pule a página de recursos de recuperação
  3. Não clique em Terminar o laboratório a menos que você tenha concluído ou queira recomeçar, porque isso vai apagar seu trabalho e remover o projeto

Este conteúdo não está disponível no momento

Você vai receber uma notificação por e-mail quando ele estiver disponível

Ótimo!

Vamos entrar em contato por e-mail se ele ficar disponível

Um laboratório por vez

Confirme para encerrar todos os laboratórios atuais e iniciar este

Use a navegação anônima para executar o laboratório

A melhor maneira de executar este laboratório é usando uma janela de navegação anônima ou privada. Isso evita conflitos entre sua conta pessoal e a conta de estudante, o que poderia causar cobranças extras na sua conta pessoal.