GSP903

Présentation
Google Cloud Pub/Sub est un service de messagerie permettant d'échanger des données d'événements entre des applications et des services. Un producteur de données publie des messages dans un sujet Cloud Pub/Sub. Un client crée un abonnement associé à ce sujet. Les abonnés récupèrent des messages depuis un abonnement ou sont configurés en tant que webhooks pour recevoir directement les messages de l'abonnement. Les abonnés doivent accuser réception de chaque message pendant une période configurable.
Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en flux continu (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs.
Pub/Sub est un système évolutif et durable d'ingestion et de diffusion d'événements. Dataflow complète le modèle de diffusion évolutif de type "au moins une fois" de Pub/Sub avec la déduplication des messages et le traitement dans l'ordre de type "exactement une fois" si vous utilisez des fenêtres et la mise en mémoire tampon.
Objectifs de l'atelier
- Lire les messages publiés dans un sujet Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages par code temporel
- Écrire les messages dans Cloud Storage
Prérequis
Avant de cliquer sur le bouton "Démarrer l'atelier"
Lisez ces instructions. Les ateliers sont minutés, et vous ne pouvez pas les mettre en pause. Le minuteur, qui démarre lorsque vous cliquez sur Démarrer l'atelier, indique combien de temps les ressources Google Cloud resteront accessibles.
Cet atelier pratique vous permet de suivre les activités dans un véritable environnement cloud, et non dans un environnement de simulation ou de démonstration. Des identifiants temporaires vous sont fournis pour vous permettre de vous connecter à Google Cloud le temps de l'atelier.
Pour réaliser cet atelier :
- Vous devez avoir accès à un navigateur Internet standard (nous vous recommandons d'utiliser Chrome).
Remarque : Ouvrez une fenêtre de navigateur en mode incognito (recommandé) ou de navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.
- Vous disposez d'un temps limité. N'oubliez pas qu'une fois l'atelier commencé, vous ne pouvez pas le mettre en pause.
Remarque : Utilisez uniquement le compte de participant pour cet atelier. Si vous utilisez un autre compte Google Cloud, des frais peuvent être facturés à ce compte.
Démarrer l'atelier et se connecter à la console Google Cloud
-
Cliquez sur le bouton Démarrer l'atelier. Si l'atelier est payant, une boîte de dialogue s'affiche pour vous permettre de sélectionner un mode de paiement.
Sur la gauche, vous trouverez le panneau "Détails concernant l'atelier", qui contient les éléments suivants :
- Le bouton "Ouvrir la console Google Cloud"
- Le temps restant
- Les identifiants temporaires que vous devez utiliser pour cet atelier
- Des informations complémentaires vous permettant d'effectuer l'atelier
-
Cliquez sur Ouvrir la console Google Cloud (ou effectuez un clic droit et sélectionnez Ouvrir le lien dans la fenêtre de navigation privée si vous utilisez le navigateur Chrome).
L'atelier lance les ressources, puis ouvre la page "Se connecter" dans un nouvel onglet.
Conseil : Réorganisez les onglets dans des fenêtres distinctes, placées côte à côte.
Remarque : Si la boîte de dialogue Sélectionner un compte s'affiche, cliquez sur Utiliser un autre compte.
-
Si nécessaire, copiez le nom d'utilisateur ci-dessous et collez-le dans la boîte de dialogue Se connecter.
{{{user_0.username | "Username"}}}
Vous trouverez également le nom d'utilisateur dans le panneau "Détails concernant l'atelier".
-
Cliquez sur Suivant.
-
Copiez le mot de passe ci-dessous et collez-le dans la boîte de dialogue Bienvenue.
{{{user_0.password | "Password"}}}
Vous trouverez également le mot de passe dans le panneau "Détails concernant l'atelier".
-
Cliquez sur Suivant.
Important : Vous devez utiliser les identifiants fournis pour l'atelier. Ne saisissez pas ceux de votre compte Google Cloud.
Remarque : Si vous utilisez votre propre compte Google Cloud pour cet atelier, des frais supplémentaires peuvent vous être facturés.
-
Accédez aux pages suivantes :
- Acceptez les conditions d'utilisation.
- N'ajoutez pas d'options de récupération ni d'authentification à deux facteurs (ce compte est temporaire).
- Ne vous inscrivez pas à des essais sans frais.
Après quelques instants, la console Cloud s'ouvre dans cet onglet.
Remarque : Pour accéder aux produits et services Google Cloud, cliquez sur le menu de navigation ou saisissez le nom du service ou du produit dans le champ Recherche.
Activer Cloud Shell
Cloud Shell est une machine virtuelle qui contient de nombreux outils pour les développeurs. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud. Cloud Shell vous permet d'accéder via une ligne de commande à vos ressources Google Cloud.
-
Cliquez sur Activer Cloud Shell
en haut de la console Google Cloud.
-
Passez les fenêtres suivantes :
- Accédez à la fenêtre d'informations de Cloud Shell.
- Autorisez Cloud Shell à utiliser vos identifiants pour effectuer des appels d'API Google Cloud.
Une fois connecté, vous êtes en principe authentifié et le projet est défini sur votre ID_PROJET : . Le résultat contient une ligne qui déclare l'ID_PROJET pour cette session :
Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}
gcloud est l'outil de ligne de commande pour Google Cloud. Il est préinstallé sur Cloud Shell et permet la complétion par tabulation.
- (Facultatif) Vous pouvez lister les noms des comptes actifs à l'aide de cette commande :
gcloud auth list
- Cliquez sur Autoriser.
Résultat :
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
To set the active account, run:
$ gcloud config set account `ACCOUNT`
- (Facultatif) Vous pouvez lister les ID de projet à l'aide de cette commande :
gcloud config list project
Résultat :
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
Remarque : Pour consulter la documentation complète sur gcloud, dans Google Cloud, accédez au guide de présentation de la gcloud CLI.
Définir la région
- Dans Cloud Shell, exécutez la commande suivante pour définir la région du projet pour cet atelier :
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}
Vérifier que l'API Dataflow est activée
Pour vous assurer que vous avez bien accès à l'API requise, redémarrez la connexion à l'API Dataflow.
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
Cliquez sur Vérifier ma progression pour valider l'objectif.
Désactiver et réactiver l'API Dataflow
Tâche 1 : Créer des ressources de projet
- Dans Cloud Shell, créez des variables pour votre bucket, votre projet et votre région.
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
- Définissez votre région App Engine.
Remarque : Pour les régions autres que us-central1 et europe-west1, définissez la variable de région App Engine sur la même valeur que la région attribuée. Si vous êtes affecté à la région us-central1, définissez la variable de région App Engine sur us-central. Si vous êtes affecté à la région europe-west1, définissez la variable de région App Engine sur europe-west.
Pour en savoir plus, consultez la page Emplacements App Engine.
AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
- Créez un bucket Cloud Storage appartenant à ce projet :
gsutil mb gs://$BUCKET_NAME
Remarque : Les noms des buckets Cloud Storage doivent être uniques dans le monde entier. L'ID de votre projet Qwiklabs est toujours unique. C'est pourquoi il est utilisé dans le nom de votre bucket dans cet atelier.
- Créez un sujet Pub/Sub dans ce projet :
gcloud pubsub topics create $TOPIC_ID
- Créez une application App Engine pour votre projet :
gcloud app create --region=$AE_REGION
- Créez un job Cloud Scheduler dans ce projet. Le job publie un message sur un sujet Cloud Pub/Sub chaque minute :
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
--topic=$TOPIC_ID --message-body="Hello!"
- Si vous êtes invité à activer l'API Cloud Scheduler, appuyez sur
y, puis sur Entrée.
Cliquez sur Vérifier ma progression pour valider l'objectif.
Créer des ressources de projet
- Démarrez le job.
gcloud scheduler jobs run publisher-job
Remarque : Si vous rencontrez une erreur RESOURCE_EXHAUSTED, essayez d'exécuter à nouveau la commande.
- Utilisez les commandes suivantes pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt # Install Apache Beam dependencies
Remarque : Si vous utilisez l'option Python, exécutez les commandes Python individuellement.
Cliquez sur Vérifier ma progression pour valider l'objectif.
Démarrer le job Cloud Scheduler
Tâche 2 : Examiner le code permettant de diffuser des messages depuis Pub/Sub vers Cloud Storage
Exemple de code
Examinez l'exemple de code suivant, qui utilise Dataflow pour effectuer les opérations suivantes :
- Lire les messages Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages dans des intervalles fixes par codes temporels de publication
- Écrire les messages de chaque fenêtre dans des fichiers dans Cloud Storage
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGcs {
/*
* Define your own configuration options. Add your own arguments to be processed
* by the command-line parser, and specify default values for them.
*/
public interface PubSubToGcsOptions extends StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("Output file's window size in number of minutes.")
@Default.Integer(1)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("Path of the output file including its filename prefix.")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) throws IOException {
// The maximum number of shards when writing output.
int numShards = 1;
PubSubToGcsOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
// 1) Read string messages from a Pub/Sub topic.
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) Group the messages into fixed-sized minute intervals.
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) Write one file to GCS for every window of messages.
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
// Execute the pipeline and wait until it finishes running.
pipeline.run().waitUntilFinish();
}
}
import argparse
from datetime import datetime
import logging
import random
from apache_beam import (
DoFn,
GroupByKey,
io,
ParDo,
Pipeline,
PTransform,
WindowInto,
WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""
def __init__(self, window_size, num_shards=5):
# Set window size to 60 seconds.
self.window_size = int(window_size * 60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| "Window into fixed intervals"
>> WindowInto(FixedWindows(self.window_size))
| "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
# Assign a random key to each windowed element based on the number of shards.
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# Group windowed elements by key. All the elements in the same window must fit
# memory for this. If not, you need to use `beam.util.BatchElements`.
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body},{publish_time}\n".encode())
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects//topics/".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)
Remarque : Pour explorer plus en détail l'exemple de code, consultez les pages GitHub java-docs-samples et python-docs-samples.
Tâche 3 : Démarrer le pipeline
- Pour démarrer le pipeline, exécutez la commande suivante :
mvn compile exec:java \
-Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=$PROJECT_ID \
--region=$REGION \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--windowSize=2 \
--tempLocation=gs://$BUCKET_NAME/temp"
python PubSubToGCS.py \
--project=project_id \
--region=region \
--input_topic=projects/project_id/topics/my-id \
--output_path=gs://bucket_name/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://bucket_name/temp
Remarque : Lorsque vous exécutez la commande Python, remplacez project_id, bucket_name et region par l'ID de votre projet, le nom de votre bucket et la région qui vous a été attribuée pour l'atelier.
La commande précédente s'exécute localement et lance un job Dataflow dans le cloud.
Remarque : Vous devrez peut-être attendre environ 10 minutes pour que le code s'exécute complètement et que le job de pipeline apparaisse dans la console Dataflow lors de la tâche suivante.
Remarque : Si vous recevez un avertissement concernant StaticLoggerBinder, vous pouvez l'ignorer sans problème et poursuivre l'atelier.
Cliquez sur Vérifier ma progression pour valider l'objectif.
Démarrer le pipeline et lancer le job Dataflow
Tâche 4 : Observer la progression du job et du pipeline
-
Accédez à la console Dataflow pour observer la progression du job.
-
Cliquez sur Actualiser pour afficher le job et les dernières mises à jour de son état.

- Cliquez sur le nom du job pour afficher les détails du job et examiner les éléments suivants :
- Structure du job
- Journaux du job
- Métriques de l'étape

Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.
- Pour afficher les fichiers de sortie, accédez au menu de navigation > Cloud Storage, cliquez sur le nom de votre bucket, puis sur Exemples.

- Vous pouvez également quitter l'application dans Cloud Shell en appuyant sur CTRL+C (et en saisissant
exit pour l'option Python), puis exécuter la commande ci-dessous pour lister les fichiers qui ont été écrits dans Cloud Storage :
gsutil ls gs://${BUCKET_NAME}/samples/
Le résultat doit se présenter sous la forme suivante :
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Tâche 5 : Nettoyage
- Si ce n'est pas déjà fait, quittez l'application dans Cloud Shell en appuyant sur Ctrl+C.
Pour l'option Python, saisissez exit pour quitter l'environnement Python.
- Dans Cloud Shell, supprimez le job Cloud Scheduler :
gcloud scheduler jobs delete publisher-job
Si l'invite "Voulez-vous continuer ?" s'affiche, appuyez sur Y, puis sur Entrée.
- Dans la console Dataflow, arrêtez le job en sélectionnant son nom, puis en cliquant sur Arrêter.
Lorsque vous y êtes invité, cliquez sur Arrêter le job > Annuler pour annuler le pipeline sans vidange.
- Dans Cloud Shell, supprimez le sujet :
gcloud pubsub topics delete $TOPIC_ID
- Dans Cloud Shell, supprimez les fichiers créés par le pipeline :
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
- Dans Cloud Shell, supprimez le bucket Cloud Storage :
gsutil rb gs://${BUCKET_NAME}
Félicitations !
Vous avez créé un pipeline Dataflow qui lit les messages de votre sujet Pub/Sub, les filtre par code temporel et les écrit dans votre bucket Cloud Storage.
Étape suivante/en savoir plus
Formations et certifications Google Cloud
Les formations et certifications Google Cloud vous aident à tirer pleinement parti des technologies Google Cloud. Nos cours portent sur les compétences techniques et les bonnes pratiques à suivre pour être rapidement opérationnel et poursuivre votre apprentissage. Nous proposons des formations pour tous les niveaux, à la demande, en salle et à distance, pour nous adapter aux emplois du temps de chacun. Les certifications vous permettent de valider et de démontrer vos compétences et votre expérience en matière de technologies Google Cloud.
Dernière mise à jour du manuel : 20 août 2025
Dernier test de l'atelier : 20 août 2025
Copyright 2025 Google LLC. Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.