Instrucciones y requisitos de configuración del lab
Protege tu cuenta y tu progreso. Usa siempre una ventana de navegador privada y las credenciales del lab para ejecutarlo.

Valida la calidad de los datos para una canalización de datos por lotes con Serverless for Apache Spark

Lab 1 hora universal_currency_alt 5 créditos show_chart Intermedio
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Este contenido aún no está optimizado para dispositivos móviles.
Para obtener la mejor experiencia, visítanos en una computadora de escritorio con un vínculo que te enviaremos por correo electrónico.

Descripción general

Serverless for Apache Spark de Google Cloud es un servicio completamente administrado que simplifica la ejecución de cargas de trabajo por lotes de Spark sin administrar la infraestructura. Este patrón proporciona un enfoque sólido para los flujos de trabajo de ETL (extracción, transformación y carga), lo que garantiza que solo los datos de alta calidad lleguen a tus sistemas analíticos.

El desafío

Los datos sin procesar que se transfieren a data lakes suelen tener imperfecciones, como valores faltantes, formato incorrecto o entradas no válidas. Cargar estos datos directamente en almacenes analíticos puede dañar los informes de manera que provoquen malas decisiones empresariales.

La solución

Crea una canalización automatizada de calidad de los datos que intercepte los datos sin procesar, aplique un conjunto de reglas de validación y, luego, enrute los datos de forma inteligente. Los registros depurados se envían al almacén de datos de producción, mientras que los registros que no pasan la validación se envían a una "cola de mensajes no entregados" (DLQ) para inspeccionarlos y corregirlos.

En este lab, crearás esa solución ejecutando un trabajo personalizado de PySpark en Serverless for Apache Spark. El trabajo hará lo siguiente:

  1. Leer un archivo CSV sin procesar de un bucket de Cloud Storage
  2. Aplicar reglas de calidad de los datos para validar cada registro
  3. Cargar los registros depurados y válidos en una tabla de BigQuery
  4. Escribir los registros no válidos en un bucket de DLQ independiente en Cloud Storage

Este patrón garantiza que tu almacén de datos permanezca impecable y ofrece un proceso claro y auditable para manejar los errores de datos.

Casos de uso empresarial

  • Comercio electrónico: Una canalización valida los datos de pedidos entrantes para garantizar que los IDs de productos sean válidos y que los correos electrónicos de los clientes tengan el formato correcto antes de cargarlos en una tabla de BigQuery para analítica de ventas. Los pedidos no válidos se enrutan a una DLQ para revisarlos manualmente.
  • Salud: Un sistema procesa registros de pacientes y valida que los códigos médicos existan y que las fechas tengan el formato correcto. Los registros con errores se envían a un bucket de DLQ seguro para que los revise la administración de datos y se garantice el cumplimiento.
  • Finanzas: Una canalización diaria transfiere datos del mercado de valores y comprueba si hay valores nulos en campos esenciales como close_price. Los datos incompletos de los símbolos bursátiles se envían a una DLQ, lo que evita que se dañen los modelos de análisis de series temporales.

Objetivos

En este lab, aprenderás a hacer lo siguiente:

  • Explorar el entorno de lab previamente configurado que aprovisionó Terraform
  • Completar la configuración del entorno creando un conjunto de datos de BigQuery
  • Escribir una secuencia de comandos de PySpark personalizada y comentada con lógica de calidad de los datos y enrutamiento
  • Configurar y ejecutar un trabajo por lotes de Spark en una red de VPC personalizada y segura
  • Verificar la salida de datos depurados en una tabla de BigQuery
  • Revisar los registros no válidos en la DLQ de Cloud Storage

Configuración y requisitos

En cada lab, recibirás un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Google Skills en una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesitas, puedes reiniciar el lab, pero deberás hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usas otras credenciales, se generarán errores o incurrirás en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Active Cloud Shell

Cloud Shell es una máquina virtual que contiene herramientas de desarrollo y un directorio principal persistente de 5 GB. Se ejecuta en Google Cloud. Cloud Shell proporciona acceso de línea de comandos a sus recursos de Google Cloud. gcloud es la herramienta de línea de comandos de Google Cloud, la cual está preinstalada en Cloud Shell y es compatible con la función de autocompletado con tabulador.

  1. En el panel de navegación de Google Cloud Console, haga clic en Activar Cloud Shell (Ícono de Cloud Shell).

  2. Haga clic en Continuar.
    El aprovisionamiento y la conexión al entorno tardan solo unos momentos. Una vez que se conecte, también estará autenticado, y el proyecto estará configurado con su PROJECT_ID. Por ejemplo:

Terminal de Cloud Shell

Comandos de muestra

  • Si desea ver el nombre de cuenta activa, use este comando:

gcloud auth list

(Resultado)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(Resultado de ejemplo)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Si desea ver el ID del proyecto, use este comando:

gcloud config list project

(Resultado)

[core] project = <project_ID>

(Resultado de ejemplo)

[core] project = qwiklabs-gcp-44776a13dea667a6

Entorno del lab

Cuando comiences este lab, se ejecutará una secuencia de comandos de Terraform para aprovisionar automáticamente la mayoría de la infraestructura y los recursos necesarios. Se crearon los siguientes elementos para ti:

  • ID del proyecto = 

  • Región = 

  • Zona = 

  • Una red de VPC personalizada (spark-network) y una subred (spark-subnet) configurada con el acceso de red necesario para Serverless for Apache Spark

  • Dos buckets de Cloud Storage:

    1. Un bucket principal (gs://-main-bucket) que se usará para almacenar tu secuencia de comandos de PySpark (scripts/), los datos de entrada sin procesar (source/) y como área de almacenamiento en etapa intermedia temporal para el conector de BigQuery
    2. Un bucket de DLQ (gs://-dlq-bucket) dedicado a almacenar registros no válidos
  • Un archivo de datos sin procesar: Una secuencia de comandos de Python generó y subió automáticamente un archivo CSV de 1,000 registros (source/customer_contacts_1000.csv) al bucket principal. Aproximadamente el 20% de estos registros contienen imperfecciones intencionales (p. ej., IDs faltantes o correos electrónicos no válidos) para probar tu canalización

Tu objetivo es escribir una secuencia de comandos que pueda identificar y separar los registros correctos de los con errores y, luego, cargarlos en los destinos apropiados.

Nota sobre la estrategia para buckets:

Para simplificar este lab, el bucket principal se usa en secuencias de comandos, datos de origen y almacenamiento temporal en etapa intermedia de BigQuery. En los entornos de producción, es una práctica recomendada usar tres buckets distintos: uno para los datos sin procesar o de entrada, uno para la cola de mensajes no entregados y otro para los datos de almacenamiento temporal en etapa intermedia que usan conectores como el de BigQuery. Esto mejora el aislamiento, la seguridad y la administración del ciclo de vida.

Tarea 1: Explora el entorno y prepara los servicios

Primero, confirmarás que los recursos del lab se hayan creado correctamente y obtendrás una vista previa de los datos de origen con los que trabajarás.

Confirma los buckets de Cloud Storage

  1. En la consola de Google Cloud, usa el menú de navegación (☰) para ir a Cloud Storage > Buckets.
  2. Confirma que ves dos buckets en la lista: uno que termina en -main-bucket y otro que termina en -dlq-bucket.

Obtén una vista previa de los datos sin procesar y habilita la API

Haz clic en el ícono de Cloud Shell

  1. Activa Cloud Shell.

  2. Ejecuta el siguiente comando para ver el encabezado y los primeros 10 registros del archivo CSV sin procesar ubicado en tu bucket principal.

    gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
  3. Antes de ejecutar un trabajo, debes habilitar la API de Dataproc. Ejecuta el siguiente comando en Cloud Shell para habilitarla:

    gcloud services enable dataproc.googleapis.com

Haz clic en Revisar mi progreso para verificar la tarea realizada.

Habilitar la API de Dataproc

Tarea 2: Prepara el entorno de BigQuery

Tu secuencia de comandos de Terraform configuró la red y el almacenamiento, pero aún debes crear el conjunto de datos de BigQuery de destino en el que se cargarán tus datos depurados.

Crea el conjunto de datos

  1. En Cloud Shell, ejecuta el siguiente comando para crear un nuevo conjunto de datos de BigQuery llamado customer_data_clean.

    bq mk customer_data_clean
  2. Ahora puedes validar que el conjunto de datos se haya creado correctamente en la consola. Usa el menú de navegación (☰) para ir a BigQuery. En el panel Explorador, haz clic en la flecha junto a tu ID del proyecto para expandir su contenido. Deberías ver el nuevo conjunto de datos customer_data_clean.

Haz clic en Revisar mi progreso para verificar la tarea realizada.

Crear un conjunto de datos de BigQuery

Tarea 3: Prepara la secuencia de comandos de calidad de los datos de PySpark

Ahora crearás la secuencia de comandos personalizada de PySpark que contendrá la lógica para validar los datos. La lógica de la secuencia de comandos es sencilla:

  • Lee el archivo CSV de origen de Cloud Storage en un DataFrame.
  • Aplica una serie de reglas de validación para comprobar si hay IDs nulos y formatos de correo electrónico válidos.
  • Divide el DataFrame en dos: uno con registros depurados y otro con registros no válidos.
  • Escribe los datos depurados en BigQuery y los datos no válidos en el bucket de DLQ en Cloud Storage.

Escribe y sube la secuencia de comandos

  1. En Cloud Shell, crea el archivo de secuencia de comandos de PySpark llamado customer_dq.py.

    nano customer_dq.py
  2. Pega el siguiente código de Python con comentarios en el editor nano.

    # Import necessary libraries import sys from pyspark.sql import SparkSession from pyspark.sql.functions import col, when # This script expects 1 command-line argument: # 1. The destination BigQuery table path in format 'dataset.table' if len(sys.argv) != 2: print("Usage: customer_dq.py <bq_dataset_table>") sys.exit(-1) # Assign command-line argument to variable bq_dataset_table = sys.argv[1] # Qwiklabs variables are substituted here when the lab runs bq_project = "{{{project_0.project_id|Project_ID}}}" gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv" gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/" # Initialize a new Spark Session spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate() # Step 1: Read the source CSV data from the GCS bucket df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path) # Step 2: Define the Data Quality rules # Rule 1: The 'id' column must not be null. dq_rule_id = col("id").isNotNull() # Rule 2: The 'email' column must not be null and must match a valid email format regex. email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex)) # Step 3: Apply rules and split the DataFrame into clean and error records df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False)) clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed") error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed") # Step 4: Write the clean records to the specified BigQuery table # The BigQuery connector requires a temporary GCS bucket NAME. temp_gcs_bucket_name = f"{bq_project}-main-bucket" clean_df.write \ .format("bigquery") \ .option("table", bq_dataset_table) \ .option("temporaryGcsBucket", temp_gcs_bucket_name) \ .option("project", bq_project) \ .mode("overwrite") \ .save() # Step 5: Write the error records to the DLQ bucket in GCS as a single CSV file error_df.repartition(1).write \ .option("header", "true") \ .mode("overwrite") \ .csv(gcs_dlq_path) # Stop the Spark session spark.stop()

Nota importante Asegúrate de que la última línea de la secuencia de comandos sea spark.stop(). Borra todo lo que esté debajo, como </bq_dataset_table>.

  1. Presiona CTRL + X, luego, Y y, por último, Intro para guardar y salir de nano.

  2. Sube tu nueva secuencia de comandos de PySpark al bucket principal de Cloud Storage.

    # The command below uploads the script to a 'scripts' folder in the main data bucket gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/

Haz clic en Revisar mi progreso para verificar la tarea realizada.

Preparar la secuencia de comandos de calidad de los datos de PySpark

Tarea 4: Configura y ejecuta la canalización por lotes

Con la secuencia de comandos subida, ahora puedes configurar y enviar el trabajo a Serverless for Apache Spark.

Ejecuta el trabajo de Spark

1. Establece las siguientes variables de entorno en Cloud Shell. Estas crean accesos directos a los recursos aprovisionados por Terraform.

# The name for the final table in BigQuery export BQ_TABLE="valid_customers" # The BigQuery table path in 'dataset.table' format export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}" # The path to the 1000-record source CSV file export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv" # The GCS path where error records will be written export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/" # The GCS path to the PySpark script you just uploaded export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py" # The full URI of the custom subnet created by Terraform export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
  1. Revisa el siguiente comando antes de ejecutarlo. El comando envía tu secuencia de comandos como un trabajo por lotes y pasa tus variables de entorno como argumentos.

    • --subnet: Esta marca es fundamental. Le indica al trabajo que se ejecute en la spark-subnet segura y personalizada que creó Terraform, lo que es una práctica recomendada de seguridad.
    • --deps-bucket: Esta marca especifica un bucket de GCS para almacenar en etapa intermedia las dependencias del trabajo.
    • --: Este doble guion separa las marcas del comando gcloud de los argumentos que se pasarán directamente a tu secuencia de comandos de PySpark.
  2. Ejecuta el comando para enviar el trabajo:

    gcloud dataproc batches submit pyspark $PYSPARK_SCRIPT_PATH \ --version=2.1 \ --batch="customer-dq-job-$(date +%s)" \ --region={{{project_0.default_region |REGION}}} \ --subnet=$SUBNET_URI \ --deps-bucket=gs://{{{project_0.project_id |PROJECT_ID}}}-main-bucket \ -- \ $BQ_DATASET_TABLE
Nota: El trabajo tardará de 3 a 5 minutos en completarse. Puedes supervisar su progreso en la consola de Google Cloud navegando a Dataproc > Sin servidores > Lotes.

Haz clic en Revisar mi progreso para verificar la tarea realizada.

Ejecutar la canalización por lotes

Tarea 5: Verifica los datos depurados en BigQuery

Ahora que se ejecutó la canalización, verifica que solo se cargaron los registros depurados en BigQuery.

Consulta la tabla de resultados

  1. En Cloud Shell, ejecuta una consulta para contar los registros depurados en la tabla de BigQuery. Deberían ser aproximadamente 800.

    bq query \ --use_legacy_sql=false \ 'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
  2. Para ver una muestra de los datos depurados, ejecuta el siguiente comando. El resultado mostrará registros que tienen IDs y formatos de correo electrónico válidos.

    bq query \ --use_legacy_sql=false \ 'SELECT * FROM `customer_data_clean.valid_customers` LIMIT 10;'

Haz clic en Revisar mi progreso para verificar la tarea realizada.

Verificar los datos en BigQuery

Tarea 6: Revisa los registros no válidos en la DLQ

Por último, comprueba que los registros que no pasaron las verificaciones de calidad de los datos se hayan enrutado correctamente al bucket de DLQ para analizarlos después.

Inspecciona los archivos de error a través de Cloud Shell

  1. En Cloud Shell, consulta una muestra de los registros no válidos en el bucket de DLQ. El comando head -n 11 mostrará la fila de encabezado más los primeros 10 registros de errores.

    gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
  2. El comando debería devolver una muestra de aproximadamente 200 registros que no pasaron la validación. Verás filas con IDs faltantes o correos electrónicos con errores de formato.

    Resultado de ejemplo:

    id,first_name,last_name,email ,Isabella,Smith,<REDACTED_EMAIL> 12,Michael,Johnson, 21,Sophia,Williams,sophia.williams@example

Inspecciona los archivos de error a través de la consola (opcional)

También puedes ver el archivo de errores directamente en la consola de Google Cloud:

  1. En el menú de navegación (☰), ve a Cloud Storage > Buckets.
  2. Haz clic en el nombre del bucket que termina en -dlq-bucket.
  3. Navega a la carpeta errors/.
  4. Haz clic en el nombre del archivo .csv para abrirlo y ver su contenido en el navegador.

¡Felicitaciones!

Creaste y probaste correctamente una canalización de calidad de datos por lotes de nivel de producción.

En este lab, escribiste un trabajo personalizado de PySpark para validar y procesar un archivo de Cloud Storage, cargaste los resultados depurados en una tabla de BigQuery y enrutaste los registros no válidos a un bucket de DLQ, todo en un entorno de red seguro y aprovisionado preliminarmente. Este patrón es un componente fundamental de las plataformas de datos modernas y confiables.

Próximos pasos y más información

Copyright 2026 Google LLC. Todos los derechos reservados. Google y el logotipo de Google son marcas de Google LLC. El resto de los nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que están asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usar una ventana de incógnito o de navegación privada es la mejor forma de ejecutar este lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.