GSP903

Übersicht
Google Cloud Pub/Sub ist ein Messaging-Dienst für den Austausch von Ereignisdaten zwischen Anwendungen und Diensten. Ein Datenersteller veröffentlicht eine Nachricht zu einem Cloud Pub/Sub-Thema. Ein Nutzer erstellt ein Abo für dieses Thema. Abonnenten rufen Nachrichten entweder von einem Abo ab oder werden als Webhooks für Push-Abos konfiguriert. Jeder Abonnent muss die einzelnen Nachrichten innerhalb eines konfigurierbaren Zeitfensters bestätigen.
Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Streammodus (Echtzeitdaten) und Batchmodus (Verlaufsdaten) mit gleicher Zuverlässigkeit und Aussagekraft. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung mit dem Apache Beam SDK, das eine Vielzahl von Windowing- und Sitzungsanalyse-Primitiven sowie ein Ökosystem von Quell- und Sink-Connectors bietet.
Pub/Sub ist ein skalierbares, langlebiges System zur Aufnahme und Übermittlung von Ereignissen. Dataflow ergänzt das skalierbare Pub/Sub-Modell von mindestens einer Übermittlung durch die Deduplizierung von Nachrichten und der genau einmaligen Verarbeitung in Reihenfolge, wenn Fenster und Zwischenspeicher verwendet werden.
Aufgaben
- Nachrichten lesen, die in einem Pub/Sub-Thema veröffentlicht wurden
- Windowing (oder Gruppieren) von Nachrichten nach Zeitstempel
- Nachrichten in Cloud Storage schreiben
Einrichtung
Vor dem Klick auf „Start Lab“ (Lab starten)
Lesen Sie diese Anleitung. Labs sind zeitlich begrenzt und können nicht pausiert werden. Der Timer beginnt zu laufen, wenn Sie auf Lab starten klicken, und zeigt Ihnen, wie lange Google Cloud-Ressourcen für das Lab verfügbar sind.
In diesem praxisorientierten Lab können Sie die Lab-Aktivitäten in einer echten Cloud-Umgebung durchführen – nicht in einer Simulations- oder Demo-Umgebung. Dazu erhalten Sie neue, temporäre Anmeldedaten, mit denen Sie für die Dauer des Labs auf Google Cloud zugreifen können.
Für dieses Lab benötigen Sie Folgendes:
- Einen Standardbrowser (empfohlen wird Chrome)
Hinweis: Nutzen Sie den privaten oder Inkognitomodus (empfohlen), um dieses Lab durchzuführen. So wird verhindert, dass es zu Konflikten zwischen Ihrem persönlichen Konto und dem Teilnehmerkonto kommt und zusätzliche Gebühren für Ihr persönliches Konto erhoben werden.
- Zeit für die Durchführung des Labs – denken Sie daran, dass Sie ein begonnenes Lab nicht unterbrechen können.
Hinweis: Verwenden Sie für dieses Lab nur das Teilnehmerkonto. Wenn Sie ein anderes Google Cloud-Konto verwenden, fallen dafür möglicherweise Kosten an.
Lab starten und bei der Google Cloud Console anmelden
-
Klicken Sie auf Lab starten. Wenn Sie für das Lab bezahlen müssen, wird ein Dialogfeld geöffnet, in dem Sie Ihre Zahlungsmethode auswählen können.
Auf der linken Seite befindet sich der Bereich „Details zum Lab“ mit diesen Informationen:
- Schaltfläche „Google Cloud Console öffnen“
- Restzeit
- Temporäre Anmeldedaten für das Lab
- Ggf. weitere Informationen für dieses Lab
-
Klicken Sie auf Google Cloud Console öffnen (oder klicken Sie mit der rechten Maustaste und wählen Sie Link in Inkognitofenster öffnen aus, wenn Sie Chrome verwenden).
Im Lab werden Ressourcen aktiviert. Anschließend wird ein weiterer Tab mit der Seite „Anmelden“ geöffnet.
Tipp: Ordnen Sie die Tabs nebeneinander in separaten Fenstern an.
Hinweis: Wird das Dialogfeld Konto auswählen angezeigt, klicken Sie auf Anderes Konto verwenden.
-
Kopieren Sie bei Bedarf den folgenden Nutzernamen und fügen Sie ihn in das Dialogfeld Anmelden ein.
{{{user_0.username | "Username"}}}
Sie finden den Nutzernamen auch im Bereich „Details zum Lab“.
-
Klicken Sie auf Weiter.
-
Kopieren Sie das folgende Passwort und fügen Sie es in das Dialogfeld Willkommen ein.
{{{user_0.password | "Password"}}}
Sie finden das Passwort auch im Bereich „Details zum Lab“.
-
Klicken Sie auf Weiter.
Wichtig: Sie müssen die für das Lab bereitgestellten Anmeldedaten verwenden. Nutzen Sie nicht die Anmeldedaten Ihres Google Cloud-Kontos.
Hinweis: Wenn Sie Ihr eigenes Google Cloud-Konto für dieses Lab nutzen, können zusätzliche Kosten anfallen.
-
Klicken Sie sich durch die nachfolgenden Seiten:
- Akzeptieren Sie die Nutzungsbedingungen.
- Fügen Sie keine Wiederherstellungsoptionen oder Zwei-Faktor-Authentifizierung hinzu (da dies nur ein temporäres Konto ist).
- Melden Sie sich nicht für kostenlose Testversionen an.
Nach wenigen Augenblicken wird die Google Cloud Console in diesem Tab geöffnet.
Hinweis: Wenn Sie auf Google Cloud-Produkte und ‑Dienste zugreifen möchten, klicken Sie auf das Navigationsmenü oder geben Sie den Namen des Produkts oder Dienstes in das Feld Suchen ein.
Cloud Shell aktivieren
Cloud Shell ist eine virtuelle Maschine, auf der Entwicklertools installiert sind. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft auf Google Cloud. Mit Cloud Shell erhalten Sie Befehlszeilenzugriff auf Ihre Google Cloud-Ressourcen.
-
Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren
.
-
Klicken Sie sich durch die folgenden Fenster:
- Fahren Sie mit dem Informationsfenster zu Cloud Shell fort.
- Autorisieren Sie Cloud Shell, Ihre Anmeldedaten für Google Cloud API-Aufrufe zu verwenden.
Wenn eine Verbindung besteht, sind Sie bereits authentifiziert und das Projekt ist auf Project_ID, eingestellt. Die Ausgabe enthält eine Zeile, in der die Project_ID für diese Sitzung angegeben ist:
Ihr Cloud-Projekt in dieser Sitzung ist festgelegt als {{{project_0.project_id | "PROJECT_ID"}}}
gcloud ist das Befehlszeilentool für Google Cloud. Das Tool ist in Cloud Shell vorinstalliert und unterstützt die Tab-Vervollständigung.
- (Optional) Sie können den aktiven Kontonamen mit diesem Befehl auflisten:
gcloud auth list
- Klicken Sie auf Autorisieren.
Ausgabe:
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
Um das aktive Konto festzulegen, führen Sie diesen Befehl aus:
$ gcloud config set account `ACCOUNT`
- (Optional) Sie können die Projekt-ID mit diesem Befehl auflisten:
gcloud config list project
Ausgabe:
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
Hinweis: Die vollständige Dokumentation für gcloud finden Sie in Google Cloud in der Übersicht zur gcloud CLI.
Region einrichten
- Führen Sie in Cloud Shell den folgenden Befehl aus, um die Projektregion für dieses Lab festzulegen:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}
Dataflow API neu aktivieren
Damit Sie Zugriff auf die erforderliche API haben, starten Sie die Verbindung zur Dataflow API neu.
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
Klicken Sie auf Fortschritt prüfen.
Dataflow API deaktivieren und wieder aktivieren.
Aufgabe 1: Projektressourcen erstellen
- Erstellen Sie in Cloud Shell Variablen für Ihren Bucket, Ihr Projekt und Ihre Region.
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
- Legen Sie die App Engine-Region fest.
Hinweis: Für andere Regionen als us-central1 und europe-west1 muss die App Engine-Regionsvariable auf die zugewiesene Region gesetzt werden. Wenn Ihnen us-central1 zugewiesen wurde, setzen Sie die Variable für die App Engine-Region auf us-central. Wenn Ihnen europe-west1 zugewiesen wurde, legen Sie die App Engine-Regionsvariable auf europe-west fest.
Weitere Informationen finden Sie unter App Engine-Standorte.
AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
- Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:
gsutil mb gs://$BUCKET_NAME
Hinweis: Cloud Storage-Bucket-Namen müssen global eindeutig sein. Ihre Qwiklabs-Projekt-ID ist immer eindeutig. Daher wird sie in diesem Lab als Bucket-Name verwendet.
- Erstellen Sie ein Pub/Sub-Thema in diesem Projekt:
gcloud pubsub topics create $TOPIC_ID
- Erstellen Sie eine App Engine-Anwendung für Ihr Projekt:
gcloud app create --region=$AE_REGION
- Erstellen Sie einen Cloud Scheduler-Job in diesem Projekt. Der Job veröffentlicht eine Nachricht zu einem Pub/Sub-Thema in Intervallen von einer Minute:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
--topic=$TOPIC_ID --message-body="Hello!"
- Wenn Sie aufgefordert werden, die Cloud Scheduler API zu aktivieren, drücken Sie
y und die Eingabetaste.
Klicken Sie auf Fortschritt prüfen.
Projektrssourcen erstellen
- Job starten:
gcloud scheduler jobs run publisher-job
Hinweis: Wenn der Fehler RESOURCE_EXHAUSTED auftritt, versuchen Sie, den Befehl noch einmal auszuführen.
- Verwenden Sie die folgenden Befehle, um das Schnellstart-Repository zu klonen und zum Beispielcodeverzeichnis zu gehen:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsub/streaming-analytics
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt # Install Apache Beam dependencies
Hinweis: Wenn Sie die Python-Option verwenden, führen Sie die Python-Befehle einzeln aus.
Klicken Sie auf Fortschritt prüfen.
Starten Sie den Cloud Scheduler-Job
Aufgabe 2: Code zum Streamen von Nachrichten von Pub/Sub zu Cloud Storage prüfen
Codebeispiel
Sehen Sie sich den folgenden Beispielcode an, in dem Dataflow für Folgendes verwendet wird:
- Pub/Sub-Nachrichten lesen.
- Windowing (oder Gruppieren) von Nachrichten in festen Intervallen nach Veröffentlichungszeitstempeln.
- Die Nachrichten in jedem Fenster in Dateien in Cloud Storage schreiben.
import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
public class PubSubToGcs {
/*
* Define your own configuration options. Sie können Ihre eigenen Argumente hinzufügen, damit sie vom Befehlszeilen-Parser verarbeitet werden, und entsprechende Standardwerte für sie angeben.
*/
public interface PubSubToGcsOptions extends StreamingOptions {
@Description("The Cloud Pub/Sub topic to read from.")
@Required
String getInputTopic();
void setInputTopic(String value);
@Description("Output file's window size in number of minutes.")
@Default.Integer(1)
Integer getWindowSize();
void setWindowSize(Integer value);
@Description("Path of the output file including its filename prefix.")
@Required
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) throws IOException {
// The maximum number of shards when writing output.
int numShards = 1;
PubSubToGcsOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
// 1) Read string messages from a Pub/Sub topic.
.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
// 2) Group the messages into fixed-sized minute intervals.
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
// 3) Write one file to GCS for every window of messages.
.apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));
// Execute the pipeline and wait until it finishes running.
pipeline.run().waitUntilFinish();
}
}
import argparse
from datetime import datetime
import logging
import random
from apache_beam import (
DoFn,
GroupByKey,
io,
ParDo,
Pipeline,
PTransform,
WindowInto,
WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
class GroupMessagesByFixedWindows(PTransform):
"""A composite transform that groups Pub/Sub messages based on publish time
and outputs a list of tuples, each containing a message and its publish time.
"""
def __init__(self, window_size, num_shards=5):
# Set window size to 60 seconds.
self.window_size = int(window_size * 60)
self.num_shards = num_shards
def expand(self, pcoll):
return (
pcoll
# Bind window info to each element using element timestamp (or publish time).
| „Fenster in feste Intervalle“
>> WindowInto(FixedWindows(self.window_size))
| „Zeitstempel zu Fensterelementen hinzufügen“ >> ParDo(AddTimestamp())
# Jedem Fensterelement wird basierend auf der Anzahl der Shards ein zufälliger Schlüssel zugewiesen.
| "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
# Group windowed elements by key. Alle Elemente im selben Fenster müssen in den Speicher passen. Wenn nicht, müssen Sie `beam.util.BatchElements` verwenden.
| "Group by key" >> GroupByKey()
)
class AddTimestamp(DoFn):
def process(self, element, publish_time=DoFn.TimestampParam):
"""Processes each windowed element by extracting the message body and its
publish time into a tuple.
"""
yield (
element.decode("utf-8"),
datetime.utcfromtimestamp(float(publish_time)).strftime(
"%Y-%m-%d %H:%M:%S.%f"
),
)
class WriteToGCS(DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, key_value, window=DoFn.WindowParam):
"""Write messages in a batch to Google Cloud Storage."""
ts_format = "%H:%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
shard_id, batch = key_value
filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])
with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for message_body, publish_time in batch:
f.write(f"{message_body},{publish_time}\n".encode())
def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
# Set `save_main_session` to True so DoFns can access globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True
)
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
# Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
# binds the publish time returned by the Pub/Sub server for each message
# to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
# https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
| "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
| "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
| "Write to GCS" >> ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_topic",
help="The Cloud Pub/Sub topic to read from."
'"projects//topics/".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in minutes.",
)
parser.add_argument(
"--output_path",
help="Path of the output GCS file including the prefix.",
)
parser.add_argument(
"--num_shards",
type=int,
default=5,
help="Number of shards to use when writing windowed elements to GCS.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_topic,
known_args.output_path,
known_args.window_size,
known_args.num_shards,
pipeline_args,
)
Hinweis: Weitere Informationen zum Beispielcode finden Sie auf den entsprechenden GitHub-Seiten java-docs-samples und python-docs-samples.
Aufgabe 3: Pipeline starten
- Führen Sie den folgenden Befehl aus, um die Pipeline zu starten:
mvn compile exec:java \
-Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=$PROJECT_ID \
--region=$REGION \
--inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
--output=gs://$BUCKET_NAME/samples/output \
--runner=DataflowRunner \
--windowSize=2 \
--tempLocation=gs://$BUCKET_NAME/temp"
python PubSubToGCS.py \
--project=project_id \
--region=region \
--input_topic=projects/project_id/topics/my-id \
--output_path=gs://bucket_name/samples/output \
--runner=DataflowRunner \
--window_size=2 \
--num_shards=2 \
--temp_location=gs://bucket_name/temp
Hinweis: Ersetzen Sie beim Ausführen des Python-Befehls project_id, bucket_name und region durch Ihre Projekt-ID, den Bucket-Namen und die zugewiesene Lab-Region.
Der vorherige Befehl wird lokal ausgeführt und startet einen Dataflow-Job, der in der Cloud ausgeführt wird.
Hinweis: Es kann etwa 10 Minuten dauern, bis der Code vollständig ausgeführt wurde und der Pipeline-Job in der nächsten Aufgabe in der Dataflow Console angezeigt wird.
Hinweis: Wenn Sie eine Warnung bezüglich StaticLoggerBinder erhalten, können Sie diese ignorieren und mit dem Lab fortfahren.
Klicken Sie auf Fortschritt prüfen.
Pipeline starten und Dataflow-Job ausführen
Aufgabe 4: Job- und Pipeline-Fortschritt beobachten
-
Rufen Sie die Dataflow-Konsole auf, um den Fortschritt des Jobs zu beobachten.
-
Klicken Sie auf Aktualisieren, um den Job und die neuesten Statusupdates zu sehen.

- Klicken Sie auf den Jobnamen, um die Jobdetails zu öffnen und Folgendes zu prüfen:
- Jobstruktur
- Jobprotokolle
- Anzeigebereich-Messwerte

Es kann einige Minuten dauern, bis die Ausgabedateien in Cloud Storage angezeigt werden.
- Die Ausgabedateien finden Sie unter Navigationsmenü > Cloud Storage. Klicken Sie auf den Namen Ihres Buckets und dann auf Samples.

- Alternativ können Sie die Anwendung in Cloud Shell mit STRG + C beenden (und bei der Python-Option
exit eingeben) und dann den folgenden Befehl ausführen, um die Dateien aufzulisten, die in Cloud Storage geschrieben wurden:
gsutil ls gs://${BUCKET_NAME}/samples/
Die Ausgabe sollte so aussehen:
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Aufgabe 5: Bereinigen
- Falls noch nicht geschehen, beenden Sie die Anwendung in Cloud Shell mit STRG+C.
Geben Sie für die Python-Option exit ein, um die Python-Umgebung zu beenden.
- Löschen Sie in Cloud Shell den Cloud Scheduler-Job:
gcloud scheduler jobs delete publisher-job
Wenn Sie gefragt werden, ob Sie fortfahren möchten, drücken Sie Y und die Eingabetaste.
- Beenden Sie den Job in der Dataflow-Konsole. Wählen Sie dazu den Jobnamen aus und klicken Sie auf Beenden.
Klicken Sie auf Job anhalten > Abbrechen, um die Pipeline zu beenden, ohne sie zu leeren.
- Löschen Sie das Thema in Cloud Shell:
gcloud pubsub topics delete $TOPIC_ID
- Löschen Sie in Cloud Shell die von der Pipeline erstellten Dateien:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
- Erstellen Sie in Cloud Shell den Cloud Storage-Bucket:
gsutil rb gs://${BUCKET_NAME}
Glückwunsch!
Sie haben eine Dataflow-Pipeline erstellt, die Nachrichten aus Ihrem Pub/Sub-Thema liest, sie nach Zeitstempel einordnet und sie in Ihren Cloud Storage-Bucket schreibt.
Weitere Informationen
Google Cloud-Schulungen und -Zertifizierungen
In unseren Schulungen erfahren Sie alles zum optimalen Einsatz unserer Google Cloud-Technologien und können sich entsprechend zertifizieren lassen. Unsere Kurse vermitteln technische Fähigkeiten und Best Practices, damit Sie möglichst schnell mit Google Cloud loslegen und Ihr Wissen fortlaufend erweitern können. Wir bieten On-Demand-, Präsenz- und virtuelle Schulungen für Anfänger wie Fortgeschrittene an, die Sie individuell in Ihrem eigenen Zeitplan absolvieren können. Mit unseren Zertifizierungen weisen Sie nach, dass Sie Experte im Bereich Google Cloud-Technologien sind.
Anleitung zuletzt am 20. August 2025 aktualisiert
Lab zuletzt am 20. August 2025 getestet
© 2025 Google LLC. Alle Rechte vorbehalten. Google und das Google-Logo sind Marken von Google LLC. Alle anderen Unternehmens- und Produktnamen können Marken der jeweils mit ihnen verbundenen Unternehmen sein.