Instrucciones y requisitos de configuración del lab
Protege tu cuenta y tu progreso. Usa siempre una ventana de navegador privada y las credenciales del lab para ejecutarlo.

Procesamiento de transmisión con Cloud Pub/Sub y Dataflow: Qwik Start

Lab 45 minutos universal_currency_alt 1 crédito show_chart Introductorio
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Este contenido aún no está optimizado para dispositivos móviles.
Para obtener la mejor experiencia, visítanos en una computadora de escritorio con un vínculo que te enviaremos por correo electrónico.

GSP903

Logotipo de los labs de autoaprendizaje de Google Cloud

Descripción general

Google Cloud Pub/Sub es un servicio de mensajería para intercambiar datos de eventos entre aplicaciones y servicios. Un productor de datos publica mensajes en un tema de Cloud Pub/Sub. Un consumidor crea una suscripción a ese tema. Los suscriptores pueden extraer mensajes desde una suscripción o se configuran como webhooks para suscripciones de envío. Los suscriptores deben confirmar la recepción de cada mensaje en el transcurso de un período configurable.

Dataflow es un servicio completamente administrado para transformar y enriquecer datos en modos de transmisión (en tiempo real) y por lotes con la misma confiabilidad y expresividad. Proporciona un entorno de desarrollo de canalización simplificado con el SDK de Apache Beam, que tiene un conjunto amplio de primitivas de análisis de sesiones y sistemas de ventanas, además de un ecosistema de conectores fuente y receptores.

Pub/Sub es un sistema de transferencia y entrega de eventos escalable y duradero. Dataflow complementa el modelo de entrega escalable de tipo “al menos una vez” de Pub/Sub con la anulación de mensajes duplicados y el procesamiento “en orden” y de tipo “exactamente una vez” si usas ventanas y almacenamiento en búfer.

Actividades

  • Leer mensajes publicados en un tema de Pub/Sub
  • Mostrar mensajes en ventanas, o agruparlos, por marca de tiempo
  • Escribir mensajes a Cloud Storage

Configuración

Antes de hacer clic en el botón Comenzar lab

Lee estas instrucciones. Los labs cuentan con un temporizador que no se puede pausar. El temporizador, que comienza a funcionar cuando haces clic en Comenzar lab, indica por cuánto tiempo tendrás a tu disposición los recursos de Google Cloud.

Este lab práctico te permitirá realizar las actividades correspondientes en un entorno de nube real, no en uno de simulación o demostración. Para ello, se te proporcionan credenciales temporales nuevas que utilizarás para acceder a Google Cloud durante todo el lab.

Para completar este lab, necesitarás lo siguiente:

  • Acceso a un navegador de Internet estándar. Se recomienda el navegador Chrome.
Nota: Usa una ventana del navegador privada o de incógnito (opción recomendada) para ejecutar el lab. Así evitarás conflictos entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.
  • Tiempo para completar el lab (recuerda que, una vez que comienzas un lab, no puedes pausarlo).
Nota: Usa solo la cuenta de estudiante para este lab. Si usas otra cuenta de Google Cloud, es posible que se apliquen cargos a esa cuenta.

Cómo iniciar tu lab y acceder a la consola de Google Cloud

  1. Haz clic en el botón Comenzar lab. Si debes pagar por el lab, se abrirá un diálogo para que selecciones la forma de pago. A la izquierda, se encuentra el panel Detalles del lab, que tiene estos elementos:

    • El botón para abrir la consola de Google Cloud
    • El tiempo restante
    • Las credenciales temporales que debes usar para el lab
    • Otra información para completar el lab, si es necesaria
  2. Haz clic en Abrir la consola de Google Cloud (o haz clic con el botón derecho y selecciona Abrir el vínculo en una ventana de incógnito si ejecutas el navegador Chrome).

    El lab inicia recursos y abre otra pestaña en la que se muestra la página de acceso.

    Sugerencia: Ordena las pestañas en ventanas separadas, una junto a la otra.

    Nota: Si ves el diálogo Elegir una cuenta, haz clic en Usar otra cuenta.
  3. De ser necesario, copia el nombre de usuario a continuación y pégalo en el diálogo Acceder.

    {{{user_0.username | "Username"}}}

    También puedes encontrar el nombre de usuario en el panel Detalles del lab.

  4. Haz clic en Siguiente.

  5. Copia la contraseña que aparece a continuación y pégala en el diálogo Te damos la bienvenida.

    {{{user_0.password | "Password"}}}

    También puedes encontrar la contraseña en el panel Detalles del lab.

  6. Haz clic en Siguiente.

    Importante: Debes usar las credenciales que te proporciona el lab. No uses las credenciales de tu cuenta de Google Cloud. Nota: Usar tu propia cuenta de Google Cloud para este lab podría generar cargos adicionales.
  7. Haz clic para avanzar por las páginas siguientes:

    • Acepta los Términos y Condiciones.
    • No agregues opciones de recuperación o autenticación de dos factores (esta es una cuenta temporal).
    • No te registres para obtener pruebas gratuitas.

Después de un momento, se abrirá la consola de Google Cloud en esta pestaña.

Nota: Para acceder a los productos y servicios de Google Cloud, haz clic en el menú de navegación o escribe el nombre del servicio o producto en el campo Buscar. Ícono del menú de navegación y campo de búsqueda

Activa Cloud Shell

Cloud Shell es una máquina virtual que cuenta con herramientas para desarrolladores. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud. Cloud Shell proporciona acceso de línea de comandos a tus recursos de Google Cloud.

  1. Haz clic en Activar Cloud Shell Ícono de Activar Cloud Shell en la parte superior de la consola de Google Cloud.

  2. Haz clic para avanzar por las siguientes ventanas:

    • Continúa en la ventana de información de Cloud Shell.
    • Autoriza a Cloud Shell para que use tus credenciales para realizar llamadas a la API de Google Cloud.

Cuando te conectes, habrás completado la autenticación, y el proyecto estará configurado con tu Project_ID, . El resultado contiene una línea que declara el Project_ID para esta sesión:

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud es la herramienta de línea de comandos de Google Cloud. Viene preinstalada en Cloud Shell y es compatible con la función de autocompletado con tabulador.

  1. Puedes solicitar el nombre de la cuenta activa con este comando (opcional):
gcloud auth list
  1. Haz clic en Autorizar.

Resultado:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. Puedes solicitar el ID del proyecto con este comando (opcional):
gcloud config list project

Resultado:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} Nota: Para obtener toda la documentación de gcloud, en Google Cloud, consulta la guía con la descripción general de gcloud CLI.

Configura la región

  • En Cloud Shell, ejecuta el siguiente comando para establecer la región del proyecto en este lab:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}

Asegúrate de que la API de Dataflow esté habilitada de forma correcta

Para garantizar el acceso a la API necesaria, reinicia la conexión a la API de Dataflow.

gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}

Haz clic en Revisar mi progreso para verificar el objetivo. Inhabilitar y volver a habilitar la API de Dataflow

Tarea 1: Crea recursos del proyecto

  1. En Cloud Shell, crea variables para tu bucket, proyecto y región.
PROJECT_ID=$(gcloud config get-value project) BUCKET_NAME="${PROJECT_ID}-bucket" TOPIC_ID=my-id REGION={{{project_0.default_region | "se completan al inicio del lab"}}}
  1. Establece tu región de App Engine.
Nota: Para regiones distintas de us-central1 y europe-west1, establece la variable de región de AppEngine para que sea la misma que la región asignada. Si se te asigna us-central1, establece la variable de región de AppEngine en us-central. Si se te asigna europe-west1, establece la variable de región de AppEngine en europe-west.

Puedes consultar las ubicaciones de App Engine para obtener más información.

AE_REGION={{{project_0.startup_script.app_region|region_to_be_set}}}
  1. Crea un bucket de Cloud Storage que sea propiedad de este proyecto:
gsutil mb gs://$BUCKET_NAME Nota: Los nombres de los buckets de Cloud Storage deben ser únicos a nivel global. Tu ID del proyecto de Qwiklabs siempre es único, por lo que se usa en el nombre de tu bucket en este lab.
  1. Crea un tema de Pub/Sub en este proyecto:
gcloud pubsub topics create $TOPIC_ID
  1. Crea una app de App Engine para tu proyecto:
gcloud app create --region=$AE_REGION
  1. Crea un trabajo de Cloud Scheduler en este proyecto. El trabajo publica un mensaje en un tema de Pub/Sub con intervalos de un minuto:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!"
  1. Si se te solicita habilitar la API de Cloud Scheduler, presiona y y, luego, Intro.

Haz clic en Revisar mi progreso para verificar el objetivo. Crear recursos del proyecto

  1. Inicia el trabajo:
gcloud scheduler jobs run publisher-job Nota: Si encuentras un error de RESOURCE_EXHAUSTED, intenta ejecutar el comando de nuevo.
  1. Usa el siguiente comando para clonar el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -U -r requirements.txt # Instala dependencias de Apache Beam Nota: Si usas la opción de Python, ejecuta los comandos correspondientes de forma individual.

Haz clic en Revisar mi progreso para verificar el objetivo. Iniciar el trabajo de Cloud Scheduler

Tarea 2: Revisa el código para transmitir mensajes desde Pub/Sub a Cloud Storage

Muestra de código

Revisa el siguiente código de muestra, que usa Dataflow para realizar las siguientes acciones:

  • Leer mensajes de Pub/Sub
  • Mostrar mensajes en ventanas, o agruparlos, en intervalos de tamaño fijo con marcas de tiempo públicas
  • Escribir los mensajes en cada ventana de archivos en Cloud Storage
import java.io.IOException; import org.apache.beam.examples.common.WriteOneFilePerWindow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.joda.time.Duration; public class PubSubToGcs { /* * Define tus propias opciones de configuración. Agrega tus propios argumentos para * que el analizador de línea de comandos los procese y especifica valores predeterminados para ellos. */ public interface PubSubToGcsOptions extends StreamingOptions { @Description("The Cloud Pub/Sub topic to read from.") @Required String getInputTopic(); void setInputTopic(String value); @Description("Output file's window size in number of minutes.") @Default.Integer(1) Integer getWindowSize(); void setWindowSize(Integer value); @Description("Path of the output file including its filename prefix.") @Required String getOutput(); void setOutput(String value); } public static void main(String[] args) throws IOException { // La cantidad máxima de fragmentos que se producen cuando se escribe el resultado. int numShards = 1; PubSubToGcsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class); options.setStreaming(true); Pipeline pipeline = Pipeline.create(options); pipeline // 1) Lee los mensajes de cadena de un tema de Pub/Sub. .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic())) // 2) Agrupa los mensajes en intervalos fijos de un minuto. .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))) // 3) Escribe un archivo para GCS por cada ventana de mensajes. .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards)); // Ejecuta la canalización y espera hasta que termine de ejecutarse. pipeline.run().waitUntilFinish(); } } import argparse from datetime import datetime import logging import random from apache_beam import ( DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys, ) from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms.window import FixedWindows class GroupMessagesByFixedWindows(PTransform): """Una transformación compuesta que agrupa mensajes de Pub/Sub según la hora de publicación y que arroja como resultado una lista de tuplas que incluyen individualmente un mensaje y su hora de publicación. """ def __init__(self, window_size, num_shards=5): # Define el tamaño de la ventana en 60 segundos. self.window_size = int(window_size * 60) self.num_shards = num_shards def expand(self, pcoll): return ( pcoll # Vincula la ventana a cada elemento usando la marca de tiempo correspondiente (o la hora de publicación). | "Ventana en intervalos fijos" >> WindowInto(FixedWindows(self.window_size)) | "Agrega marcas de tiempo a los elementos en ventanas" >> ParDo(AddTimestamp()) # Asigna una clave aleatoria a cada elemento en ventanas según el número de fragmentos. | "Agregar clave" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1)) # Agrupa los elementos en ventanas por clave. Todos los elementos de la misma ventana deben caber en la # memoria para esto. Si no, debes usar `beam.util.BatchElements`. | "Agrupar por clave" >> GroupByKey() ) class AddTimestamp(DoFn): def process(self, element, publish_time=DoFn.TimestampParam): """Procesa cada elemento en ventanas extrayendo el cuerpo del mensaje y su hora de publicación en una tupla. """ yield ( element.decode("utf-8"), datetime.utcfromtimestamp(float(publish_time)).strftime( "%Y-%m-%d %H:%M:%S.%f" ), ) class WriteToGCS(DoFn): def __init__(self, output_path): self.output_path = output_path def process(self, key_value, window=DoFn.WindowParam): """Escribe mensajes en un lote para Google Cloud Storage.""" ts_format = "%H:%M" window_start = window.start.to_utc_datetime().strftime(ts_format) window_end = window.end.to_utc_datetime().strftime(ts_format) shard_id, batch = key_value filename = "-".join([self.output_path, window_start, window_end, str(shard_id)]) with io.gcsio.GcsIO().open(filename=filename, mode="w") as f: for message_body, publish_time in batch: f.write(f"{message_body},{publish_time}\n".encode()) def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None): # Establece `save_main_session` como Verdadero para que DoFns pueda acceder a los módulos importados globalmente. pipeline_options = PipelineOptions( pipeline_args, streaming=True, save_main_session=True ) with Pipeline(options=pipeline_options) as pipeline: ( pipeline # Debido a que `timestamp_attribute` no se especifica en `ReadFromPubSub`, Beam # vincula la hora de publicación devuelta por el servidor de Pub/Sub para cada mensaje # al parámetro de marca de tiempo del elemento, accesible a través de `DoFn.TimestampParam`. # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub | "Leer desde Pub/Sub" >> io.ReadFromPubSub(topic=input_topic) | "Agrupar en" >> GroupMessagesByFixedWindows(window_size, num_shards) | "Escribir a GCS" >> ParDo(WriteToGCS(output_path)) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) parser = argparse.ArgumentParser() parser.add_argument( "--input_topic", help="The Cloud Pub/Sub topic to read from." '"projects//topics/".', ) parser.add_argument( "--window_size", type=float, default=1.0, help="Output file's window size in minutes.", ) parser.add_argument( "--output_path", help="Path of the output GCS file including the prefix.", ) parser.add_argument( "--num_shards", type=int, default=5, help="Number of shards to use when writing windowed elements to GCS.", ) known_args, pipeline_args = parser.parse_known_args() run( known_args.input_topic, known_args.output_path, known_args.window_size, known_args.num_shards, pipeline_args, ) Nota: Para explorar más el código de muestra, visita las páginas de GitHub java-docs-samples y python-docs-samples.

Tarea 3: Comienza la canalización

  1. Para iniciar la canalización, ejecuta el siguiente comando:
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2 \ --tempLocation=gs://$BUCKET_NAME/temp" python PubSubToGCS.py \ --project=project_id \ --region=region \ --input_topic=projects/project_id/topics/my-id \ --output_path=gs://bucket_name/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://bucket_name/temp Nota: Cuando ejecutes el comando de Python, reemplaza project_id, bucket_name y region por tu ID del proyecto, nombre del bucket y región del lab asignada.

El comando anterior se ejecuta de manera local y, luego, inicia un trabajo de Dataflow que se ejecuta en la nube.

Nota: Es posible que debas esperar alrededor de 10 minutos para que el código se ejecute por completo y para que el trabajo de canalización aparezca en la consola de Dataflow en la siguiente tarea. Nota: Si recibes una advertencia sobre StaticLoggerBinder, puedes ignorarla de forma segura y avanzar en el lab.

Haz clic en Revisar mi progreso para verificar el objetivo. Iniciar la canalización y lanzar el trabajo de Dataflow

Tarea 4: Observa el progreso del trabajo y la canalización

  1. Ve a la consola de Dataflow para observar el progreso del trabajo.

  2. Haz clic en Actualizar para ver el trabajo y las actualizaciones de estado más recientes.

Página de Dataflow en la que se muestra la información del trabajo pubsubtogcs 0815172250-75a99ab8

  1. Haz clic en el nombre del trabajo para abrir los detalles y revisar lo siguiente:
  • Estructura del trabajo
  • Registros del trabajo
  • Métricas de etapas

Página de trabajo en la que se muestra la información de resumen del trabajo

Puede que debas esperar unos minutos para ver los archivos de salida en Cloud Storage.

  1. Para ver los archivos de salida, navega a Menú de navegación > Cloud Storage, haz clic en el nombre de tu bucket y, luego, en Ejemplos.

Página de detalles del bucket en la que se muestra la información del archivo de salida

  1. Como alternativa, puedes salir de la aplicación en Cloud Shell con CTRL + C (y, para la opción de Python, escribe exit) y, luego, ejecutar el siguiente comando para enumerar los archivos que se escribieron en Cloud Storage:
gsutil ls gs://${BUCKET_NAME}/samples/

El resultado debe verse de la siguiente manera:

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Tarea 5: Realiza una limpieza

  1. Si aún no lo hiciste, sal de la aplicación en Cloud Shell con CTRL + C.

En la opción de Python, escribe exit para salir del entorno de Python.

  1. En Cloud Shell, borra el trabajo de Cloud Scheduler:
gcloud scheduler jobs delete publisher-job

Si aparece la pregunta “Do you want to continue”, presiona Y y, luego, Intro.

  1. En la consola de Dataflow, detén el trabajo seleccionando su nombre y haciendo clic en Detener.

Cuando se te solicite, haz clic en Detener trabajo > Cancelar para cancelar la canalización sin desviarla.

  1. En Cloud Shell, borra el tema:
gcloud pubsub topics delete $TOPIC_ID
  1. En Cloud Shell, borra los archivos que creó la canalización:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  1. En Cloud Shell, borra el bucket de Cloud Storage:
gsutil rb gs://${BUCKET_NAME}

¡Felicitaciones!

Creaste una canalización de Dataflow que leyó mensajes de tu tema de Pub/Sub, los agrupó por marca de tiempo y los escribió en tu bucket de Cloud Storage.

Próximos pasos/Más información

Capacitación y certificación de Google Cloud

Recibe la formación que necesitas para aprovechar al máximo las tecnologías de Google Cloud. Nuestras clases incluyen habilidades técnicas y recomendaciones para ayudarte a avanzar rápidamente y a seguir aprendiendo. Para que puedas realizar nuestros cursos cuando más te convenga, ofrecemos distintos tipos de capacitación de nivel básico a avanzado: a pedido, presenciales y virtuales. Las certificaciones te ayudan a validar y demostrar tus habilidades y tu conocimiento técnico respecto a las tecnologías de Google Cloud.

Última actualización del manual: 20 de agosto de 2025

Prueba más reciente del lab: 20 de agosto de 2025

Copyright 2025 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usa una ventana de navegación privada o de Incógnito para ejecutar el lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.