Mettez en pratique vos compétences dans la console Google Cloud
Points de contrôle
Enable the Dataproc API
Vérifier ma progression
/ 20
Create a BigQuery dataset
Vérifier ma progression
/ 20
Prepare the PySpark Data Quality Script
Vérifier ma progression
/ 20
Run the Batch Pipeline
Vérifier ma progression
/ 20
Verify the Data in BigQuery
Vérifier ma progression
/ 20
Instructions et exigences de configuration de l'atelier
Protégez votre compte et votre progression. Utilisez toujours une fenêtre de navigation privée et les identifiants de l'atelier pour exécuter cet atelier.
Valider la qualité des données pour un pipeline de données par lot à l'aide de Serverless pour Apache Spark
Ce contenu n'est pas encore optimisé pour les appareils mobiles.
Pour une expérience optimale, veuillez accéder à notre site sur un ordinateur de bureau en utilisant un lien envoyé par e-mail.
Présentation
Serverless pour Apache Spark de Google Cloud est un service entièrement géré qui simplifie l'exécution des charges de travail par lot Spark sans avoir à gérer l'infrastructure. Ce modèle fournit une approche robuste pour les workflows ETL (Extraire, transformer et charger), en veillant à ce que seules les données de haute qualité soient intégrées à vos systèmes analytiques.
Le défi
Les données brutes ingérées dans un lac de données contiennent souvent des imperfections, comme des valeurs manquantes, des formats incorrects ou des entrées non valides. Si vous chargez ces données directement dans un entrepôt analytique, vous risquez de fausser vos rapports et de prendre de mauvaises décisions commerciales.
La solution
Créer un pipeline automatisé de qualité des données : ce pipeline intercepte les données brutes, applique un ensemble de règles de validation, puis oriente les données de manière intelligente. Les enregistrements valides sont envoyés à l'entrepôt de données de production, tandis que ceux qui ne passent pas la validation sont envoyés vers une file d'attente de lettres mortes pour inspection et correction.
Dans cet atelier, vous allez créer cette solution en exécutant un job PySpark personnalisé sur Serverless pour Apache Spark. Voici ce que vous allez apprendre avec ce job :
Lire un fichier CSV brut à partir d'un bucket Cloud Storage
Appliquer des règles de qualité des données pour valider chaque enregistrement
Charger les enregistrements valides et nettoyés dans une table BigQuery
Écrire les enregistrements non valides dans un bucket DLQ distinct dans Cloud Storage
Ce modèle garantit que votre entrepôt de données reste intact et fournit un processus clair et auditable pour gérer les erreurs de données.
Cas d'utilisation Enterprise
E-commerce : un pipeline valide les données de commande entrantes, en s'assurant que les ID de produit sont valides et que les adresses e-mail des clients sont correctement formatées avant de les charger dans une table BigQuery d'analyse des ventes. Les commandes non valides sont envoyées à une DLQ pour examen manuel.
Santé : un système traite les dossiers des patients, en validant que les codes médicaux existent et que les dates sont au bon format. Les enregistrements contenant des erreurs sont envoyés dans un bucket DLQ sécurisé pour être examinés par l’équipe d’intendance des données afin de garantir la conformité.
Finance : un pipeline quotidien ingère des données boursières et vérifie la présence de valeurs nulles dans des champs critiques tels que close_price. Les données de bandeau incomplètes sont envoyées à une DLQ, ce qui évite la corruption des modèles d'analyse de séries temporelles.
Objectifs
Dans cet atelier, vous allez apprendre à effectuer les tâches suivantes :
Explorer l'environnement d'atelier préconfiguré provisionné par Terraform
Terminer la configuration de l'environnement en créant un ensemble de données BigQuery
Rédiger un script PySpark personnalisé et commenté avec une logique de routage et de qualité des données
Configurer et exécuter un job Spark par lot sur un réseau VPC personnalisé et sécurisé
Vérifier les données nettoyées dans une table BigQuery
Examiner les enregistrements non valides dans la DLQ Cloud Storage
Préparation
Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.
Connectez-vous à Google Skills dans une fenêtre de navigation privée.
Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.
Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.
Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à la console Google Cloud.
Cliquez sur Ouvrir la console Google.
Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.
Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
Activer Cloud Shell
Cloud Shell est une machine virtuelle qui contient des outils pour les développeurs. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud. Google Cloud Shell vous permet d'accéder via une ligne de commande à vos ressources Google Cloud. gcloud est l'outil de ligne de commande associé à Google Cloud. Il est préinstallé sur Cloud Shell et permet la saisie semi-automatique via la touche Tabulation.
Dans Google Cloud Console, dans le volet de navigation, cliquez sur Activer Cloud Shell ().
Cliquez sur Continuer.
Le provisionnement et la connexion à l'environnement prennent quelques instants. Une fois connecté, vous êtes en principe authentifié, et le projet est défini sur votre ID_PROJET. Exemple :
Au début de cet atelier, un script Terraform s'exécute pour provisionner automatiquement la plupart des ressources et de l'infrastructure nécessaires. Voici les éléments qui ont été créés pour vous :
ID du projet =
Région =
Zone =
Un réseau VPC personnalisé (spark-network) et un sous-réseau (spark-subnet) configurés avec l'accès réseau requis pour Serverless pour Apache Spark.
Deux buckets Cloud Storage :
Un bucket principal (gs://-main-bucket) utilisé pour stocker votre script PySpark (scripts/), les données brutes d'entrée (source/) et comme zone de préparation temporaire pour le connecteur BigQuery.
Un bucket DLQ (gs://-dlq-bucket) dédié au stockage des enregistrements non valides.
Un fichier de données brutes : un script Python a automatiquement généré et importé un fichier CSV de 1 000 enregistrements (source/customer_contacts_1000.csv) dans le bucket principal. Environ 20 % de ces enregistrements contiennent des imperfections intentionnelles (par exemple, des ID manquants, des adresses e-mail non valides) pour tester votre pipeline.
Votre objectif est d'écrire un script capable d'identifier et de séparer les enregistrements valides des enregistrements non valides, puis de les charger dans les destinations appropriées.
Remarque sur la stratégie de bucket :
Pour simplifier cet atelier, le bucket principal est utilisé pour les scripts, les données sources et la zone de préproduction temporaire de BigQuery. En production, il est recommandé d'utiliser trois buckets distincts : un pour les données brutes/d'entrée, un pour la file d'attente de lettres mortes et un bucket dédié pour les données de préproduction temporaires utilisées par les connecteurs comme le connecteur BigQuery. Cela permet une meilleure isolation, une meilleure sécurité et une meilleure gestion du cycle de vie.
Tâche 1 : Explorer l'environnement et préparer les services
Vous allez d'abord vérifier que les ressources de l'atelier ont été créées correctement et prévisualiser les données sources que vous allez utiliser.
Confirmer les buckets Cloud Storage
Dans la console Google Cloud, accédez au menu de navigation (☰), puis à Cloud Storage > Buckets.
Vérifiez que deux buckets sont affichés : l'un se terminant par -main-bucket et l'autre par -dlq-bucket.
Prévisualiser les données brutes et activer l'API
Activez Cloud Shell.
Exécutez la commande suivante pour afficher l'en-tête et les 10 premiers enregistrements du fichier CSV brut situé dans votre bucket principal.
gsutil cat gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv | head -n 11
Avant de pouvoir exécuter un job, vous devez activer l'API Dataproc. Exécutez la commande suivante dans Cloud Shell pour l'activer :
gcloud services enable dataproc.googleapis.com
Cliquez sur Vérifier ma progression pour valider la tâche exécutée.
Activer l'API Dataproc
Tâche 2 : Préparer l'environnement BigQuery
Votre script Terraform a configuré le réseau et le stockage, mais vous devez encore créer l'ensemble de données BigQuery de destination dans lequel vos données nettoyées seront chargées.
Créer l'ensemble de données
Dans Cloud Shell, exécutez la commande suivante pour créer un ensemble de données BigQuery nommé customer_data_clean.
bq mk customer_data_clean
Vous pouvez maintenant vérifier que l'ensemble de données a bien été créé dans la console. Dans le menu de navigation (☰), accédez à BigQuery. Dans le panneau Explorateur, cliquez sur la flèche à côté de l'ID de votre projet pour développer son contenu. Vous devriez voir votre nouvel ensemble de données customer_data_clean.
Cliquez sur Vérifier ma progression pour valider la tâche exécutée.
Créer un ensemble de données BigQuery
Tâche 3 : Préparer le script PySpark pour la qualité des données
Créez ensuite le script PySpark personnalisé qui contient la logique de validation des données. La logique du script est simple :
Il lit le fichier CSV source depuis Cloud Storage et le charge dans un DataFrame,
Il applique une série de règles de validation pour vérifier les ID nuls et les formats d'e-mail valides,
Il divise le DataFrame en deux : l'un avec les enregistrements propres et l'autre avec les enregistrements non valides. Enfin,
Il écrit les données nettoyées dans BigQuery et les données non valides dans le bucket DLQ de Cloud Storage.
Écrire et importer le script
Dans Cloud Shell, créez le fichier de script PySpark nommé customer_dq.py.
nano customer_dq.py
Collez le code Python commenté suivant dans l'éditeur nano.
# Import necessary libraries
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# This script expects 1 command-line argument:
# 1. The destination BigQuery table path in format 'dataset.table'
if len(sys.argv) != 2:
print("Usage: customer_dq.py <bq_dataset_table>")
sys.exit(-1)
# Assign command-line argument to variable
bq_dataset_table = sys.argv[1]
# Qwiklabs variables are substituted here when the lab runs
bq_project = "{{{project_0.project_id|Project_ID}}}"
gcs_source_path = f"gs://{bq_project}-main-bucket/source/customer_contacts_1000.csv"
gcs_dlq_path = f"gs://{bq_project}-dlq-bucket/errors/"
# Initialize a new Spark Session
spark = SparkSession.builder.appName("Customer DQ Check").getOrCreate()
# Step 1: Read the source CSV data from the GCS bucket
df = spark.read.option("header", "true").option("inferSchema", "true").csv(gcs_source_path)
# Step 2: Define the Data Quality rules
# Rule 1: The 'id' column must not be null.
dq_rule_id = col("id").isNotNull()
# Rule 2: The 'email' column must not be null and must match a valid email format regex.
email_regex = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
dq_rule_email = col("email").isNotNull().__and__(col("email").rlike(email_regex))
# Step 3: Apply rules and split the DataFrame into clean and error records
df_with_dq = df.withColumn("dq_passed", when(dq_rule_id.__and__(dq_rule_email), True).otherwise(False))
clean_df = df_with_dq.filter(col("dq_passed") == True).drop("dq_passed")
error_df = df_with_dq.filter(col("dq_passed") == False).drop("dq_passed")
# Step 4: Write the clean records to the specified BigQuery table
# The BigQuery connector requires a temporary GCS bucket NAME.
temp_gcs_bucket_name = f"{bq_project}-main-bucket"
clean_df.write \
.format("bigquery") \
.option("table", bq_dataset_table) \
.option("temporaryGcsBucket", temp_gcs_bucket_name) \
.option("project", bq_project) \
.mode("overwrite") \
.save()
# Step 5: Write the error records to the DLQ bucket in GCS as a single CSV file
error_df.repartition(1).write \
.option("header", "true") \
.mode("overwrite") \
.csv(gcs_dlq_path)
# Stop the Spark session
spark.stop()
Remarque importante : Assurez-vous que la dernière ligne du script est spark.stop(). Supprimez tout ce qui se trouve en dessous, comme </bq_dataset_table>.
Appuyez sur CTRL+X, puis sur Y et sur Entrée pour enregistrer et quitter nano.
Importez votre nouveau script PySpark dans le bucket Cloud Storage principal.
# The command below uploads the script to a 'scripts' folder in the main data bucket
gcloud storage cp customer_dq.py gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/
Cliquez sur Vérifier ma progression pour valider la tâche exécutée.
Préparer le script PySpark pour la qualité des données
Tâche 4 : Configurer et exécuter le pipeline par lot
Une fois le script importé, vous pouvez configurer le job et l'envoyer à Serverless pour Apache Spark.
Exécuter le job Spark
1. Définissez les variables d'environnement suivantes dans Cloud Shell. Ces variables créent des raccourcis vers les ressources provisionnées par Terraform.
# The name for the final table in BigQuery
export BQ_TABLE="valid_customers"
# The BigQuery table path in 'dataset.table' format
export BQ_DATASET_TABLE="customer_data_clean.${BQ_TABLE}"
# The path to the 1000-record source CSV file
export GCS_SOURCE_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/source/customer_contacts_1000.csv"
# The GCS path where error records will be written
export GCS_DLQ_PATH="gs://{{{project_0.project_id|Project_ID}}}-dlq-bucket/errors/"
# The GCS path to the PySpark script you just uploaded
export PYSPARK_SCRIPT_PATH="gs://{{{project_0.project_id|Project_ID}}}-main-bucket/scripts/customer_dq.py"
# The full URI of the custom subnet created by Terraform
export SUBNET_URI="projects/{{{project_0.project_id |PROJECT_ID}}}/regions/{{{project_0.default_region |REGION}}}/subnetworks/spark-subnet"
Examinez la commande ci-dessous avant de l'exécuter. Elle envoie votre script en tant que job par lot et transmet vos variables d'environnement en tant qu'arguments.
--subnet : cet indicateur est essentiel. Il indique au job de s'exécuter dans le spark-subnet sécurisé et personnalisé créé par Terraform, ce qui constitue une bonne pratique de sécurité.
--deps-bucket : cet indicateur spécifie un bucket GCS pour la préproduction des dépendances du job.
-- : ce double tiret sépare les indicateurs de la commande gcloud des indicateurs qui seront transmis directement à votre script PySpark.
Remarque : L'exécution du job prend entre trois et cinq minutes. Vous pouvez suivre sa progression dans la console Google Cloud en accédant à Dataproc > Sans serveur > Lots.
Cliquez sur Vérifier ma progression pour valider la tâche exécutée.
Exécuter le pipeline de traitement par lots
Tâche 5 : Vérifier les données nettoyées dans BigQuery
Maintenant que le pipeline s'est exécuté, vérifiez que seuls les enregistrements propres ont été chargés dans BigQuery.
Interroger la table de résultats
Dans Cloud Shell, exécutez une requête pour compter les enregistrements nettoyés dans la table BigQuery. Le nombre doit être d'environ 800.
bq query \
--use_legacy_sql=false \
'SELECT count(*) as total_clean_records FROM `customer_data_clean.valid_customers`;'
Pour afficher un échantillon des données nettoyées, exécutez la commande suivante : Le résultat affiche les enregistrements dont les ID et les adresses e-mail sont valides.
Cliquez sur Vérifier ma progression pour valider la tâche exécutée.
Vérifier les données dans BigQuery
Tâche 6 : Examiner les enregistrements non valides dans la DLQ
Enfin, vérifiez que les enregistrements qui n'ont pas passé les contrôles de qualité des données ont bien été acheminés vers le bucket DLQ pour une analyse ultérieure.
Inspecter les fichiers d'erreur via Cloud Shell
Dans Cloud Shell, affichez un échantillon des enregistrements non valides dans le bucket DLQ. La commande head -n 11 affichera la ligne d'en-tête et les 10 premiers enregistrements d'erreur.
gcloud storage cat gs://{{{project_0.project_id |PROJECT_ID}}}-dlq-bucket/errors/*.csv | head -n 11
La commande doit renvoyer un échantillon des 200 enregistrements dont la validation a échoué. Vous verrez des lignes avec des ID manquants ou des adresses e-mail mal formées.
(Facultatif) Inspecter les fichiers d'erreur via la console
Vous pouvez également afficher le fichier d'erreurs directement dans Google Cloud Console :
Dans le menu de navigation (☰), accédez à Cloud Storage > Buckets.
Cliquez sur le nom du bucket se terminant par -dlq-bucket.
Accédez au dossier errors/.
Cliquez sur le nom du fichier .csv pour l'ouvrir et afficher son contenu dans le navigateur.
Félicitations !
Vous avez créé et testé un pipeline de qualité des données par lot de niveau production.
Dans cet atelier, vous avez écrit un job PySpark personnalisé pour valider et traiter un fichier provenant de Cloud Storage, chargé les résultats nettoyés dans une table BigQuery et acheminé les enregistrements non valides vers un bucket DLQ, le tout dans un environnement réseau sécurisé préprovisionné. Ce modèle est un composant fondamental des plates-formes de données modernes et fiables.
Étapes suivantes et informations supplémentaires
Pour en savoir plus sur le connecteur BigQuery pour Spark, consultez la documentation Google Cloud officielle.
Copyright 2026 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms de société et de produit peuvent être des marques des sociétés auxquelles ils sont associés.
Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.
Utilisez la navigation privée
Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
Cliquez sur Ouvrir la console en navigation privée
Connectez-vous à la console
Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.
Ce contenu n'est pas disponible pour le moment
Nous vous préviendrons par e-mail lorsqu'il sera disponible
Parfait !
Nous vous contacterons par e-mail s'il devient disponible
Un atelier à la fois
Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci
Utilisez la navigation privée pour effectuer l'atelier
Le meilleur moyen d'exécuter cet atelier consiste à utiliser une fenêtre de navigation privée. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.
Dans cet atelier pratique, vous allez évaluer et valider la qualité des données dans un pipeline de données par lot à l'aide de Serverless pour Apache Spark.
Durée :
4 min de configuration
·
Accessible pendant 90 min
·
Terminé après 60 min