Instructions et exigences de configuration de l'atelier
Protégez votre compte et votre progression. Utilisez toujours une fenêtre de navigation privée et les identifiants de l'atelier pour exécuter cet atelier.

Traitement des données sans serveur avec Dataflow : pipeline d'analyse de flux avancé avec Dataflow (Python)

Atelier 2 heures universal_currency_alt 5 crédits show_chart Avancé
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Ce contenu n'est pas encore optimisé pour les appareils mobiles.
Pour une expérience optimale, veuillez accéder à notre site sur un ordinateur de bureau en utilisant un lien envoyé par e-mail.

Présentation

Au cours de cet atelier, vous allez :

  • Gérer les données arrivant en retard.
  • Gérer les données non conformes en :
  • écrivant une transformation composite pour rendre le code plus modulaire ;
  • écrivant une transformation qui génère plusieurs sorties de différents types ;
  • collectant les données non conformes et en les enregistrant dans un emplacement où elles pourront être examinées.

À la fin de l'atelier précédent, nous évoquions un défi typique des pipelines en temps réel : le décalage entre le moment où les événements se produisent et celui où ils sont traités, également appelé "latence". Cet atelier présente des concepts d'Apache Beam qui permettent aux concepteurs de pipelines de définir, de manière formelle, comment les pipelines doivent gérer cette latence.

Mais la latence n'est pas le seul problème que les pipelines sont susceptibles de rencontrer dans un contexte de flux : chaque fois qu'une entrée provient de l'extérieur du système, il existe toujours un risque qu'elle soit, d'une manière ou d'une autre, non conforme. Cet atelier présente également des techniques pour traiter ces entrées.

Le pipeline obtenu à la fin de cet atelier sera semblable à celui illustré ci-dessous. Notez qu'il comporte une branche.

Flux du pipeline commençant par ReadPubSubMessages, avec une branche se terminant par WriteToBQ et une autre branche se terminant par WriteDeadLetterStorage

Préparation

Avant de cliquer sur le bouton "Démarrer l'atelier"

Remarque : Lisez ces instructions.

Les ateliers sont minutés, et vous ne pouvez pas les mettre en pause. Le minuteur, qui démarre lorsque vous cliquez sur Démarrer l'atelier, indique combien de temps les ressources Google Cloud resteront accessibles.

Cet atelier pratique Google Skills vous permet de suivre vous-même les activités dans un véritable environnement cloud, et non dans un environnement de simulation ou de démonstration. Nous vous fournissons des identifiants temporaires pour vous connecter à Google Cloud le temps de l'atelier.

Conditions requises

Pour réaliser cet atelier, vous devez :

  • avoir accès à un navigateur Internet standard (nous vous recommandons d'utiliser Chrome) ;
  • disposer de suffisamment de temps pour effectuer l'atelier en une fois.
Remarque : Si vous possédez déjà votre propre compte ou projet Google Cloud, veillez à ne pas l'utiliser pour réaliser cet atelier. Remarque : Si vous utilisez un Pixelbook, veuillez exécuter cet atelier dans une fenêtre de navigation privée.

Démarrer votre atelier et vous connecter à la console

  1. Cliquez sur le bouton Démarrer l'atelier. Si l'atelier est payant, un pop-up s'affiche pour vous permettre de sélectionner un mode de paiement. Sur la gauche, vous verrez un panneau contenant les identifiants temporaires à utiliser pour cet atelier.

    Panneau d'identifiants

  2. Copiez le nom d'utilisateur, puis cliquez sur Ouvrir la console Google. L'atelier lance les ressources, puis la page Sélectionner un compte dans un nouvel onglet.

    Remarque : Ouvrez les onglets dans des fenêtres distinctes, placées côte à côte.
  3. Sur la page "Sélectionner un compte", cliquez sur Utiliser un autre compte. La page de connexion s'affiche.

    Boîte de dialogue "Sélectionner un compte" avec l'option "Utiliser un autre compte" encadrée.

  4. Collez le nom d'utilisateur que vous avez copié dans le panneau "Détails de connexion". Copiez et collez ensuite le mot de passe.

Remarque : Vous devez utiliser les identifiants fournis dans le panneau "Détails de connexion", et non ceux de votre compte Google Skills. Si vous possédez un compte Google Cloud, ne vous en servez pas pour cet atelier (vous éviterez ainsi que des frais vous soient facturés).
  1. Accédez aux pages suivantes :
  • Acceptez les conditions d'utilisation.
  • N'ajoutez pas d'options de récupération ni d'authentification à deux facteurs (ce compte est temporaire).
  • Ne vous inscrivez pas aux essais sans frais.

Après quelques instants, la console Cloud s'ouvre dans cet onglet.

Remarque : Vous pouvez afficher le menu qui contient la liste des produits et services Google Cloud en cliquant sur le menu de navigation en haut à gauche. Menu de la console Cloud

Configuration de l'environnement de développement des instances Workbench

Dans cet atelier, vous exécuterez toutes les commandes dans un terminal intégré au notebook de votre instance.

  1. Dans le menu de navigation (Menu de navigation) de la console Google Cloud, sélectionnez Vertex AI.

  2. Cliquez sur Activer toutes les API recommandées.

  3. Dans le menu de navigation, cliquez sur Workbench.

    En haut de la page "Workbench", vérifiez que vous vous trouvez dans la vue Instances.

  4. Cliquez sur boîte de dialogue d'ajoutCréer.

  5. Configurez l'instance :

    • Nom : lab-workbench
    • Région : définissez la région sur
    • Zone : définissez la zone sur
    • Options avancées (facultatif) : si nécessaire, cliquez sur "Options avancées" pour une personnalisation plus avancée (par exemple, type de machine, taille du disque).

Créer une instance Vertex AI Workbench

  1. Cliquez sur Créer.

La création de l'instance prend quelques minutes. Une coche verte apparaît à côté de son nom quand elle est prête.

  1. Cliquez sur Ouvrir JupyterLab à côté du nom de l'instance pour lancer l'interface JupyterLab. Un nouvel onglet s'ouvre alors dans votre navigateur.

Instance Workbench déployée

  1. Cliquez ensuite sur Terminal. Le terminal qui s'affiche vous permet d'exécuter toutes les commandes de cet atelier.

Télécharger le dépôt de code

Maintenant, vous allez télécharger le dépôt de code que vous utiliserez dans cet atelier.

  1. Dans le terminal que vous venez d'ouvrir, saisissez le code suivant :
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. Dans le panneau de gauche de votre environnement de notebook, dans l'explorateur de fichiers, vous pouvez noter que le dépôt training-data-analyst a été ajouté.

  2. Accédez au dépôt cloné /training-data-analyst/quests/dataflow_python/. Chaque atelier correspond à un dossier, divisé en deux sous-dossiers : le premier, intitulé lab, contient du code que vous devez compléter, tandis que le second, nommé solution, comporte un exemple concret que vous pouvez consulter si vous rencontrez des difficultés.

Option "Explorer" mise en évidence dans le menu "Affichage" développé

Remarque : Si vous souhaitez ouvrir un fichier pour le modifier, il vous suffit de le trouver et de cliquer dessus. Une fois ce fichier ouvert, vous pourrez y ajouter ou modifier le code.

Cliquez sur Vérifier ma progression pour valider l'objectif. Créer une instance de notebook et cloner le dépôt du cours

Partie 1 : Gérer les données arrivant en retard

Dans les ateliers précédents, vous avez écrit du code qui répartissait les éléments selon l'heure d'événement en fenêtres de largeur fixe, à l'aide d'un code semblable à celui-ci :

parsed_msgs | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(window_duration)) | "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()

Cependant, comme vous l'avez constaté à la fin du dernier atelier non SQL, les flux de données présentent souvent une latence. La latence est problématique lors du fenêtrage selon l'heure d'événement (par opposition à l'heure de traitement), car il introduit une incertitude : tous les événements correspondant à un instant donné ont-ils bien été reçus ?

De toute évidence, pour produire des résultats, le pipeline que vous avez écrit a dû trancher sur la question. Pour ce faire, il a utilisé ce que l'on appelle une "watermark". Une watermark est une estimation heuristique du système indiquant le moment où l'on peut considérer que toutes les données jusqu'à un certain point dans le temps des événements ont été reçues dans le pipeline. Quand la watermark dépasse la fin d'une fenêtre, tous les éléments ultérieurs dont le code temporel correspond à cette fenêtre sont considérés comme des données en retard et sont simplement ignorés. Par défaut, le comportement du fenêtrage consiste donc à produire un seul résultat, que l'on espère complet, lorsque le système est certain d'avoir reçu toutes les données.

Apache Beam utilise plusieurs méthodes heuristiques pour estimer la watermark. Toutefois, il s'agit toujours d'une approximation. De plus, ces méthodes heuristiques sont générales et ne conviennent pas à tous les cas d'utilisation. Plutôt que de se fier à des méthodes heuristiques générales, les concepteurs de pipelines doivent réfléchir attentivement aux questions suivantes afin de déterminer les compromis les plus appropriés :

  • Exhaustivité : Est-il important d'avoir toutes les données avant de calculer le résultat ?
  • Latence : Combien de temps êtes-vous prêt à attendre pour obtenir les données ? Par exemple, souhaitez-vous attendre d'avoir reçu toutes les données ou préférez-vous les traiter au fur et à mesure qu'elles arrivent ?
  • Coût : Quelle puissance de calcul et quel budget êtes-vous prêt à consacrer pour réduire la latence ?

Une fois ces réponses en tête, il est possible d'utiliser les structures formelles d'Apache Beam pour écrire du code de façon à faire les bons compromis.

Retard autorisé

Le retard autorisé détermine combien de temps une fenêtre doit conserver son état. Une fois que la watermark atteint la fin de la période de retard autorisé, l'état est supprimé. Certes, il serait intéressant de pouvoir conserver l'état persistant jusqu'à la fin des temps. Cependant, dans la réalité, lorsque l'on travaille avec une source de données illimitée, il n'est souvent pas pratique de conserver l'état d'une fenêtre donnée indéfiniment : l'espace disque finirait par manquer.

Par conséquent, tout système de traitement réel capable de gérer des données dans le désordre doit mettre en place un moyen de limiter la durée de vie des fenêtres qu'il traite. Une méthode simple et claire consiste à définir un horizon pour le retard autorisé, c'est-à-dire à fixer une limite (par rapport à la watermark) au-delà de laquelle un enregistrement donné ne sera plus traité par le système. Toutes les données arrivant après cet horizon sont simplement ignorées. En définissant la limite de retard pour les données individuelles, vous fixez également la durée précise pendant laquelle l'état des fenêtres doit être conservé : jusqu'à ce que la watermark dépasse l'horizon de retard autorisé pour la fin de la fenêtre.

Tâche 1 : Préparer l'environnement

Comme dans les ateliers précédents, la première étape consiste à générer les données que le pipeline va traiter. Vous allez ouvrir l'environnement de l'atelier et générer les données comme précédemment :

Ouvrir l'atelier approprié

  • Dans le terminal de votre IDE, exécutez les commandes suivantes pour accéder au répertoire que vous allez utiliser pour cet atelier :
# Change directory into the lab cd 7_Advanced_Streaming_Analytics/lab export BASE_DIR=$(pwd)

Configurer l'environnement virtuel et les dépendances

Avant de pouvoir modifier le code du pipeline, assurez-vous d'avoir installé les dépendances nécessaires.

  1. Exécutez la commande suivante pour créer un environnement virtuel pour cet atelier :
sudo apt-get update && sudo apt-get install -y python3-venv ## Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. Installez ensuite les packages dont vous aurez besoin pour exécuter votre pipeline :
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Vérifiez que l'API Dataflow est activée :
gcloud services enable dataflow.googleapis.com

Configurer l'environnement de données

# Create GCS buckets and BQ dataset cd $BASE_DIR/../../ source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Cliquez sur Vérifier ma progression pour valider l'objectif. Préparer l'environnement

Tâche 2 : Définir le retard autorisé

  1. Dans l'explorateur de fichiers, accédez à training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab et ouvrez le fichier streaming_minute_traffic_pipeline.py.

Dans Apache Beam, vous définissez le retard autorisé à l'aide de l'argument de mot clé allowed_lateness avec le déclencheur AfterWatermark() dans la PTransform WindowInto, comme dans l'exemple ci-dessous :

items = p | ... Windowed_items = items | beam.WindowInto(beam.window.FixedWindows(60), # 1 minute trigger=AfterWatermark(), allowed_lateness=60*60*24) # 1 day
  1. Pour effectuer cette tâche, examinez la transformation de fenêtrage et le retard autorisé définis par l'argument de ligne de commande allowed_lateness. Déterminez une valeur raisonnable et mettez à jour la ligne de commande en indiquant les unités correctes.

Déclencheurs

Les concepteurs de pipelines peuvent également décider quand produire des résultats préliminaires. À l'étape précédente, nous avons utilisé le déclencheur AfterWatermark() avec un retard autorisé défini. Par exemple, supposons que la watermark pour la fin d'une fenêtre n'ait pas encore été atteinte, mais que 75 % des données attendues soient déjà arrivées. Dans de nombreux cas, on peut supposer qu'un tel échantillon est représentatif, ce qui justifie de l'afficher aux utilisateurs finaux.

Les déclencheurs déterminent à quel moment les résultats seront produits pendant le traitement. Chaque sortie spécifique d'une fenêtre est appelée "volet". Les déclencheurs produisent un volet lorsque leurs conditions sont remplies. Dans Apache Beam, ces conditions incluent l'avancement de la watermark, l'avancement du temps de traitement (qui progresse de manière uniforme, quelle que soit la quantité de données réellement reçues), le nombre d'éléments (par exemple, lorsqu'une certaine quantité de nouvelles données arrive) et des déclencheurs basés sur des données, comme la détection de la fin d'un fichier.

Les conditions d'un déclencheur peuvent l'amener à produire un volet à plusieurs reprises. Par conséquent, il est également nécessaire de préciser comment cumuler ces résultats. Apache Beam prend actuellement en charge deux modes d'accumulation : l'un qui cumule les résultats et l'autre qui renvoie uniquement les parties du résultat qui sont nouvelles depuis le dernier volet produit.

Tâche 3 : Définir un déclencheur

Lorsque vous définissez un fenêtrage pour une PCollection à l'aide de la transformation Window, vous pouvez également spécifier un déclencheur.

Vous définissez le ou les déclencheurs pour une PCollection en définissant l'argument de mot clé trigger de votre PTransform WindowInto. Apache Beam propose un certain nombre de déclencheurs prédéfinis :

  • AfterWatermark déclenche l'émission des résultats lorsque la watermark dépasse un code temporel déterminé soit par la fin de la fenêtre, soit par l'arrivée du premier élément dans un volet.

  • AfterProcessingTime déclenche l'émission des résultats après qu'un certain temps de traitement s'est écoulé (généralement depuis l'arrivée du premier élément dans volet).

  • AfterCount déclenche l'émission des résultats lorsque le nombre d'éléments dans la fenêtre atteint un certain seuil.

Cet exemple de code définit un déclencheur basé sur le temps pour une PCollection qui émet des résultats une minute après le traitement du premier élément de cette fenêtre. Dans la dernière ligne de l'exemple, nous définissons le mode d'accumulation de la fenêtre en définissant l'argument de mot clé accumulation_mode sur AccumulationMode.DISCARDING :

items = p | ... windowed_items = items | beam.WindowInto(FixedWindows(60), # 1 minute trigger=AfterProcessingTime(60), accumulation_mode=AccumulationMode.DISCARDING)
  1. Pour réaliser cette tâche, ajoutez l'argument de mot clé trigger à WindowInto en transmettant le déclencheur AfterWatermark. Lorsque vous concevez votre déclencheur, gardez à l'esprit ce cas d'utilisation : les données sont regroupées dans des fenêtres d'une minute et peuvent arriver en retard. Ajoutez également un déclencheur pour chaque élément en retard (dans les limites de la période de retard autorisé) en tant qu'argument au déclencheur AfterWatermark. Si vous rencontrez des difficultés, consultez la solution.

  2. Remplissez la section #TODO suivante vers la ligne 113 pour définir le mode de déclenchement et d'accumulation :

trigger=AfterProcessingTime(120), accumulation_mode=AccumulationMode.DISCARDING)
  1. Remplissez la section #TODO vers la ligne 119 pour définir le retard autorisé, le déclencheur et le mode d'accumulation :
trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=int(allowed_lateness), accumulation_mode=AccumulationMode.ACCUMULATING)

Partie 2 : Gérer les données non conformes

Selon la façon dont vous avez configuré le déclencheur, si vous exécutez le pipeline maintenant et que vous le comparez à celui de l'atelier précédent, vous remarquerez peut-être que le nouveau pipeline présente les résultats plus tôt. Il est également possible que les résultats soient plus précis si les méthodes heuristiques n'ont pas permis de prédire le comportement du flux et que la période de retard autorisé est mieux adaptée.

Cependant, bien que le pipeline actuel soit plus robuste face aux retards, il reste vulnérable aux données non conformes. Si vous exécutiez le pipeline et que vous publiiez un message qui n'est pas une chaîne JSON correctement formée pouvant être transformée en objet CommonLog, le pipeline générerait une erreur. Bien que des outils comme Cloud Logging facilitent la lecture de ces erreurs, un pipeline mieux conçu les stockera dans un emplacement prédéfini afin qu'elles puissent être examinées ultérieurement.

Dans cette section, vous allez ajouter des composants au pipeline afin de le rendre plus modulaire et plus robuste.

Tâche 1 : Collecter des données non conformes

Pour être plus robuste face aux données non conformes, le pipeline doit pouvoir filtrer ces données et créer des branches pour les traiter différemment. Nous avons déjà vu une façon de créer une branche dans un pipeline : en utilisant une seule PCollection comme entrée pour plusieurs transformations.

Cette méthode de création de branches est très pratique. Cependant, elle peut s'avérer inefficace dans certains cas d'utilisation. Supposons que vous souhaitiez créer deux sous-ensembles différents d'une même PCollection. La méthode des transformations multiples obligerait à créer une transformation de filtrage pour chaque sous-ensemble et à les appliquer toutes deux à la PCollection d'origine. Cependant, chaque élément serait traité deux fois.

Une autre méthode pour générer un pipeline avec des branches consiste à utiliser une seule transformation qui produit plusieurs sorties tout en traitant la PCollection d'entrée une seule fois. Dans cette tâche, vous allez écrire une transformation qui génère plusieurs sorties : la première sera constituée des résultats issus des données correctement formées, et la seconde contiendra des éléments non conformes provenant du flux d'entrée d'origine.

Pour produire plusieurs résultats tout en ne créant qu'une seule PCollection, Apache Beam utilise la classe TaggedOutput, qui permet d'identifier les différentes sorties de la fonction DoFn, même si elles sont de types différents.

Voici un exemple montrant comment utiliser TaggedOutput pour étiqueter différentes sorties d'une fonction DoFn. Ces PCollections sont ensuite récupérées avec la méthode with_outputs() et accessibles via le nom du tag spécifié dans TaggedOutput.

class ConvertToCommonLogFn(beam.DoFn): def process(self, element): try: row = json.loads(element.decode('utf-8')) yield beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row)) except: yield beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8')) … rows = (p | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(input_topic) | 'ParseJson' >> beam.ParDo(ConvertToCommonLogFn()).with_outputs('parsed_row', 'unparsed_row') .with_output_types(CommonLog)) (rows.unparsed_row | … (rows.parsed_row | …

Pour effectuer cette tâche, déclarez deux valeurs renvoyées de type TaggedOutput dans la classe ConvertToCommonLogFn, comme indiqué ci-dessus. Dans l'instruction try, renvoyez la ligne analysée en tant qu'instance de la classe CommonLog. Dans l'instruction catch, renvoyez la ligne non analysée (décodée).

  1. Remplissez la première section #TODO dans la classe ConvertToCommonLogFn :
beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row))
  1. Remplissez la deuxième section #TODO dans la classe ConvertToCommonLogFn :
beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8'))

Tâche 2 : Écrire les données non conformes pour analyse ultérieure

Pour résoudre le problème à l'origine des données non conformes en amont, il est important de pouvoir analyser ces données. Cela nécessite de les stocker quelque part. Dans cette tâche, vous allez écrire des données non conformes dans Google Cloud Storage. Ce modèle est appelé "stockage des lettres mortes".

Dans les ateliers précédents, vous écriviez directement à partir d'une source limitée (traitement par lot) dans Cloud Storage à l'aide de beam.io.WriteToText(). Toutefois, lorsque l'on écrit des données à partir d'une source illimitée (traitement par flux), cette approche doit être légèrement modifiée.

Tout d'abord, avant la transformation d'écriture, vous devez utiliser un déclencheur pour spécifier quand, selon le temps de traitement, écrire les données. Sinon, si vous conservez les valeurs par défaut, l'écriture ne se produira jamais. Par défaut, chaque événement appartient à la fenêtre globale. Si vous traitez les données par lot, cela ne pose pas de problème, car l'ensemble de données complet est connu au moment de l'exécution. En revanche, avec des sources illimitées, la taille de l'ensemble de données complet est inconnue, et les volets de la fenêtre globale ne se déclenchent jamais, car ils ne se terminent jamais.

Comme vous utilisez un déclencheur, vous devez également utiliser une fonction Window. Toutefois, il n'est pas nécessaire de changer de fenêtre. Dans les ateliers et tâches précédents, vous avez utilisé des transformations de fenêtrage pour remplacer la fenêtre globale par une fenêtre de durée fixe en fonction du temps des événements. Dans ce cas, ce n'est pas tant le regroupement des éléments qui importe, mais plutôt que les résultats soient produits de manière utile et à un rythme approprié.

Dans l'exemple ci-dessous, la fenêtre déclenche le volet de la fenêtre globale toutes les 10 secondes en fonction du temps de traitement, mais écrit seulement les nouveaux événements :

pcollection | “FireEvery10s” >> WindowInto(FixedWindows(10) trigger=AfterProcessingTime(10)) accumulation_mode=AccumulationMode.DISCARDING

Une fois que vous avez défini un déclencheur, vous devez remplacer le récepteur beam.io.WriteToText() (qui ne prend pas en charge le traitement par flux) par beam.io.fileio.WriteToFiles() pour effectuer les écritures. Lorsque l'on écrit des données en aval d'une transformation de fenêtrage, on spécifie un certain nombre de fragments pour que l'écriture puisse se faire en parallèle :

windowed_items = p | 'WriteWindowedPCollection' >> fileio.WriteToFiles("gs://path/to/somewhere", shards=int(num_shards), max_writers_per_bundle=0)
  1. Pour terminer cette tâche, créez une transformation utilisant rows.unparsed_row comme entrée pour récupérer les données non conformes. Utilisez un déclencheur basé sur le temps de traitement de 120 secondes pour une fenêtre fixe de 120 secondes. Définissez le mode d'accumulation sur AccumulationMode.DISCARDING.

  2. Remplissez la section #TODO en utilisant beam.fileio.WriteToFiles pour écrire les données dans GCS :

fileio.WriteToFiles(output_path,shards=1,max_writers_per_bundle=0)

Tâche 3 : Exécuter le pipeline

Pour exécuter votre pipeline, créez une commande semblable à l'exemple ci-dessous. Notez que vous devrez la modifier pour que les noms des options de ligne de commande reflètent celles que vous avez définies :

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR python3 streaming_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_topic=${PUBSUB_TOPIC} \ --window_duration=${WINDOW_DURATION} \ --allowed_lateness=${ALLOWED_LATENESS} \ --table_name=${OUTPUT_TABLE_NAME} \ --dead_letter_bucket=${DEADLETTER_BUCKET} \ --allow_unsafe_triggers

Le code de cette quête inclut un script permettant de publier des événements JSON à l'aide de Pub/Sub. Pour effectuer cette tâche et commencer à publier des messages, ouvrez un nouveau terminal à côté de celui que vous utilisez actuellement et exécutez le script suivant. Il continuera à publier des messages jusqu'à ce que vous l'arrêtiez. Assurez-vous de vous trouver dans le dossier training-data-analyst/quests/dataflow_python.

Remarque : L'option true ajoute les événements en retard au flux. cd /home/jupyter/training-data-analyst/quests/dataflow_python/ bash generate_streaming_events.sh true

Cliquez sur Vérifier ma progression pour valider l'objectif. Exécuter le pipeline

Tâche 4 : Tester votre pipeline

  1. Dans la barre de titre de la console Google Cloud, saisissez Pub/Sub dans le champ Recherche, puis cliquez sur Pub/Sub dans la section Produits et pages.

  2. Cliquez sur Sujets, puis sur le sujet my_topic.

  3. Cliquez sur Messages.

  4. Cliquez sur Sélectionner un abonnement Cloud Pub/Sub à partir duquel extraire les messages* et sélectionnez l'abonnement "my_topic" dans le menu déroulant.

Remarque : Vous devrez peut-être cliquer sur "Actualiser" pour voir l'abonnement.
  1. Cliquez sur le bouton Publier un message.

  2. Sur la page suivante, saisissez un message à publier, puis cliquez sur Publier.

Si le message ne respecte pas parfaitement la spécification JSON de CommonLog, il devrait rapidement apparaître dans le bucket de lettres mortes Cloud Storage. Vous pouvez suivre son parcours dans le pipeline en revenant à la fenêtre de surveillance du pipeline et en cliquant sur un nœud de la branche chargée de traiter les messages non analysés. Une fois qu'un élément apparaît dans cette branche, vous pouvez accéder à Cloud Storage pour vérifier que le message a bien été écrit sur le disque :

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID}/deadletter gcloud storage ls $BUCKET gcloud storage cat $BUCKET/*

Cliquez sur Vérifier ma progression pour valider l'objectif. Tester votre pipeline

Terminer l'atelier

Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.

Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.

Le nombre d'étoiles correspond à votre degré de satisfaction :

  • 1 étoile = très insatisfait(e)
  • 2 étoiles = insatisfait(e)
  • 3 étoiles = ni insatisfait(e), ni satisfait(e)
  • 4 étoiles = satisfait(e)
  • 5 étoiles = très satisfait(e)

Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.

Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.

Copyright 2026 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms de société et de produit peuvent être des marques des sociétés auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Le meilleur moyen d'exécuter cet atelier consiste à utiliser une fenêtre de navigation privée. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.