GSP290

Übersicht
Dataflow ist ein Google Cloud-Dienst, der die einheitliche Verarbeitung großer Stream- und Batchdatenmengen ermöglicht. Er basiert auf dem Apache Beam-Projekt, einem Open-Source-Modell, mit dem sowohl Batch- als auch Streamingpipelines zur parallelen Verarbeitung von Daten definiert werden können. Mit einem der als Open Source veröffentlichten Apache Beam SDKs können Sie ein Programm erstellen, das die Pipeline definiert, und diese dann mit Dataflow ausführen.
In diesem Lab erstellen Sie mit dem Apache Beam SDK für Python eine Pipeline und führen sie in Dataflow aus, um Daten aus Cloud Storage in BigQuery aufzunehmen und dann in BigQuery zu transformieren und anzureichern.
Hinweis: Öffnen Sie unbedingt die Python-Dateien und lesen Sie die Kommentare, wenn Sie dazu aufgefordert werden. So können Sie nachvollziehen, was der Code bewirkt.
Aufgaben
In diesem Lab erfahren Sie, wie Sie Dataflow-Pipelines (Python) erstellen und ausführen, um Folgendes zu tun:
- Daten aus Cloud Storage in BigQuery aufnehmen
- Daten in BigQuery transformieren und anreichern
- Daten in BigQuery zusammenführen und die Ergebnisse in eine neue Tabelle schreiben
Einrichtung und Anforderungen
Vor dem Klick auf „Start Lab“ (Lab starten)
Lesen Sie diese Anleitung. Labs sind zeitlich begrenzt und können nicht pausiert werden. Der Timer beginnt zu laufen, wenn Sie auf Lab starten klicken, und zeigt Ihnen, wie lange Google Cloud-Ressourcen für das Lab verfügbar sind.
In diesem praxisorientierten Lab können Sie die Lab-Aktivitäten in einer echten Cloud-Umgebung durchführen – nicht in einer Simulations- oder Demo-Umgebung. Dazu erhalten Sie neue, temporäre Anmeldedaten, mit denen Sie für die Dauer des Labs auf Google Cloud zugreifen können.
Für dieses Lab benötigen Sie Folgendes:
- Einen Standardbrowser (empfohlen wird Chrome)
Hinweis: Nutzen Sie den privaten oder Inkognitomodus (empfohlen), um dieses Lab durchzuführen. So wird verhindert, dass es zu Konflikten zwischen Ihrem persönlichen Konto und dem Teilnehmerkonto kommt und zusätzliche Gebühren für Ihr persönliches Konto erhoben werden.
- Zeit für die Durchführung des Labs – denken Sie daran, dass Sie ein begonnenes Lab nicht unterbrechen können.
Hinweis: Verwenden Sie für dieses Lab nur das Teilnehmerkonto. Wenn Sie ein anderes Google Cloud-Konto verwenden, fallen dafür möglicherweise Kosten an.Lab starten und bei der Google Cloud Console anmelden
- 
Klicken Sie auf Lab starten. Wenn Sie für das Lab bezahlen müssen, wird ein Dialogfeld geöffnet, in dem Sie Ihre Zahlungsmethode auswählen können.
Auf der linken Seite befindet sich der Bereich „Details zum Lab“ mit diesen Informationen: 
- Schaltfläche „Google Cloud Console öffnen“
- Restzeit
- Temporäre Anmeldedaten für das Lab
- Ggf. weitere Informationen für dieses Lab
 
- 
Klicken Sie auf Google Cloud Console öffnen (oder klicken Sie mit der rechten Maustaste und wählen Sie Link in Inkognitofenster öffnen aus, wenn Sie Chrome verwenden). Im Lab werden Ressourcen aktiviert. Anschließend wird ein weiterer Tab mit der Seite „Anmelden“ geöffnet. Tipp: Ordnen Sie die Tabs nebeneinander in separaten Fenstern an. Hinweis: Wird das Dialogfeld Konto auswählen angezeigt, klicken Sie auf Anderes Konto verwenden.
- 
Kopieren Sie bei Bedarf den folgenden Nutzernamen und fügen Sie ihn in das Dialogfeld Anmelden ein. {{{user_0.username | "Username"}}}Sie finden den Nutzernamen auch im Bereich „Details zum Lab“. 
- 
Klicken Sie auf Weiter. 
- 
Kopieren Sie das folgende Passwort und fügen Sie es in das Dialogfeld Willkommen ein. {{{user_0.password | "Password"}}}Sie finden das Passwort auch im Bereich „Details zum Lab“. 
- 
Klicken Sie auf Weiter. Wichtig: Sie müssen die für das Lab bereitgestellten Anmeldedaten verwenden. Nutzen Sie nicht die Anmeldedaten Ihres Google Cloud-Kontos. 
 
Hinweis: Wenn Sie Ihr eigenes Google Cloud-Konto für dieses Lab nutzen, können zusätzliche Kosten anfallen.
- 
Klicken Sie sich durch die nachfolgenden Seiten: 
- Akzeptieren Sie die Nutzungsbedingungen.
- Fügen Sie keine Wiederherstellungsoptionen oder Zwei-Faktor-Authentifizierung hinzu (da dies nur ein temporäres Konto ist).
- Melden Sie sich nicht für kostenlose Testversionen an.
 
Nach wenigen Augenblicken wird die Google Cloud Console in diesem Tab geöffnet.
Hinweis: Wenn Sie auf Google Cloud-Produkte und ‑Dienste zugreifen möchten, klicken Sie auf das Navigationsmenü oder geben Sie den Namen des Produkts oder Dienstes in das Feld Suchen ein.
 
Cloud Shell aktivieren
Cloud Shell ist eine virtuelle Maschine, auf der Entwicklertools installiert sind. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft auf Google Cloud. Mit Cloud Shell erhalten Sie Befehlszeilenzugriff auf Ihre Google Cloud-Ressourcen.
- 
Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren  . .
 
- 
Klicken Sie sich durch die folgenden Fenster: 
- Fahren Sie mit dem Informationsfenster zu Cloud Shell fort.
- Autorisieren Sie Cloud Shell, Ihre Anmeldedaten für Google Cloud API-Aufrufe zu verwenden.
 
Wenn eine Verbindung besteht, sind Sie bereits authentifiziert und das Projekt ist auf Project_ID,  eingestellt. Die Ausgabe enthält eine Zeile, in der die Project_ID für diese Sitzung angegeben ist:
Ihr Cloud-Projekt in dieser Sitzung ist festgelegt als {{{project_0.project_id | "PROJECT_ID"}}}
gcloud ist das Befehlszeilentool für Google Cloud. Das Tool ist in Cloud Shell vorinstalliert und unterstützt die Tab-Vervollständigung.
- (Optional) Sie können den aktiven Kontonamen mit diesem Befehl auflisten:
gcloud auth list
- Klicken Sie auf Autorisieren.
Ausgabe:
ACTIVE: *
ACCOUNT: {{{user_0.username | "ACCOUNT"}}}
Um das aktive Konto festzulegen, führen Sie diesen Befehl aus:
    $ gcloud config set account `ACCOUNT`
- (Optional) Sie können die Projekt-ID mit diesem Befehl auflisten:
gcloud config list projectAusgabe:
[core]
project = {{{project_0.project_id | "PROJECT_ID"}}}
Hinweis: Die vollständige Dokumentation für gcloud finden Sie in Google Cloud in der Übersicht zur gcloud CLI.
Aufgabe 1: Dataflow API neu aktivieren
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force
gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}
Wenn die API wieder aktiviert ist, wird auf der Seite die Option Deaktivieren angezeigt.
Klicken Sie auf Fortschritt prüfen.
Dataflow API deaktivieren und wieder aktivieren.
Aufgabe 2: Starter-Code herunterladen
Laden Sie die Dataflow-Python-Beispiele für dieses Lab herunter.
gcloud storage cp -r gs://spls/gsp290/dataflow-python-examples .
Aufgabe 3: Cloud Storage-Bucket erstellen und Dateien in den Bucket kopieren
Erstellen Sie in Cloud Shell einen Cloud Storage-Bucket und kopieren Sie dann Dateien in den Bucket. Diese Dateien sind die Dataflow-Python-Beispiele.
Cloud Storage-Bucket erstellen
- Verwenden Sie in Cloud Shell den Befehl zum Erstellen eines Buckets, um einen neuen regionalen Bucket in der Region  in Ihrem Projekt zu erstellen:
gcloud storage buckets create gs://{{{ project_0.project_id | BUCKET_NAME }}} --location={{{ project_0.default_region | REGION }}}Klicken Sie auf Fortschritt prüfen.
Cloud Storage-Bucket erstellen.
Dateien in Ihren Bucket kopieren
- Kopieren Sie in Cloud Shell mit dem Befehl gsutilDateien in den gerade erstellten Cloud Storage-Bucket:
gcloud storage cp gs://spls/gsp290/data_files/usa_names.csv gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/
gcloud storage cp gs://spls/gsp290/data_files/head_usa_names.csv gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/Klicken Sie auf Fortschritt prüfen.
Dateien in Ihren Bucket kopieren.
Aufgabe 4: BigQuery-Dataset erstellen
Sie erstellen ein Dataset in BigQuery. Hier werden Ihre Tabellen in BigQuery geladen.
- Erstellen Sie in Cloud Shell das Dataset lake:
bq mk lakeKlicken Sie auf Fortschritt prüfen.
BigQuery-Dataset „lake“ erstellen.
Aufgabe 5: Datenaufnahmepipeline prüfen und ausführen
In dieser Aufgabe sehen Sie sich den Pipelinecode an, um herauszufinden, wie er funktioniert. Anschließend richten Sie die Pipeline ein und führen sie aus.
Die Datenaufnahmepipeline nimmt Daten mithilfe einer TextIO-Quelle und eines BigQueryIO-Ziels aus Cloud Storage in die BigQuery-Tabelle auf. Im Detail macht die Pipeline Folgendes:
-  Die Dateien werden aus Cloud Storage aufgenommen.
-  Die Kopfzeile in den Dateien wird herausgefiltert.
-  Die gelesenen Zeilen werden in Wörterbuchobjekte umgewandelt.
-  Die Zeilen werden nach BigQuery ausgegeben.
Gemini Code Assist in der Cloud Shell-IDE aktivieren
Sie können Gemini Code Assist in einer integrierten Entwicklungsumgebung (Integrated Development Environment, IDE) wie Cloud Shell verwenden, um Unterstützung beim Programmieren zu erhalten oder Probleme mit Ihrem Code zu lösen. Bevor Sie Gemini Code Assist verwenden können, müssen Sie das Tool aktivieren.
- Aktivieren Sie in Cloud Shell die Gemini for Google Cloud API mit dem folgenden Befehl:
gcloud services enable cloudaicompanion.googleapis.com
- Klicken Sie in der Cloud Shell-Symbolleiste auf Editor öffnen.
Hinweis: Klicken Sie zum Öffnen des Cloud Shell-Editors in der Cloud Shell-Symbolleiste auf Editor öffnen. Sie können zwischen Cloud Shell und dem Code-Editor wechseln. Klicken Sie dazu entsprechend auf Editor öffnen oder Terminal öffnen.
- 
Klicken Sie im linken Bereich auf das Symbol Einstellungen und suchen Sie unter Einstellungen nach Gemini Code Assist. 
- 
Suchen Sie nach Gemini Code Assist: Aktivieren und prüfen Sie, ob das Kästchen ausgewählt ist. Schließen Sie dann die Einstellungen. 
- 
Klicken Sie in der Statusleiste unten auf dem Bildschirm auf Cloud Code – kein Projekt. 
- 
Autorisieren Sie das Plug-in wie beschrieben. Wenn kein Projekt automatisch ausgewählt wurde, klicken Sie auf Google Cloud-Projekt auswählen und wählen Sie  aus. 
- 
Prüfen Sie, ob Ihr Google Cloud-Projekt () in der Cloud Code-Statusmeldung in der Statusleiste angezeigt wird. 
Python-Code für die Datenaufnahmepipeline prüfen
In diesem Abschnitt senden Sie einen Prompt an Gemini Code Assist, um weitere Informationen zur Datenaufnahmepipeline zu erhalten. Diese Übersicht möchten Sie an ein neues Teammitglied weiterleiten.
- 
Gehen Sie im Datei-Explorer des Cloud Shell-Editors zu dataflow_python_examples > dataflow_python_examples > data_ingestion.py. 
- 
Öffnen Sie die Datei data_ingestion.py. Dadurch wird Gemini Code Assist aktiviert. Dies lässt sich am Symbol rechts oben im Editor ablesen. rechts oben im Editor ablesen.
 
- 
Klicken Sie auf das Symbol Gemini Code Assist: Intelligente Aktionen  und wählen Sie Erkläre mir das aus. und wählen Sie Erkläre mir das aus.
 
- 
Gemini Code Assist öffnet ein Chatfenster mit dem vorausgefüllten Prompt Erkläre mir das. Ersetzen Sie im Inline-Textfeld des Code Assist-Chats den vorausgefüllten Prompt durch Folgendes und klicken Sie auf Senden:
 
You are an expert Data Engineer at Cymbal AI. A new team member is unfamiliar with this pipeline code. Explain the purpose and functionality of the data ingestion pipeline defined in the data_ingestion.py. Your explanation should include:
1. A high-level summary of what the script does.
2. A breakdown of the key components, such as the DataIngestion class and the run function.
3. An explanation of how the script uses the Apache Beam pipeline to read, process, and write data.
4. The role of command-line arguments and how they are used.
5. A description of the input data format and the output BigQuery table schema.
For the suggested improvements, don't update this file.Durch diesen Code wird eine BigQuery-Tabelle mit den Datendateien aus Cloud Storage ausgefüllt. Die detaillierte Erklärung des Codes in der Datei data_ingestion.py wird im Gemini Code Assist-Chat angezeigt.
- Klicken Sie auf Terminal öffnen, um zu Cloud Shell zurückzukehren.
Docker-Container für die Dataflow-Jobs einrichten
In diesem Abschnitt kehren Sie zu Ihrer Cloud Shell-Sitzung zurück, um die erforderlichen Python-Bibliotheken einzurichten.
Für die Dataflow-Jobs in diesem Lab ist Python 3.8 erforderlich. Damit Sie mit der richtigen Version arbeiten, werden die Dataflow-Prozesse in einem Python 3.8-Docker-Container ausgeführt.
- Führen Sie in Cloud Shell den folgenden Befehl aus, um einen Python-Container zu starten:
cd ~
docker run -it -e PROJECT={{{ project_0.project_id | PROJECT_ID }}} -v $(pwd)/dataflow-python-examples:/dataflow python:3.8 /bin/bashMit diesem Befehl wird ein Docker-Container mit der aktuellen stabilen Version von Python 3.8 abgerufen und eine Befehlsshell geöffnet, über die Sie die folgenden Befehle im Container ausführen. Das Flag -v stellt den Quellcode als Volume für den Container bereit. Dadurch können wir ihn im Cloud Shell-Editor bearbeiten und weiterhin darauf im ausgeführten Container zugreifen.
- Wenn das Abrufen des Containers abgeschlossen ist und die Ausführung in Cloud Shell beginnt, installieren Sie apache-beamin diesem ausgeführten Container mit dem folgenden Befehl:
pip install apache-beam[gcp]==2.59.0
- Als Nächstes wechseln Sie im ausgeführten Container in Cloud Shell in das Verzeichnis, in dem sich der verlinkte Quellcode befindet:
cd dataflow/Datenaufnahmepipeline in der Cloud ausführen
- Führen Sie mit dem folgenden Code die Pipeline für die Datenaufnahme aus:
python dataflow_python_examples/data_ingestion.py \
  --project={{{ project_0.project_id | PROJECT_ID }}} \
  --region={{{ project_0.default_region | REGION }}} \
  --runner=DataflowRunner \
  --machine_type=e2-standard-2 \
  --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \
  --save_main_sessionDurch diesen Code werden die erforderlichen Worker hochgefahren und nach Abschluss der Pipeline wieder heruntergefahren.
- Geben Sie in der Titelleiste der Console Dataflow in das Suchfeld ein und klicken Sie in den Suchergebnissen auf Dataflow.
Auf der Dataflow-Seite können Sie sich den Status Ihres Jobs ansehen.
- Klicken Sie auf den Namen des Jobs, um den Fortschritt zu verfolgen.
Sobald Erfolgreich als Jobstatus angezeigt wird, können Sie den nächsten Schritt ausführen. Diese Datenaufnahmepipeline benötigt etwa fünf Minuten, um zu starten, die Aufgabe auszuführen und dann herunterzufahren.
- 
Rufen Sie BigQuery (Navigationsmenü > BigQuery) auf, um zu überprüfen, ob Ihre Daten angezeigt werden. 
- 
Klicken Sie auf Ihren Projektnamen, um die Tabelle usa_names unter dem Dataset lakeeinzublenden.
 

- Klicken Sie auf die Tabelle und gehen Sie zum Tab Vorschau, um sich Beispiele zu den Daten aus usa_namesanzusehen.
Hinweis: Wenn Sie die Tabelleusa_names nicht sehen, aktualisieren Sie die Seite oder rufen Sie die Tabellen mit der klassischen BigQuery-Benutzeroberfläche auf.
Klicken Sie auf Fortschritt prüfen.
Datenaufnahmepipeline erstellen.
Aufgabe 6: Datentransformationspipeline prüfen und ausführen
In dieser Aufgabe sehen Sie sich die Datentransformationspipeline an, um herauszufinden, wie sie funktioniert. Anschließend führen Sie die Pipeline aus, um die Cloud Storage-Dateien zu verarbeiten und das Ergebnis nach BigQuery auszugeben.
Die Datentransformationspipeline nimmt auch Daten aus Cloud Storage mithilfe einer TextIO-Quelle und eines BigQueryIO-Ziels in die BigQuery-Tabelle auf, jedoch mit zusätzlichen Datentransformationen. Im Detail macht die Pipeline Folgendes:
-  Die Dateien werden aus Cloud Storage aufgenommen.
-  Die gelesenen Zeilen werden in Wörterbuchobjekte umgewandelt.
-  Daten, die eine Jahresangabe enthalten, werden in ein Format umgewandelt, das BigQuery als Datum versteht.
-  Die Zeilen werden nach BigQuery ausgegeben.
Python-Code für die Datentransformationspipeline prüfen
In diesem Abschnitt senden Sie einen Prompt an Gemini Code Assist, um zusätzliche Informationen zur Datentransformationspipeline zu erhalten, die Sie dem neuen Teammitglied zur Verfügung stellen möchten.
- 
Klicken Sie in der Cloud Shell-Menüleiste auf Editor öffnen. 
- 
Öffnen Sie im Cloud Shell-Editor im selben Verzeichnis die Datei data_transformation.py. Wie zuvor sehen Sie das Symbol rechts oben im Editor. rechts oben im Editor.
 
- 
Klicken Sie auf das Symbol Gemini Code Assist: Intelligente Aktionen  und wählen Sie Erkläre mir das aus. und wählen Sie Erkläre mir das aus.
 
- 
Gemini Code Assist öffnet ein Chatfenster mit dem vorausgefüllten Prompt Erkläre mir das. Ersetzen Sie im Inline-Textfeld des Code Assist-Chats den vorausgefüllten Prompt durch Folgendes und klicken Sie auf Senden:
 
You are an expert Data Engineer at Cymbal AI. A new team member is unfamiliar with this pipeline code. Explain the purpose and functionality of the data transformation pipeline defined in the data_transformation.py. Your explanation should include:
1. A high-level summary of what the script does, noting its differences from a simple ingestion pipeline.
2. A breakdown of the key components, specifically the DataTransformation class and the run function.
3. A detailed explanation of how the script uses the Apache Beam pipeline to read from a file, transform the data, and write it to a BigQuery table.
4. Describe how the script handles the BigQuery schema by reading it from a JSON file.
5. Explain the data transformation logic within the parse_method, particularly how it converts the year to a DATE type.
6. The role of command-line arguments and how they are used.
For the suggested improvements, don't update this file.Die Erklärung des Codes in der Datei data_transformation.py wird im Gemini Code Assist-Chat angezeigt.
Datentransformationspipeline in der Cloud ausführen
- Geben Sie den folgenden Befehl in das Cloud Shell-Terminal ein, um die Datentransformationspipeline auszuführen:
python dataflow_python_examples/data_transformation.py \
  --project={{{ project_0.project_id | PROJECT_ID }}} \
  --region={{{ project_0.default_region | REGION }}} \
  --runner=DataflowRunner \
  --machine_type=e2-standard-2 \
  --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \
  --save_main_session
- 
Geben Sie in der Titelleiste der Google Cloud Console Dataflow in das Suchfeld ein und klicken Sie dann in den Suchergebnissen auf Dataflow. 
- 
Klicken Sie auf den Namen dieses Jobs, um dessen Status aufzurufen. 
Diese Dataflow-Pipeline benötigt etwa fünf Minuten, um zu starten, die Aufgabe auszuführen und dann herunterzufahren.
- Sobald der Jobstatus in Dataflow Erfolgreich lautet, gehen Sie zu BigQuery, um zu überprüfen, ob Ihre Daten ausgefüllt wurden.
Die Tabelle usa_names_transformed sollte unter dem Dataset lake angezeigt werden.
- Klicken Sie auf die Tabelle und gehen Sie zum Tab Vorschau, um Beispiele zu den Daten aus usa_names_transformedaufzurufen.
Hinweis: Wenn Sie die Tabelleusa_names_transformed nicht sehen, aktualisieren Sie die Seite oder rufen Sie die Tabellen mit der klassischen BigQuery-Benutzeroberfläche auf.
Klicken Sie auf Fortschritt prüfen.
Datentransformationspipeline erstellen.
Aufgabe 7: Datenanreicherungspipeline prüfen und ausführen
Sie erstellen jetzt eine Pipeline für die Datenanreicherung, die Folgendes tut:
-  Nehmen Sie die Dateien aus Cloud Storage auf.
-  Filtern Sie die Kopfzeile in den Dateien heraus.
-  Die gelesenen Zeilen werden in Wörterbuchobjekte konvertiert.
-  Geben Sie die Zeilen an BigQuery aus.
Python-Code für die Datenanreicherungspipeline prüfen und bearbeiten
In diesem Abschnitt nutzen Sie die KI-basierten Funktionen von Gemini Code Assist, um den Python-Code für die Datenanreicherungspipeline zu prüfen und zu bearbeiten.
- 
Klicken Sie in der Cloud Shell-Menüleiste auf Editor öffnen. 
- 
Öffnen Sie im Cloud Shell-Editor im selben Verzeichnis die Datei data_enrichment.py. Wie zuvor sehen Sie das Symbol rechts oben im Editor. rechts oben im Editor.
 
- 
Klicken Sie in der Symbolleiste auf das Symbol Gemini Code Assist: Intelligente Aktionen  . .
 
- 
Fügen Sie den folgenden Prompt in das Inline-Textfeld von Gemini Code Assist ein, das über die Symbolleiste geöffnet wird, um den Code in Zeile 83 zu aktualisieren: 
In the data_enrichment.py file, update line 83 by replacing x.decode('utf8') with x.
- 
Drücken Sie die Eingabetaste, damit Gemini Code Assist den Code entsprechend ändert. 
- 
Klicken Sie in der Ansicht „Unterschiede“ von Gemini auf Akzeptieren, wenn Sie dazu aufgefordert werden. 
Die aktualisierte Zeile 83 in der Datei data_enrichment.py sieht jetzt in etwa so aus:
values = [x for x in csv_row]
- Wenn Sie diese Zeile bearbeitet haben, müssen Sie die aktualisierte Datei speichern. Klicken Sie dazu im Code-Editor auf die Option Datei und dann auf Speichern.
Datenanreicherungspipeline ausführen
- Geben Sie den folgenden Befehl in das Cloud Shell-Terminal ein, um die Datenanreicherungspipeline auszuführen:
python dataflow_python_examples/data_enrichment.py \
  --project={{{ project_0.project_id | PROJECT_ID }}} \
  --region={{{ project_0.default_region | REGION }}} \
  --runner=DataflowRunner \
  --machine_type=e2-standard-2 \
  --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \
  --save_main_session
- Klicken Sie auf der Dataflow-Seite auf Ihren Job, um den Jobstatus aufzurufen.
Diese Dataflow-Pipeline benötigt etwa fünf Minuten, um zu starten, die Aufgabe auszuführen und dann herunterzufahren.
- Sobald der Jobstatus in Dataflow Erfolgreich lautet, klicken Sie in der Console auf Navigationsmenü > BigQuery, um zu prüfen, ob Ihre Daten eingefügt wurden.
Die Tabelle usa_names_enriched sollte unter dem Dataset lake angezeigt werden.
- Klicken Sie auf die Tabelle und gehen Sie zum Tab Vorschau, um Beispiele zu den Daten aus usa_names_enrichedaufzurufen.
Hinweis: Wenn Sie die Tabelleusa_names_enriched nicht sehen, aktualisieren Sie die Seite oder rufen Sie die Tabellen mit der klassischen BigQuery-Benutzeroberfläche auf.
Klicken Sie auf Fortschritt prüfen.
Datenanreicherungspipeline erstellen.
Aufgabe 8: Data-Lake-zu-Data-Mart-Pipeline prüfen und ausführen
Als Nächstes erstellen Sie eine Dataflow-Pipeline, mit der Daten aus zwei BigQuery-Datenquellen gelesen und die Datenquellen dann verknüpft werden. Dabei geschieht Folgendes:
-  Dateien aus zwei BigQuery-Quellen werden aufgenommen.
-  Die beiden Datenquellen werden miteinander verbunden.
-  Die Kopfzeile in den Dateien werden herausgefiltert.
-  Die gelesenen Zeilen werden in Wörterbuchobjekte konvertiert.
-  Die Zeilen werden in an BigQuery ausgegeben.
Datenaufnahmepipeline wird ausgeführt, um Daten zusammenzuführen und die resultierende Tabelle in BigQuery zu schreiben
Sie sehen sich zuerst den Code in der Datei data_lake_to_mart.py an, um zu verstehen, was er bewirkt. Anschließend führen Sie die Pipeline in der Cloud aus.
- Öffnen Sie im Code-Editor die Datei data_lake_to_mart.py.
Lesen Sie die Kommentare, in denen die Funktion des Codes erklärt wird. Durch diesen Code werden zwei Tabellen zusammengeführt und die Ergebnisse in eine neue Tabelle in BigQuery geschrieben.
Hinweis: Wenn Sie mehr über die Data-Lake-zu-Data-Mart-Pipeline erfahren möchten, können Sie Gemini Code Assist noch einmal auffordern, den Code zu erklären. Das funktioniert ähnlich wie in Aufgabe 5 und Aufgabe 6.
- Führen Sie mit dem folgenden Codeblock die Pipeline aus:
python dataflow_python_examples/data_lake_to_mart.py \
  --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" \
  --max_num_workers=4 \
  --project={{{ project_0.project_id | PROJECT_ID }}} \
  --runner=DataflowRunner \
  --machine_type=e2-standard-2 \
  --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \
  --save_main_session \
  --region={{{ project_0.default_region | REGION }}}
- 
Geben Sie in der Titelleiste der Google Cloud Console Dataflow in das Suchfeld ein und klicken Sie dann in den Suchergebnissen auf Dataflow. 
- 
Klicken Sie auf diesen neuen Job, um dessen Status aufzurufen. 
Diese Dataflow-Pipeline benötigt etwa fünf Minuten, um zu starten, die Aufgabe auszuführen und dann herunterzufahren.
- Sobald der Jobstatus in Dataflow Erfolgreich lautet, klicken Sie auf Navigationsmenü > BigQuery, um zu prüfen, ob Ihre Daten eingefügt wurden.
Die Tabelle orders_denormalized_sideinput sollte unter dem Dataset lake angezeigt werden.
- Klicken Sie auf die Tabelle und rufen Sie den Tab Vorschau auf, um Beispiele zu den Daten aus orders_denormalized_sideinputanzeigen zu lassen.
Hinweis: Wenn Sie die Tabelleorders_denormalized_sideinput nicht sehen, aktualisieren Sie die Seite oder rufen Sie die Tabellen mit der klassischen BigQuery-Benutzeroberfläche auf.
Klicken Sie auf Fortschritt prüfen.
Dataflow-Pipeline für Data Lake zu Data-Mart erstellen
Testen Sie Ihr Wissen
Im Folgenden stellen wir Ihnen eine Multiple-Choice-Frage, um Ihr bisher erworbenes Wissen zu testen und zu festigen. Beantworten Sie die Frage so gut Sie können.
Das wars! Sie haben das Lab erfolgreich abgeschlossen.
Sie haben Python-Code mithilfe von Dataflow und Unterstützung durch Gemini Code Assist ausgeführt, um Daten aus Cloud Storage in BigQuery aufzunehmen und dann in BigQuery zu transformieren und anzureichern.
Weitere Informationen
Wünschen Sie noch mehr Informationen? Sie finden die offizielle Dokumentation unter:
Google Cloud-Schulungen und -Zertifizierungen
In unseren Schulungen erfahren Sie alles zum optimalen Einsatz unserer Google Cloud-Technologien und können sich entsprechend zertifizieren lassen. Unsere Kurse vermitteln technische Fähigkeiten und Best Practices, damit Sie möglichst schnell mit Google Cloud loslegen und Ihr Wissen fortlaufend erweitern können. Wir bieten On-Demand-, Präsenz- und virtuelle Schulungen für Anfänger wie Fortgeschrittene an, die Sie individuell in Ihrem eigenen Zeitplan absolvieren können. Mit unseren Zertifizierungen weisen Sie nach, dass Sie Experte im Bereich Google Cloud-Technologien sind.
Anleitung zuletzt am 1. September 2025 aktualisiert
Lab zuletzt am 1. September 2025 getestet
© 2025 Google LLC. Alle Rechte vorbehalten. Google und das Google-Logo sind Marken von Google LLC. Alle anderen Unternehmens- und Produktnamen können Marken der jeweils mit ihnen verbundenen Unternehmen sein.