Accédez à plus de 700 ateliers et cours

Traitement en flux continu avec Cloud Pub/Sub et Dataflow : Qwik Start

Atelier 45 minutes universal_currency_alt 1 crédit show_chart Débutant
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Accédez à plus de 700 ateliers et cours

GSP903

Logo des ateliers d'auto-formation Google Cloud

Présentation

Google Cloud Pub/Sub est un service de messagerie permettant d'échanger des données d'événements entre des applications et des services. Un producteur de données publie des messages dans un sujet Cloud Pub/Sub. Un client crée un abonnement associé à ce sujet. Les abonnés récupèrent des messages depuis un abonnement ou sont configurés en tant que webhooks pour recevoir directement les messages de l'abonnement. Les abonnés doivent accuser réception de chaque message pendant une période configurable.

Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en flux continu (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs.

Pub/Sub est un système évolutif et durable d'ingestion et de diffusion d'événements. Dataflow complète le modèle de diffusion évolutif de type "au moins une fois" de Pub/Sub avec la déduplication des messages et le traitement dans l'ordre de type "exactement une fois" si vous utilisez des fenêtres et la mise en mémoire tampon.

Objectifs de l'atelier

  • Lire les messages publiés dans un sujet Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages par code temporel
  • Écrire les messages dans Cloud Storage

Prérequis

Avant de cliquer sur le bouton "Démarrer l'atelier"

Lisez ces instructions. Les ateliers sont minutés, et vous ne pouvez pas les mettre en pause. Le minuteur, qui démarre lorsque vous cliquez sur Démarrer l'atelier, indique combien de temps les ressources Google Cloud resteront accessibles.

Cet atelier pratique vous permet de suivre les activités dans un véritable environnement cloud, et non dans un environnement de simulation ou de démonstration. Des identifiants temporaires vous sont fournis pour vous permettre de vous connecter à Google Cloud le temps de l'atelier.

Pour réaliser cet atelier :

  • Vous devez avoir accès à un navigateur Internet standard (nous vous recommandons d'utiliser Chrome).
Remarque : Ouvrez une fenêtre de navigateur en mode incognito (recommandé) ou de navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.
  • Vous disposez d'un temps limité. N'oubliez pas qu'une fois l'atelier commencé, vous ne pouvez pas le mettre en pause.
Remarque : Utilisez uniquement le compte de participant pour cet atelier. Si vous utilisez un autre compte Google Cloud, des frais peuvent être facturés à ce compte.

Démarrer l'atelier et se connecter à la console Google Cloud

  1. Cliquez sur le bouton Démarrer l'atelier. Si l'atelier est payant, une boîte de dialogue s'affiche pour vous permettre de sélectionner un mode de paiement. Sur la gauche, vous trouverez le panneau "Détails concernant l'atelier", qui contient les éléments suivants :

    • Le bouton "Ouvrir la console Google Cloud"
    • Le temps restant
    • Les identifiants temporaires que vous devez utiliser pour cet atelier
    • Des informations complémentaires vous permettant d'effectuer l'atelier
  2. Cliquez sur Ouvrir la console Google Cloud (ou effectuez un clic droit et sélectionnez Ouvrir le lien dans la fenêtre de navigation privée si vous utilisez le navigateur Chrome).

    L'atelier lance les ressources, puis ouvre la page "Se connecter" dans un nouvel onglet.

    Conseil : Réorganisez les onglets dans des fenêtres distinctes, placées côte à côte.

    Remarque : Si la boîte de dialogue Sélectionner un compte s'affiche, cliquez sur Utiliser un autre compte.
  3. Si nécessaire, copiez le nom d'utilisateur ci-dessous et collez-le dans la boîte de dialogue Se connecter.

    {{{user_0.username | "Username"}}}

    Vous trouverez également le nom d'utilisateur dans le panneau "Détails concernant l'atelier".

  4. Cliquez sur Suivant.

  5. Copiez le mot de passe ci-dessous et collez-le dans la boîte de dialogue Bienvenue.

    {{{user_0.password | "Password"}}}

    Vous trouverez également le mot de passe dans le panneau "Détails concernant l'atelier".

  6. Cliquez sur Suivant.

    Important : Vous devez utiliser les identifiants fournis pour l'atelier. Ne saisissez pas ceux de votre compte Google Cloud. Remarque : Si vous utilisez votre propre compte Google Cloud pour cet atelier, des frais supplémentaires peuvent vous être facturés.
  7. Accédez aux pages suivantes :

    • Acceptez les conditions d'utilisation.
    • N'ajoutez pas d'options de récupération ni d'authentification à deux facteurs (ce compte est temporaire).
    • Ne vous inscrivez pas à des essais sans frais.

Après quelques instants, la console Cloud s'ouvre dans cet onglet.

Remarque : Pour accéder aux produits et services Google Cloud, cliquez sur le menu de navigation ou saisissez le nom du service ou du produit dans le champ Recherche. Icône du menu de navigation et champ de recherche

Activer Cloud Shell

Cloud Shell est une machine virtuelle qui contient de nombreux outils pour les développeurs. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud. Cloud Shell vous permet d'accéder via une ligne de commande à vos ressources Google Cloud.

  1. Cliquez sur Activer Cloud Shell Icône Activer Cloud Shell en haut de la console Google Cloud.

  2. Passez les fenêtres suivantes :

    • Accédez à la fenêtre d'informations de Cloud Shell.
    • Autorisez Cloud Shell à utiliser vos identifiants pour effectuer des appels d'API Google Cloud.

Une fois connecté, vous êtes en principe authentifié et le projet est défini sur votre ID_PROJET : . Le résultat contient une ligne qui déclare l'ID_PROJET pour cette session :

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud est l'outil de ligne de commande pour Google Cloud. Il est préinstallé sur Cloud Shell et permet la complétion par tabulation.

  1. (Facultatif) Vous pouvez lister les noms des comptes actifs à l'aide de cette commande :
gcloud auth list
  1. Cliquez sur Autoriser.

Résultat :

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (Facultatif) Vous pouvez lister les ID de projet à l'aide de cette commande :
gcloud config list project

Résultat :

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} Remarque : Pour consulter la documentation complète sur gcloud, dans Google Cloud, accédez au guide de présentation de la gcloud CLI.

Définir la région

  • Dans Cloud Shell, exécutez la commande suivante pour définir la région du projet pour cet atelier :
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}

Vérifier que l'API Dataflow est activée

Pour vous assurer que vous avez bien accès à l'API requise, redémarrez la connexion à l'API Dataflow.

gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}

Cliquez sur Vérifier ma progression pour valider l'objectif. Désactiver et réactiver l'API Dataflow

Tâche 1 : Créer des ressources de projet

  1. Dans Cloud Shell, créez des variables pour votre bucket, votre projet et votre région.
PROJECT_ID=$(gcloud config get-value project) BUCKET_NAME="${PROJECT_ID}-bucket" TOPIC_ID=my-id REGION={{{project_0.default_region | "filled in at lab start"}}}
  1. Définissez votre région App Engine.
Remarque : Pour les régions autres que us-central1 et europe-west1, définissez la variable de région App Engine sur la même valeur que la région attribuée. Si vous êtes affecté à la région us-central1, définissez la variable de région App Engine sur us-central. Si vous êtes affecté à la région europe-west1, définissez la variable de région App Engine sur europe-west.

Pour en savoir plus, consultez la page Emplacements App Engine.

AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
  1. Créez un bucket Cloud Storage appartenant à ce projet :
gsutil mb gs://$BUCKET_NAME Remarque : Les noms des buckets Cloud Storage doivent être uniques dans le monde entier. L'ID de votre projet Qwiklabs est toujours unique. C'est pourquoi il est utilisé dans le nom de votre bucket dans cet atelier.
  1. Créez un sujet Pub/Sub dans ce projet :
gcloud pubsub topics create $TOPIC_ID
  1. Créez une application App Engine pour votre projet :
gcloud app create --region=$AE_REGION
  1. Créez un job Cloud Scheduler dans ce projet. Le job publie un message sur un sujet Cloud Pub/Sub chaque minute :
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!"
  1. Si vous êtes invité à activer l'API Cloud Scheduler, appuyez sur y, puis sur Entrée.

Cliquez sur Vérifier ma progression pour valider l'objectif. Créer des ressources de projet

  1. Démarrez le job.
gcloud scheduler jobs run publisher-job Remarque : Si vous rencontrez une erreur RESOURCE_EXHAUSTED, essayez d'exécuter à nouveau la commande.
  1. Utilisez les commandes suivantes pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -U -r requirements.txt # Install Apache Beam dependencies Remarque : Si vous utilisez l'option Python, exécutez les commandes Python individuellement.

Cliquez sur Vérifier ma progression pour valider l'objectif. Démarrer le job Cloud Scheduler

Tâche 2 : Examiner le code permettant de diffuser des messages depuis Pub/Sub vers Cloud Storage

Exemple de code

Examinez l'exemple de code suivant, qui utilise Dataflow pour effectuer les opérations suivantes :

  • Lire les messages Pub/Sub
  • Effectuer le fenêtrage (ou le regroupement) de messages dans des intervalles fixes par codes temporels de publication
  • Écrire les messages de chaque fenêtre dans des fichiers dans Cloud Storage
import java.io.IOException; import org.apache.beam.examples.common.WriteOneFilePerWindow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.joda.time.Duration; public class PubSubToGcs { /* * Define your own configuration options. Add your own arguments to be processed * by the command-line parser, and specify default values for them. */ public interface PubSubToGcsOptions extends StreamingOptions { @Description("The Cloud Pub/Sub topic to read from.") @Required String getInputTopic(); void setInputTopic(String value); @Description("Output file's window size in number of minutes.") @Default.Integer(1) Integer getWindowSize(); void setWindowSize(Integer value); @Description("Path of the output file including its filename prefix.") @Required String getOutput(); void setOutput(String value); } public static void main(String[] args) throws IOException { // The maximum number of shards when writing output. int numShards = 1; PubSubToGcsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); pipeline // 1) Read string messages from a Pub/Sub topic. .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic())) // 2) Group the messages into fixed-sized minute intervals. .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) // 3) Write one file to GCS for every window of messages. .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards)); // Execute the pipeline and wait until it finishes running. pipeline.run().waitUntilFinish(); } } import argparse from datetime import datetime import logging import random from apache_beam import ( DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys, ) from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms.window import FixedWindows class GroupMessagesByFixedWindows(PTransform): """A composite transform that groups Pub/Sub messages based on publish time and outputs a list of tuples, each containing a message and its publish time. """ def __init__(self, window_size, num_shards=5): # Set window size to 60 seconds. self.window_size = int(window_size * 60) self.num_shards = num_shards def expand(self, pcoll): return ( pcoll # Bind window info to each element using element timestamp (or publish time). | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size)) | "Add timestamp to windowed elements" >> ParDo(AddTimestamp()) # Assign a random key to each windowed element based on the number of shards. | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1)) # Group windowed elements by key. All the elements in the same window must fit # memory for this. If not, you need to use `beam.util.BatchElements`. | "Group by key" >> GroupByKey() ) class AddTimestamp(DoFn): def process(self, element, publish_time=DoFn.TimestampParam): """Processes each windowed element by extracting the message body and its publish time into a tuple. """ yield ( element.decode("utf-8"), datetime.utcfromtimestamp(float(publish_time)).strftime( "%Y-%m-%d %H:%M:%S.%f" ), ) class WriteToGCS(DoFn): def __init__(self, output_path): self.output_path = output_path def process(self, key_value, window=DoFn.WindowParam): """Write messages in a batch to Google Cloud Storage.""" ts_format = "%H:%M" window_start = window.start.to_utc_datetime().strftime(ts_format) window_end = window.end.to_utc_datetime().strftime(ts_format) shard_id, batch = key_value filename = "-".join([self.output_path, window_start, window_end, str(shard_id)]) with io.gcsio.GcsIO().open(filename=filename, mode="w") as f: for message_body, publish_time in batch: f.write(f"{message_body},{publish_time}\n".encode()) def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None): # Set `save_main_session` to True so DoFns can access globally imported modules. pipeline_options = PipelineOptions( pipeline_args, streaming=True, save_main_session=True ) with Pipeline(options=pipeline_options) as pipeline: ( pipeline # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam # binds the publish time returned by the Pub/Sub server for each message # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`. # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic) | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards) | "Write to GCS" >> ParDo(WriteToGCS(output_path)) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( "--input_topic", help="The Cloud Pub/Sub topic to read from." '"projects//topics/".', ) parser.add_argument( "--window_size", type=float, default=1.0, help="Output file's window size in minutes.", ) parser.add_argument( "--output_path", help="Path of the output GCS file including the prefix.", ) parser.add_argument( "--num_shards", type=int, default=5, help="Number of shards to use when writing windowed elements to GCS.", ) known_args, pipeline_args = parser.parse_known_args() run( known_args.input_topic, known_args.output_path, known_args.window_size, known_args.num_shards, pipeline_args, ) Remarque : Pour explorer plus en détail l'exemple de code, consultez les pages GitHub java-docs-samples et python-docs-samples.

Tâche 3 : Démarrer le pipeline

  1. Pour démarrer le pipeline, exécutez la commande suivante :
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2 \ --tempLocation=gs://$BUCKET_NAME/temp" python PubSubToGCS.py \ --project=project_id \ --region=region \ --input_topic=projects/project_id/topics/my-id \ --output_path=gs://bucket_name/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://bucket_name/temp Remarque : Lorsque vous exécutez la commande Python, remplacez project_id, bucket_name et region par l'ID de votre projet, le nom de votre bucket et la région qui vous a été attribuée pour l'atelier.

La commande précédente s'exécute localement et lance un job Dataflow dans le cloud.

Remarque : Vous devrez peut-être attendre environ 10 minutes pour que le code s'exécute complètement et que le job de pipeline apparaisse dans la console Dataflow lors de la tâche suivante. Remarque : Si vous recevez un avertissement concernant StaticLoggerBinder, vous pouvez l'ignorer sans problème et poursuivre l'atelier.

Cliquez sur Vérifier ma progression pour valider l'objectif. Démarrer le pipeline et lancer le job Dataflow

Tâche 4 : Observer la progression du job et du pipeline

  1. Accédez à la console Dataflow pour observer la progression du job.

  2. Cliquez sur Actualiser pour afficher le job et les dernières mises à jour de son état.

Page Dataflow affichant les informations du job pubsubtogcs 0815172250-75a99ab8

  1. Cliquez sur le nom du job pour afficher les détails du job et examiner les éléments suivants :
  • Structure du job
  • Journaux du job
  • Métriques de l'étape

Page du job affichant les informations récapitulatives sur le job

Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.

  1. Pour afficher les fichiers de sortie, accédez au menu de navigation > Cloud Storage, cliquez sur le nom de votre bucket, puis sur Exemples.

Page "Informations sur le bucket" affichant les informations sur le fichier de sortie

  1. Vous pouvez également quitter l'application dans Cloud Shell en appuyant sur CTRL+C (et en saisissant exit pour l'option Python), puis exécuter la commande ci-dessous pour lister les fichiers qui ont été écrits dans Cloud Storage :
gsutil ls gs://${BUCKET_NAME}/samples/

Le résultat doit se présenter sous la forme suivante :

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Tâche 5 : Nettoyage

  1. Si ce n'est pas déjà fait, quittez l'application dans Cloud Shell en appuyant sur Ctrl+C.

Pour l'option Python, saisissez exit pour quitter l'environnement Python.

  1. Dans Cloud Shell, supprimez le job Cloud Scheduler :
gcloud scheduler jobs delete publisher-job

Si l'invite "Voulez-vous continuer ?" s'affiche, appuyez sur Y, puis sur Entrée.

  1. Dans la console Dataflow, arrêtez le job en sélectionnant son nom, puis en cliquant sur Arrêter.

Lorsque vous y êtes invité, cliquez sur Arrêter le job > Annuler pour annuler le pipeline sans vidange.

  1. Dans Cloud Shell, supprimez le sujet :
gcloud pubsub topics delete $TOPIC_ID
  1. Dans Cloud Shell, supprimez les fichiers créés par le pipeline :
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  1. Dans Cloud Shell, supprimez le bucket Cloud Storage :
gsutil rb gs://${BUCKET_NAME}

Félicitations !

Vous avez créé un pipeline Dataflow qui lit les messages de votre sujet Pub/Sub, les filtre par code temporel et les écrit dans votre bucket Cloud Storage.

Étape suivante/en savoir plus

Formations et certifications Google Cloud

Les formations et certifications Google Cloud vous aident à tirer pleinement parti des technologies Google Cloud. Nos cours portent sur les compétences techniques et les bonnes pratiques à suivre pour être rapidement opérationnel et poursuivre votre apprentissage. Nous proposons des formations pour tous les niveaux, à la demande, en salle et à distance, pour nous adapter aux emplois du temps de chacun. Les certifications vous permettent de valider et de démontrer vos compétences et votre expérience en matière de technologies Google Cloud.

Dernière mise à jour du manuel : 20 août 2025

Dernier test de l'atelier : 20 août 2025

Copyright 2025 Google LLC. Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Ouvrez une fenêtre de navigateur en mode navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.