Petunjuk dan persyaratan penyiapan lab
Lindungi akun dan progres Anda. Selalu gunakan jendela browser pribadi dan kredensial lab untuk menjalankan lab ini.

Stream Processing dengan Cloud Pub/Sub dan Dataflow: Qwik Start

Lab 45 menit universal_currency_alt 1 Kredit show_chart Pengantar
info Lab ini mungkin menggabungkan alat AI untuk mendukung pembelajaran Anda.
Konten ini belum dioptimalkan untuk perangkat seluler.
Untuk pengalaman terbaik, kunjungi kami dengan komputer desktop menggunakan link yang dikirim melalui email.

GSP903

Logo lab mandiri Google Cloud

Ringkasan

Google Cloud Pub/Sub adalah layanan pesan untuk bertukar data peristiwa antara aplikasi dan layanan. Produser data memublikasikan pesan ke topik Cloud Pub/Sub. Seorang konsumen membuat langganan ke topik tersebut. Pelanggan dapat menarik pesan dari langganan atau dikonfigurasi sebagai webhook untuk langganan push. Setiap pelanggan harus mengonfirmasi setiap pesan dalam jangka waktu yang dapat dikonfigurasi.

Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real time) dan batch tanpa mengurangi keandalan dan kecepatan. Dataflow menyediakan lingkungan pengembangan pipeline yang disederhanakan menggunakan Apache Beam SDK, yang memiliki serangkaian primitif analisis sesi dan jendela yang lengkap serta ekosistem konektor sumber dan sink.

Pub/Sub adalah sistem penyerapan dan pengiriman peristiwa yang skalabel dan tahan lama. Dataflow melengkapi model pengiriman Pub/Sub yang skalabel dan minimal satu kali dengan penghapusan duplikat pesan dan pemrosesan tepat satu kali sesuai urutan jika Anda menggunakan jendela dan buffering.

Yang akan Anda lakukan

  • Membaca pesan yang dipublikasikan ke topik Pub/Sub
  • Mengelompokkan (atau mengelompokkan) pesan berdasarkan stempel waktu
  • Menulis pesan ke Cloud Storage

Penyiapan

Sebelum mengklik tombol Start Lab

Baca petunjuk ini. Lab memiliki timer dan Anda tidak dapat menjedanya. Timer yang dimulai saat Anda mengklik Start Lab akan menampilkan durasi ketersediaan resource Google Cloud untuk Anda.

Lab interaktif ini dapat Anda gunakan untuk melakukan aktivitas lab di lingkungan cloud sungguhan, bukan di lingkungan demo atau simulasi. Untuk mengakses lab ini, Anda akan diberi kredensial baru yang bersifat sementara dan dapat digunakan untuk login serta mengakses Google Cloud selama durasi lab.

Untuk menyelesaikan lab ini, Anda memerlukan:

  • Akses ke browser internet standar (disarankan browser Chrome).
Catatan: Gunakan jendela Samaran (direkomendasikan) atau browser pribadi untuk menjalankan lab ini. Hal ini akan mencegah konflik antara akun pribadi Anda dan akun siswa yang dapat menyebabkan tagihan ekstra pada akun pribadi Anda.
  • Waktu untuk menyelesaikan lab. Ingat, setelah dimulai, lab tidak dapat dijeda.
Catatan: Hanya gunakan akun siswa untuk lab ini. Jika Anda menggunakan akun Google Cloud yang berbeda, Anda mungkin akan dikenai tagihan ke akun tersebut.

Cara memulai lab dan login ke Google Cloud Console

  1. Klik tombol Start Lab. Jika Anda perlu membayar lab, dialog akan terbuka untuk memilih metode pembayaran. Di sebelah kiri ada panel Lab Details yang berisi hal-hal berikut:

    • Tombol Open Google Cloud console
    • Waktu tersisa
    • Kredensial sementara yang harus Anda gunakan untuk lab ini
    • Informasi lain, jika diperlukan, untuk menyelesaikan lab ini
  2. Klik Open Google Cloud console (atau klik kanan dan pilih Open Link in Incognito Window jika Anda menjalankan browser Chrome).

    Lab akan menjalankan resource, lalu membuka tab lain yang menampilkan halaman Sign in.

    Tips: Atur tab di jendela terpisah secara berdampingan.

    Catatan: Jika Anda melihat dialog Choose an account, klik Use Another Account.
  3. Jika perlu, salin Username di bawah dan tempel ke dialog Sign in.

    {{{user_0.username | "Username"}}}

    Anda juga dapat menemukan Username di panel Lab Details.

  4. Klik Next.

  5. Salin Password di bawah dan tempel ke dialog Welcome.

    {{{user_0.password | "Password"}}}

    Anda juga dapat menemukan Password di panel Lab Details.

  6. Klik Next.

    Penting: Anda harus menggunakan kredensial yang diberikan lab. Jangan menggunakan kredensial akun Google Cloud Anda. Catatan: Menggunakan akun Google Cloud sendiri untuk lab ini dapat dikenai biaya tambahan.
  7. Klik halaman berikutnya:

    • Setujui persyaratan dan ketentuan.
    • Jangan tambahkan opsi pemulihan atau autentikasi 2 langkah (karena ini akun sementara).
    • Jangan mendaftar uji coba gratis.

Setelah beberapa saat, Konsol Google Cloud akan terbuka di tab ini.

Catatan: Untuk mengakses produk dan layanan Google Cloud, klik Navigation menu atau ketik nama layanan atau produk di kolom Search. Ikon Navigation menu dan kolom Search

Mengaktifkan Cloud Shell

Cloud Shell adalah mesin virtual yang dilengkapi dengan berbagai alat pengembangan. Mesin virtual ini menawarkan direktori beranda persisten berkapasitas 5 GB dan berjalan di Google Cloud. Cloud Shell menyediakan akses command-line untuk resource Google Cloud Anda.

  1. Klik Activate Cloud Shell Ikon Activate Cloud Shell di bagian atas Konsol Google Cloud.

  2. Klik jendela berikut:

    • Lanjutkan melalui jendela informasi Cloud Shell.
    • Beri otorisasi ke Cloud Shell untuk menggunakan kredensial Anda guna melakukan panggilan Google Cloud API.

Setelah terhubung, Anda sudah diautentikasi, dan project ditetapkan ke Project_ID, . Output berisi baris yang mendeklarasikan Project_ID untuk sesi ini:

Project Cloud Platform Anda dalam sesi ini disetel ke {{{project_0.project_id | "PROJECT_ID"}}}

gcloud adalah alat command line untuk Google Cloud. Alat ini sudah terinstal di Cloud Shell dan mendukung pelengkapan command line.

  1. (Opsional) Anda dapat menampilkan daftar nama akun yang aktif dengan perintah ini:
gcloud auth list
  1. Klik Authorize.

Output:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} Untuk menetapkan akun aktif, jalankan: $ gcloud config set account `ACCOUNT`
  1. (Opsional) Anda dapat menampilkan daftar ID project dengan perintah ini:
gcloud config list project

Output:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} Catatan: Untuk mendapatkan dokumentasi gcloud yang lengkap di Google Cloud, baca panduan ringkasan gcloud CLI.

Menetapkan region

  • Di Cloud Shell, jalankan perintah berikut untuk menetapkan region project untuk lab ini:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}

Memastikan Dataflow API berhasil diaktifkan

Untuk memastikan akses ke API yang diperlukan, mulai ulang koneksi ke Dataflow API.

gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}

Klik Periksa progres saya untuk memverifikasi tujuan. Disable and re-enable the Dataflow API

Tugas 1. Membuat resource project

  1. Di Cloud Shell, buat variabel untuk bucket, project, dan region Anda.
PROJECT_ID=$(gcloud config get-value project) BUCKET_NAME="${PROJECT_ID}-bucket" TOPIC_ID=my-id REGION={{{project_0.default_region | "filled in at lab start"}}}
  1. Tetapkan region App Engine Anda.
Catatan: Untuk region selain us-central1 dan europe-west1, setel variabel region AppEngine agar sama dengan region yang ditetapkan. Jika region Anda us-central1, tetapkan variabel region AppEngine ke us-central. Jika region Anda europe-west1, setel variabel region AppEngine ke europe-west.

Anda dapat merujuk ke Lokasi App Engine untuk mengetahui informasi selengkapnya.

AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
  1. Membuat bucket Cloud Storage yang dimiliki project ini:
gsutil mb gs://$BUCKET_NAME Catatan: Nama bucket Cloud Storage harus unik secara global. Project ID Qwiklabs Anda selalu unik, jadi ID tersebut digunakan dalam nama bucket Anda di lab ini.
  1. Buat topik Pub/Sub di project ini:
gcloud pubsub topics create $TOPIC_ID
  1. Buat aplikasi App Engine untuk project Anda:
gcloud app create --region=$AE_REGION
  1. Buat tugas Cloud Scheduler di project ini. Tugas ini memublikasikan pesan ke topik Pub/Sub dengan interval satu menit:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!"
  1. Jika diminta untuk mengaktifkan Cloud Scheduler API, tekan y dan masukkan.

Klik Periksa progres saya untuk memverifikasi tujuan. Membuat Resource Project

  1. Mulai tugas:
gcloud scheduler jobs run publisher-job Catatan: Jika Anda mengalami error untuk RESOURCE_EXHAUSTED, coba jalankan kembali perintah tersebut.
  1. Gunakan perintah berikut untuk membuat clone repositori panduan memulai cepat dan membuka direktori kode sampel:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -U -r requirements.txt # Install Apache Beam dependencies Catatan: Jika Anda menggunakan opsi Python, jalankan perintah Python satu per satu.

Klik Periksa progres saya untuk memverifikasi tujuan. Memulai tugas cloud scheduler

Tugas 2. Meninjau kode untuk mengalirkan pesan dari Pub/Sub ke Cloud Storage

Contoh kode

Tinjau kode contoh berikut, yang menggunakan Dataflow untuk:

  • Membaca pesan Pub/Sub.
  • Membuat jendela (atau mengelompokkan) pesan ke dalam interval ukuran tetap berdasarkan stempel waktu publikasi.
  • Menulis pesan di setiap jendela ke file di Cloud Storage.
import java.io.IOException; import org.apache.beam.examples.common.WriteOneFilePerWindow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.joda.time.Duration; public class PubSubToGcs { /* * Define your own configuration options. Add your own arguments to be processed * by the command-line parser, and specify default values for them. */ public interface PubSubToGcsOptions extends StreamingOptions { @Description("The Cloud Pub/Sub topic to read from.") @Required String getInputTopic(); void setInputTopic(String value); @Description("Output file's window size in number of minutes.") @Default.Integer(1) Integer getWindowSize(); void setWindowSize(Integer value); @Description("Path of the output file including its filename prefix.") @Required String getOutput(); void setOutput(String value); } public static void main(String[] args) throws IOException { // The maximum number of shards when writing output. int numShards = 1; PubSubToGcsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); pipeline // 1) Read string messages from a Pub/Sub topic. .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic())) // 2) Group the messages into fixed-sized minute intervals. .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) // 3) Write one file to GCS for every window of messages. .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards)); // Execute the pipeline and wait until it finishes running. pipeline.run().waitUntilFinish(); } } import argparse from datetime import datetime import logging import random from apache_beam import ( DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys, ) from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms.window import FixedWindows class GroupMessagesByFixedWindows(PTransform): """A composite transform that groups Pub/Sub messages based on publish time and outputs a list of tuples, each containing a message and its publish time. """ def __init__(self, window_size, num_shards=5): # Set window size to 60 seconds. self.window_size = int(window_size * 60) self.num_shards = num_shards def expand(self, pcoll): return ( pcoll # Bind window info to each element using element timestamp (or publish time). | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size)) | "Add timestamp to windowed elements" >> ParDo(AddTimestamp()) # Assign a random key to each windowed element based on the number of shards. | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1)) # Group windowed elements by key. All the elements in the same window must fit # memory for this. If not, you need to use `beam.util.BatchElements`. | "Group by key" >> GroupByKey() ) class AddTimestamp(DoFn): def process(self, element, publish_time=DoFn.TimestampParam): """Processes each windowed element by extracting the message body and its publish time into a tuple. """ yield ( element.decode("utf-8"), datetime.utcfromtimestamp(float(publish_time)).strftime( "%Y-%m-%d %H:%M:%S.%f" ), ) class WriteToGCS(DoFn): def __init__(self, output_path): self.output_path = output_path def process(self, key_value, window=DoFn.WindowParam): """Write messages in a batch to Google Cloud Storage.""" ts_format = "%H:%M" window_start = window.start.to_utc_datetime().strftime(ts_format) window_end = window.end.to_utc_datetime().strftime(ts_format) shard_id, batch = key_value filename = "-".join([self.output_path, window_start, window_end, str(shard_id)]) with io.gcsio.GcsIO().open(filename=filename, mode="w") as f: for message_body, publish_time in batch: f.write(f"{message_body},{publish_time}\n".encode()) def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None): # Set `save_main_session` to True so DoFns can access globally imported modules. pipeline_options = PipelineOptions( pipeline_args, streaming=True, save_main_session=True ) with Pipeline(options=pipeline_options) as pipeline: ( pipeline # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam # binds the publish time returned by the Pub/Sub server for each message # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`. # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic) | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards) | "Write to GCS" >> ParDo(WriteToGCS(output_path)) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( "--input_topic", help="The Cloud Pub/Sub topic to read from." '"projects//topics/".', ) parser.add_argument( "--window_size", type=float, default=1.0, help="Output file's window size in minutes.", ) parser.add_argument( "--output_path", help="Path of the output GCS file including the prefix.", ) parser.add_argument( "--num_shards", type=int, default=5, help="Number of shards to use when writing windowed elements to GCS.", ) known_args, pipeline_args = parser.parse_known_args() run( known_args.input_topic, known_args.output_path, known_args.window_size, known_args.num_shards, pipeline_args, ) Catatan: Untuk mempelajari lebih lanjut kode contoh, kunjungi halaman GitHub java-docs-samples dan python-docs-samples yang relevan.

Tugas 3. Memulai pipeline

  1. Untuk memulai pipeline, jalankan perintah berikut:
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2 \ --tempLocation=gs://$BUCKET_NAME/temp" python PubSubToGCS.py \ --project=project_id \ --region=region \ --input_topic=projects/project_id/topics/my-id \ --output_path=gs://bucket_name/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://bucket_name/temp Catatan: Saat menjalankan perintah python, ganti project_id, bucket_name, dan region dengan ID project, nama bucket, dan region lab yang ditetapkan.

Perintah sebelumnya berjalan secara lokal dan meluncurkan tugas Dataflow yang berjalan di cloud.

Catatan: Anda mungkin harus menunggu sekitar 10 menit agar kode dapat dieksekusi sepenuhnya dan tugas pipeline muncul di konsol Dataflow pada tugas berikutnya. Catatan: Jika Anda menerima peringatan terkait StaticLoggerBinder, Anda dapat mengabaikannya dan melanjutkan lab.

Klik Periksa progres saya untuk memverifikasi tujuan. Memulai pipeline dan meluncurkan tugas dataflow

Tugas 4. Mengamati progres tugas dan pipeline

  1. Buka konsol Dataflow untuk mengamati progres tugas.

  2. Klik Refresh untuk melihat tugas dan update status terbaru.

Halaman Dataflow yang menampilkan informasi tugas pubsubtogcs 0815172250-75a99ab8

  1. Klik nama tugas untuk membuka detail tugas dan meninjau hal berikut:
  • Struktur pekerjaan
  • Log tugas
  • Metrik tahapan

Halaman tugas yang menampilkan informasi Ringkasan tugas

Anda mungkin harus menunggu beberapa menit lagi untuk melihat file output di Cloud Storage.

  1. Anda dapat melihat file output dengan membuka Navigation menu > Cloud Storage, lalu mengklik nama bucket Anda dan mengklik Samples.

Halaman detail bucket yang menampilkan informasi file output

  1. Atau, Anda dapat keluar dari aplikasi di Cloud Shell menggunakan CTRL+C (dan untuk opsi Python, ketik exit), lalu jalankan perintah di bawah untuk mencantumkan file yang telah ditulis ke Cloud Storage:
gsutil ls gs://${BUCKET_NAME}/samples/

Output-nya akan terlihat seperti berikut:

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Tugas 5. Pembersihan

  1. Jika belum, keluar dari aplikasi di Cloud Shell menggunakan CTRL+C.

Untuk opsi Python, ketik exit untuk keluar dari lingkungan Python.

  1. Di Cloud Shell, hapus tugas Cloud Scheduler:
gcloud scheduler jobs delete publisher-job

Jika diminta "Do you want to continue", tekan Y dan enter.

  1. Di konsol Dataflow, hentikan tugas dengan memilih nama tugas Anda, lalu klik Stop.

Saat diminta, klik Stop Job > Cancel untuk membatalkan pipeline tanpa menguras.

  1. Di Cloud Shell, hapus topik:
gcloud pubsub topics delete $TOPIC_ID
  1. Di Cloud Shell, hapus file yang dibuat oleh pipeline:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  1. Di Cloud Shell, hapus bucket Cloud Storage:
gsutil rb gs://${BUCKET_NAME}

Selamat!

Anda telah membuat pipeline Dataflow yang membaca pesan dari topik Pub/Sub Anda, mengelompokkannya berdasarkan stempel waktu, dan menuliskannya ke bucket Cloud Storage Anda.

Langkah berikutnya/Pelajari lebih lanjut

Sertifikasi dan pelatihan Google Cloud

...membantu Anda mengoptimalkan teknologi Google Cloud. Kelas kami mencakup keterampilan teknis dan praktik terbaik untuk membantu Anda memahami dengan cepat dan melanjutkan proses pembelajaran. Kami menawarkan pelatihan tingkat dasar hingga lanjutan dengan opsi on demand, live, dan virtual untuk menyesuaikan dengan jadwal Anda yang sibuk. Sertifikasi membantu Anda memvalidasi dan membuktikan keterampilan serta keahlian Anda dalam teknologi Google Cloud.

Panduan Terakhir Diperbarui pada 20 Agustus 2025

Lab Terakhir Diuji pada 20 Agustus 2025

Hak cipta 2025 Google LLC. Semua hak dilindungi undang-undang. Google dan logo Google adalah merek dagang dari Google LLC. Semua nama perusahaan dan produk lain mungkin adalah merek dagang masing-masing perusahaan yang bersangkutan.

Sebelum memulai

  1. Lab membuat project dan resource Google Cloud untuk jangka waktu tertentu
  2. Lab memiliki batas waktu dan tidak memiliki fitur jeda. Jika lab diakhiri, Anda harus memulainya lagi dari awal.
  3. Di kiri atas layar, klik Start lab untuk memulai

Gunakan penjelajahan rahasia

  1. Salin Nama Pengguna dan Sandi yang diberikan untuk lab tersebut
  2. Klik Open console dalam mode pribadi

Login ke Konsol

  1. Login menggunakan kredensial lab Anda. Menggunakan kredensial lain mungkin menyebabkan error atau dikenai biaya.
  2. Setujui persyaratan, dan lewati halaman resource pemulihan
  3. Jangan klik End lab kecuali jika Anda sudah menyelesaikan lab atau ingin mengulanginya, karena tindakan ini akan menghapus pekerjaan Anda dan menghapus project

Konten ini tidak tersedia untuk saat ini

Kami akan memberi tahu Anda melalui email saat konten tersedia

Bagus!

Kami akan menghubungi Anda melalui email saat konten tersedia

Satu lab dalam satu waktu

Konfirmasi untuk mengakhiri semua lab yang ada dan memulai lab ini

Gunakan penjelajahan rahasia untuk menjalankan lab

Gunakan jendela Samaran atau browser pribadi untuk menjalankan lab ini. Langkah ini akan mencegah konflik antara akun pribadi Anda dan akun Siswa yang dapat menyebabkan tagihan ekstra pada akun pribadi Anda.