Instructions et exigences de configuration de l'atelier
Protégez votre compte et votre progression. Utilisez toujours une fenêtre de navigation privée et les identifiants de l'atelier pour exécuter cet atelier.

Valider la qualité des données pour un pipeline de données par lot à l'aide de Serverless pour Apache Spark

Atelier 1 heure universal_currency_alt 5 crédits show_chart Intermédiaire
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Ce contenu n'est pas encore optimisé pour les appareils mobiles.
Pour une expérience optimale, veuillez accéder à notre site sur un ordinateur de bureau en utilisant un lien envoyé par e-mail.

Présentation

Serverless pour Apache Spark de Google Cloud est un service entièrement géré qui simplifie l'exécution des charges de travail par lot Spark sans avoir à gérer l'infrastructure. Ce modèle fournit une approche robuste pour les workflows ETL (Extraire, transformer et charger), en veillant à ce que seules les données de haute qualité soient intégrées à vos systèmes analytiques.

Le défi

Les données brutes ingérées dans un lac de données contiennent souvent des imperfections, comme des valeurs manquantes, des formats incorrects ou des entrées non valides. Si vous chargez ces données directement dans un entrepôt analytique, vous risquez de fausser vos rapports et de prendre de mauvaises décisions commerciales.

La solution

Créer un pipeline automatisé de qualité des données : ce pipeline intercepte les données brutes, applique un ensemble de règles de validation, puis oriente les données de manière intelligente. Les enregistrements valides sont envoyés à l'entrepôt de données de production, tandis que ceux qui ne passent pas la validation sont envoyés vers une file d'attente de lettres mortes pour inspection et correction.

Dans cet atelier, vous allez créer cette solution en exécutant un job PySpark personnalisé sur Serverless pour Apache Spark. Voici ce que vous allez apprendre avec ce job :

  1. Lire un fichier CSV brut à partir d'un bucket Cloud Storage
  2. Appliquer des règles de qualité des données pour valider chaque enregistrement
  3. Charger les enregistrements valides et nettoyés dans une table BigQuery
  4. Écrire les enregistrements non valides dans un bucket DLQ distinct dans Cloud Storage

Ce modèle garantit que votre entrepôt de données reste intact et fournit un processus clair et auditable pour gérer les erreurs de données.

Cas d'utilisation Enterprise

  • E-commerce : un pipeline valide les données de commande entrantes, en s'assurant que les ID de produit sont valides et que les adresses e-mail des clients sont correctement formatées avant de les charger dans une table BigQuery d'analyse des ventes. Les commandes non valides sont envoyées à une DLQ pour examen manuel.
  • Santé : un système traite les dossiers des patients, en validant que les codes médicaux existent et que les dates sont au bon format. Les enregistrements contenant des erreurs sont envoyés dans un bucket DLQ sécurisé pour être examinés par l’équipe d’intendance des données afin de garantir la conformité.
  • Finance : un pipeline quotidien ingère des données boursières et vérifie la présence de valeurs nulles dans des champs critiques tels que close_price. Les données de bandeau incomplètes sont envoyées à une DLQ, ce qui évite la corruption des modèles d'analyse de séries temporelles.

Objectifs

Dans cet atelier, vous allez apprendre à effectuer les tâches suivantes :

  • Explorer l'environnement d'atelier préconfiguré provisionné par Terraform
  • Terminer la configuration de l'environnement en créant un ensemble de données BigQuery
  • Rédiger un script PySpark personnalisé et commenté avec une logique de routage et de qualité des données
  • Configurer et exécuter un job Spark par lot sur un réseau VPC personnalisé et sécurisé
  • Vérifier les données nettoyées dans une table BigQuery
  • Examiner les enregistrements non valides dans la DLQ Cloud Storage

Préparation

Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.

  1. Connectez-vous à Google Skills dans une fenêtre de navigation privée.

  2. Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
    Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.

  3. Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.

  4. Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à la console Google Cloud.

  5. Cliquez sur Ouvrir la console Google.

  6. Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
    Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.

  7. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.

Activer Cloud Shell

Cloud Shell est une machine virtuelle qui contient des outils pour les développeurs. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud. Google Cloud Shell vous permet d'accéder via une ligne de commande à vos ressources Google Cloud. gcloud est l'outil de ligne de commande associé à Google Cloud. Il est préinstallé sur Cloud Shell et permet la saisie semi-automatique via la touche Tabulation.

  1. Dans Google Cloud Console, dans le volet de navigation, cliquez sur Activer Cloud Shell (Icône Cloud Shell).

  2. Cliquez sur Continuer.
    Le provisionnement et la connexion à l'environnement prennent quelques instants. Une fois connecté, vous êtes en principe authentifié, et le projet est défini sur votre ID_PROJET. Exemple :

Terminal Cloud Shell

Exemples de commandes

  • Afficher le nom du compte actif :

gcloud auth list

(Résultat)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(Exemple de résultat)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Afficher l'ID du projet :

gcloud config list project

(Résultat)

[core] project = <ID_Projet>

(Exemple de résultat)

[core] project = qwiklabs-gcp-44776a13dea667a6

Votre environnement pour l'atelier

Au début de cet atelier, un script Terraform s'exécute pour provisionner automatiquement la plupart des ressources et de l'infrastructure nécessaires. Voici les éléments qui ont été créés pour vous :

  • ID du projet =

  • Région =

  • Zone =

  • Un réseau VPC personnalisé (spark-network) et un sous-réseau (spark-subnet) configurés avec l'accès réseau requis pour Serverless pour Apache Spark.

  • Deux buckets Cloud Storage :

    1. Un bucket principal (gs://-main-bucket) utilisé pour stocker votre script PySpark (scripts/), les données brutes d'entrée (source/) et comme zone de préparation temporaire pour le connecteur BigQuery.
    2. Un bucket DLQ (gs://-dlq-bucket) dédié au stockage des enregistrements non valides.
  • Un fichier de données brutes : un script Python a automatiquement généré et importé un fichier CSV de 1 000 enregistrements (source/customer_contacts_1000.csv) dans le bucket principal. Environ 20 % de ces enregistrements contiennent des imperfections intentionnelles (par exemple, des ID manquants, des adresses e-mail non valides) pour tester votre pipeline.

Votre objectif est d'écrire un script capable d'identifier et de séparer les enregistrements valides des enregistrements non valides, puis de les charger dans les destinations appropriées.

Remarque sur la stratégie de bucket :

Pour simplifier cet atelier, le bucket principal est utilisé pour les scripts, les données sources et la zone de préproduction temporaire de BigQuery. En production, il est recommandé d'utiliser trois buckets distincts : un pour les données brutes/d'entrée, un pour la file d'attente de lettres mortes et un bucket dédié pour les données de préproduction temporaires utilisées par les connecteurs comme le connecteur BigQuery. Cela permet une meilleure isolation, une meilleure sécurité et une meilleure gestion du cycle de vie.

Tâche 1 : Explorer l'environnement et préparer les services

Vous allez d'abord vérifier que les ressources de l'atelier ont été créées correctement et prévisualiser les données sources que vous allez utiliser.

Confirmer les buckets Cloud Storage

  1. Dans la console Google Cloud, accédez au menu de navigation (☰), puis à Cloud Storage > Buckets.
  2. Vérifiez que deux buckets sont affichés : l'un se terminant par -main-bucket et l'autre par -dlq-bucket.

Prévisualiser les données brutes et activer l'API

Cliquez sur l&#39;icône Cloud Shell.

  1. Activez Cloud Shell.

  2. Exécutez la commande suivante pour afficher l'en-tête et les 10 premiers enregistrements du fichier CSV brut situé dans votre bucket principal.

    gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
  3. Avant de pouvoir exécuter un job, vous devez activer l'API Dataproc. Exécutez la commande suivante dans Cloud Shell pour l'activer :

    gcloud services enable dataproc.googleapis.com

Cliquez sur Vérifier ma progression pour valider la tâche exécutée.

Activer l'API Dataproc

Tâche 2 : Préparer l'environnement BigQuery

Votre script Terraform a configuré le réseau et le stockage, mais vous devez encore créer l'ensemble de données BigQuery de destination dans lequel vos données nettoyées seront chargées.

Créer l'ensemble de données

  1. Dans Cloud Shell, exécutez la commande suivante pour créer un ensemble de données BigQuery nommé customer_data_clean.

    bq mk customer_data_clean
  2. Vous pouvez maintenant vérifier que l'ensemble de données a bien été créé dans la console. Dans le menu de navigation (☰), accédez à BigQuery. Dans le panneau Explorateur, cliquez sur la flèche à côté de l'ID de votre projet pour développer son contenu. Vous devriez voir votre nouvel ensemble de données customer_data_clean.

Cliquez sur Vérifier ma progression pour valider la tâche exécutée.

Créer un ensemble de données BigQuery

Tâche 3 : Préparer le script PySpark pour la qualité des données

Créez ensuite le script PySpark personnalisé qui contient la logique de validation des données. La logique du script est simple :

  • Il lit le fichier CSV source depuis Cloud Storage et le charge dans un DataFrame,
  • Il applique une série de règles de validation pour vérifier les ID nuls et les formats d'e-mail valides,
  • Il divise le DataFrame en deux : l'un avec les enregistrements propres et l'autre avec les enregistrements non valides. Enfin,
  • Il écrit les données nettoyées dans BigQuery et les données non valides dans le bucket DLQ de Cloud Storage.

Écrire et importer le script

  1. Dans Cloud Shell, créez le fichier de script PySpark nommé customer_dq.py.

    nano customer_dq.py
  2. Collez le code Python commenté suivant dans l'éditeur nano.

    # Import necessary libraries import sys from pyspark.sql import SparkSession from pyspark.sql.functions import col, when # This script expects 1 command-line argument: # 1. The destination BigQuery table path in format 'dataset.table' if len(sys.argv) != 2: print("Usage: customer_dq.py <bq_dataset_table>") sys.exit(-1) # Assign command-line argument to variable bq_dataset_table = sys.argv[1] # Qwiklabs variables are substituted here when the lab runs bq_project = "{{{project_0.project_id|Project_ID}}}" gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv" gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/" # Initialize a new Spark Session spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate() # Step 1: Read the source CSV data from the GCS bucket df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path) # Step 2: Define the Data Quality rules # Rule 1: The 'id' column must not be null. dq_rule_id = col("id").isNotNull() # Rule 2: The 'email' column must not be null and must match a valid email format regex. email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex)) # Step 3: Apply rules and split the DataFrame into clean and error records df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False)) clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed") error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed") # Step 4: Write the clean records to the specified BigQuery table # The BigQuery connector requires a temporary GCS bucket NAME. temp_gcs_bucket_name = f"{bq_project}-main-bucket" clean_df.write \ .format("bigquery") \ .option("table", bq_dataset_table) \ .option("temporaryGcsBucket", temp_gcs_bucket_name) \ .option("project", bq_project) \ .mode("overwrite") \ .save() # Step 5: Write the error records to the DLQ bucket in GCS as a single CSV file error_df.repartition(1).write \ .option("header", "true") \ .mode("overwrite") \ .csv(gcs_dlq_path) # Stop the Spark session spark.stop()

Remarque importante : Assurez-vous que la dernière ligne du script est spark.stop(). Supprimez tout ce qui se trouve en dessous, comme </bq_dataset_table>.

  1. Appuyez sur CTRL+X, puis sur Y et sur Entrée pour enregistrer et quitter nano.

  2. Importez votre nouveau script PySpark dans le bucket Cloud Storage principal.

    # The command below uploads the script to a 'scripts' folder in the main data bucket gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/

Cliquez sur Vérifier ma progression pour valider la tâche exécutée.

Préparer le script PySpark pour la qualité des données

Tâche 4 : Configurer et exécuter le pipeline par lot

Une fois le script importé, vous pouvez configurer le job et l'envoyer à Serverless pour Apache Spark.

Exécuter le job Spark

1.  Définissez les variables d'environnement suivantes dans Cloud Shell. Ces variables créent des raccourcis vers les ressources provisionnées par Terraform.

# The name for the final table in BigQuery export BQ_TABLE="valid_customers" # The BigQuery table path in 'dataset.table' format export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}" # The path to the 1000-record source CSV file export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv" # The GCS path where error records will be written export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/" # The GCS path to the PySpark script you just uploaded export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py" # The full URI of the custom subnet created by Terraform export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
  1. Examinez la commande ci-dessous avant de l'exécuter. Elle envoie votre script en tant que job par lot et transmet vos variables d'environnement en tant qu'arguments.

    • --subnet : cet indicateur est essentiel. Il indique au job de s'exécuter dans le spark-subnet sécurisé et personnalisé créé par Terraform, ce qui constitue une bonne pratique de sécurité.
    • --deps-bucket : cet indicateur spécifie un bucket GCS pour la préproduction des dépendances du job.
    • -- : ce double tiret sépare les indicateurs de la commande gcloud des indicateurs qui seront transmis directement à votre script PySpark.
  2. Exécutez la commande pour envoyer le job :

    gcloud dataproc batches submit pyspark $PYSPARK_SCRIPT_PATH \ --version=2.1 \ --batch="customer-dq-job-$(date +%s)" \ --region={{{project_0.default_region |REGION}}} \ --subnet=$SUBNET_URI \ --deps-bucket=gs://{{{project_0.project_id |PROJECT_ID}}}-main-bucket \ -- \ $BQ_DATASET_TABLE
Remarque : L'exécution du job prend entre trois et cinq minutes. Vous pouvez suivre sa progression dans la console Google Cloud en accédant à Dataproc > Sans serveur > Lots.

Cliquez sur Vérifier ma progression pour valider la tâche exécutée.

Exécuter le pipeline de traitement par lots

Tâche 5 : Vérifier les données nettoyées dans BigQuery

Maintenant que le pipeline s'est exécuté, vérifiez que seuls les enregistrements propres ont été chargés dans BigQuery.

Interroger la table de résultats

  1. Dans Cloud Shell, exécutez une requête pour compter les enregistrements nettoyés dans la table BigQuery. Le nombre doit être d'environ 800.

    bq query \ --use_legacy_sql=false \ 'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
  2. Pour afficher un échantillon des données nettoyées, exécutez la commande suivante : Le résultat affiche les enregistrements dont les ID et les adresses e-mail sont valides.

    bq query \ --use_legacy_sql=false \ 'SELECT * FROM `customer_data_clean.valid_customers` LIMIT 10;'

Cliquez sur Vérifier ma progression pour valider la tâche exécutée.

Vérifier les données dans BigQuery

Tâche 6 : Examiner les enregistrements non valides dans la DLQ

Enfin, vérifiez que les enregistrements qui n'ont pas passé les contrôles de qualité des données ont bien été acheminés vers le bucket DLQ pour une analyse ultérieure.

Inspecter les fichiers d'erreur via Cloud Shell

  1. Dans Cloud Shell, affichez un échantillon des enregistrements non valides dans le bucket DLQ. La commande head -n 11 affichera la ligne d'en-tête et les 10 premiers enregistrements d'erreur.

    gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
  2. La commande doit renvoyer un échantillon des 200 enregistrements dont la validation a échoué. Vous verrez des lignes avec des ID manquants ou des adresses e-mail mal formées.

    Exemple de résultat :

    id,first_name,last_name,email ,Isabella,Smith,<REDACTED_EMAIL> 12,Michael,Johnson, 21,Sophia,Williams,sophia.williams@example

(Facultatif) Inspecter les fichiers d'erreur via la console

Vous pouvez également afficher le fichier d'erreurs directement dans Google Cloud Console :

  1. Dans le menu de navigation (☰), accédez à Cloud Storage > Buckets.
  2. Cliquez sur le nom du bucket se terminant par -dlq-bucket.
  3. Accédez au dossier errors/.
  4. Cliquez sur le nom du fichier .csv pour l'ouvrir et afficher son contenu dans le navigateur.

Félicitations !

Vous avez créé et testé un pipeline de qualité des données par lot de niveau production.

Dans cet atelier, vous avez écrit un job PySpark personnalisé pour valider et traiter un fichier provenant de Cloud Storage, chargé les résultats nettoyés dans une table BigQuery et acheminé les enregistrements non valides vers un bucket DLQ, le tout dans un environnement réseau sécurisé préprovisionné. Ce modèle est un composant fondamental des plates-formes de données modernes et fiables.

Étapes suivantes et informations supplémentaires

Copyright 2026 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms de société et de produit peuvent être des marques des sociétés auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Le meilleur moyen d'exécuter cet atelier consiste à utiliser une fenêtre de navigation privée. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.