Instrucciones y requisitos de configuración del lab
Protege tu cuenta y tu progreso. Usa siempre una ventana de navegador privada y las credenciales del lab para ejecutarlo.

Procesamiento de datos sin servidores con Dataflow: Canalización avanzada de análisis de transmisiones con Dataflow (Python)

Lab 2 horas universal_currency_alt 5 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Este contenido aún no está optimizado para dispositivos móviles.
Para obtener la mejor experiencia, visítanos en una computadora de escritorio con un vínculo que te enviaremos por correo electrónico.

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Manejar los datos con retraso
  • Manejar los datos con errores de formato de las siguientes maneras:
  • Escribir una transformación compuesta para crear un código más modular
  • Escribir una transformación que emita varias salidas de distintos tipos
  • Recopilar datos con errores de formato y escribirlos en una ubicación en la que se los pueda examinar

Al final del lab anterior, se presentó un tipo de desafío con el que deben lidiar las canalizaciones en tiempo real: la brecha entre el momento en que transcurren los eventos y el momento en que se procesan. Esto se denomina retraso. En este lab, se presentan conceptos de Apache Beam que permiten a los creadores de canalizaciones especificar cómo estas deben abordar el retraso de manera formal.

Sin embargo, el retraso no es el único problema que pueden enfrentar las canalizaciones en un contexto de transmisión: siempre que la entrada provenga del exterior del sistema, existe la posibilidad de que tenga un formato incorrecto. En este lab, también se presentan técnicas que pueden usarse para tratar esas entradas.

La canalización final de este lab se parece a la que se muestra en la imagen a continuación. Ten en cuenta que contiene una rama.

El flujo de canalización que comienza en ReadPubSubMessages con una rama que termina en WriteToBQ y otra rama que termina en WriteDeadLetterStorage

Configuración y requisitos

Antes de hacer clic en el botón Comenzar lab

Nota: Lee estas instrucciones.

Los labs son cronometrados y no se pueden pausar. El cronómetro, que comienza a funcionar cuando haces clic en Comenzar lab, indica por cuánto tiempo tendrás a tu disposición los recursos de Google Cloud.

Este lab práctico de Google Skills te permitirá realizar las actividades correspondientes en un entorno de nube real, no en uno de simulación o demostración. Para ello, se te proporcionan credenciales temporales nuevas que usarás para acceder a Google Cloud durante todo el lab.

Requisitos

Para completar este lab, necesitarás lo siguiente:

  • Acceso a un navegador de Internet estándar (se recomienda el navegador Chrome)
  • Tiempo para completar el lab
Nota: Si ya tienes un proyecto o una cuenta personal de Google Cloud, no los uses para el lab. Nota: Si usas una Pixelbook, abre una ventana de incógnito para ejecutar el lab.

Cómo iniciar tu lab y acceder a la consola

  1. Haz clic en el botón Comenzar lab. Si debes pagar por el lab, se abrirá una ventana emergente para que selecciones tu forma de pago. A la izquierda, verás un panel con las credenciales temporales que debes usar para este lab.

    Panel de credenciales

  2. Copia el nombre de usuario y, luego, haz clic en Abrir la consola de Google. El lab inicia los recursos y abre otra pestaña que muestra la página Elige una cuenta.

    Sugerencia: Abre las pestañas en ventanas separadas, una junto a la otra.
  3. En la página Elige una cuenta, haz clic en Usar otra cuenta. Se abrirá la página de acceso.

    Cuadro de diálogo Elige una cuenta el que se destaca la opción Usar otra cuenta

  4. Pega el nombre de usuario que copiaste del panel Detalles de la conexión. Luego, copia y pega la contraseña.

Nota: Debes usar las credenciales del panel Detalles de la conexión. No uses tus credenciales de Google Skills. Si tienes una cuenta propia de Google Cloud, no la utilices para este lab para no incurrir en cargos.
  1. Haz clic para avanzar por las páginas siguientes:
  • Acepta los Términos y Condiciones.
  • No agregues opciones de recuperación o autenticación de dos factores (esta es una cuenta temporal).
  • No te registres para obtener pruebas gratuitas.

Después de un momento, se abrirá la consola de Cloud en esta pestaña.

Nota: Para ver el menú con una lista de los productos y servicios de Google Cloud, haz clic en el menú de navegación que se encuentra en la parte superior izquierda de la pantalla. Menú de la consola de Cloud

Configuración del entorno de desarrollo de las instancias de Workbench

En este lab, ejecutarás todos los comandos en una terminal de tu notebook de instancia.

  1. En el menú de navegación (Menú de navegación) de la consola de Google Cloud, selecciona Vertex AI.

  2. Haz clic en Habilitar todas las APIs recomendadas.

  3. En el menú de navegación, haz clic en Workbench.

    En la parte superior de la página de Workbench, asegúrate de estar en la vista Instancias.

  4. Haz clic en agregar cuadroCrear nueva.

  5. Configura la instancia:

    • Nombre: lab-workbench
    • Región: Configura la región como
    • Zona: Establece la zona en
    • Opciones avanzadas (opcional): Si es necesario, haz clic en "Opciones avanzadas" para realizar personalizaciones adicionales (p. ej., tipo de máquina, tamaño del disco).

Crea una instancia de Vertex AI Workbench

  1. Haz clic en Crear.

La instancia tardará algunos minutos en crearse. Se mostrará una marca de verificación verde junto a su nombre cuando esté lista.

  1. Haz clic en ABRIR JUPYTERLAB junto al nombre de la instancia para iniciar la interfaz de JupyterLab. Se abrirá una pestaña nueva en el navegador.

Instancia de Workbench implementada

  1. Haz clic en Terminal. Esto abrirá una terminal en la que podrá ejecutar todos los comandos del lab.

Descarga el repositorio de código

A continuación, descargarás un repositorio de código que usarás en este lab.

  1. En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.

  2. Navega al repo clonado /training-data-analyst/quests/dataflow_python/. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab con un código que debes completar y una subcarpeta solution con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Opción Explorador destacada en el menú Ver expandido

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.

Haz clic en Revisar mi progreso para verificar el objetivo. Crear una instancia de notebook y clonar el repo del curso

Parte 1 del lab: Cómo abordar los datos retrasados

En los labs anteriores, escribiste código que dividía los elementos según el tiempo del evento en ventanas de ancho fijo, con un código similar al siguiente:

parsed_msgs | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(window_duration)) | "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()

Sin embargo, como observaste al final del último lab que no era de SQL, las transmisiones de datos suelen tener retrasos. El lapso es problemático cuando se emplea el sistema de ventanas por tiempo del evento (en lugar del tiempo de procesamiento) porque genera incertidumbre: ¿llegaron o no todos los eventos que corresponden un tiempo de evento específico?

Claramente, para generar resultados, la canalización escrita debe tomar una decisión al respecto. Para ello, se utilizó un concepto llamado marca de agua. Una marca de agua es la noción del sistema basada en la heurística sobre cuándo se puede esperar que todos los datos generados hasta un tiempo de evento determinado hayan llegado a la canalización. Una vez que la marca de agua avanza más allá del final de una ventana, cualquier elemento adicional que llegue con una marca de tiempo en esa ventana se considera un dato retrasado y, simplemente, se descarta. Por lo tanto, el comportamiento predeterminado del sistema de ventanas es emitir un resultado único, y (ojalá) completo cuando el sistema cree que llegaron todos los datos.

Apache Beam utiliza varios métodos heurísticos para hacer una conjetura de cuál es la marca de agua. Sin embargo, esto no es más que heurística. Además, es una heurística de uso general, que puede no ser adecuada para todos los casos de uso. En lugar de usar una heurística de uso general, los diseñadores de canalizaciones deben considerar cuidadosamente las siguientes preguntas para determinar qué aspectos vale la pena sacrificar:

  • Integridad: ¿Qué tan importante es contar con todos los datos antes de calcular el resultado?
  • Latencia: ¿Cuánto tiempo estás dispuesto a esperar a que lleguen los datos? Por ejemplo, ¿esperas a tener todos los datos para procesarlos o los procesas a medida que llegan?
  • Costo: ¿Cuánto dinero y capacidad de procesamiento estás dispuesto a invertir para reducir la latencia?

Con las respuestas a estas preguntas, es posible usar formalismos de Apache Beam para escribir código que sacrifique los aspectos correctos.

Retraso permitido

El retraso permitido controla por cuánto tiempo debe retenerse el estado de una ventana. Cuando la marca de agua alcanza el final del período de retraso permitido, se descarta todo el estado. Si bien sería fantástico poder mantener todo nuestro estado persistente por un tiempo ilimitado, cuando se trata de una fuente de datos no delimitada, por lo general no es práctico conservar el estado de una ventana determinada de manera indefinida, ya que corremos el riesgo de quedarnos sin espacio en el disco.

Como resultado, cualquier sistema que deba procesar datos reales fuera de orden necesita tener un método para limitar la duración de las ventanas de procesamiento. Una forma limpia y concisa de hacer esto es definir un horizonte para la demora permitida dentro del sistema, es decir, poner un límite sobre qué tan tarde puede llegar cualquier registro individual (con relación a la marca de agua) para que valga la pena que el sistema lo procese. Los datos que lleguen después de este horizonte simplemente se descartarán. Cuando se limita el retraso máximo de cualquier dato individual, también se establece con exactitud el tiempo por el que se debe conservar el estado de las ventanas (hasta que la marca de agua supere el umbral de retraso para el final de la ventana).

Tarea 1. Prepara el entorno

Al igual que en los labs anteriores, el primer paso es generar datos para que la canalización los procese. Abre el entorno del lab y genera los datos como lo hiciste anteriormente:

Abre el lab adecuado

  • En la terminal del IDE, ejecuta los siguientes comandos para cambiar al directorio que usarás en este lab:
# Change directory into the lab cd 7_Advanced_Streaming_Analytics/lab export BASE_DIR=$(pwd)

Configura el entorno virtual y las dependencias

Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.

  1. Ejecuta el siguiente comando para crear un entorno virtual para tu trabajo en este lab:
sudo apt-get update && sudo apt-get install -y python3-venv ## Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Asegúrate de que esté habilitada la API de Dataflow:
gcloud services enable dataflow.googleapis.com

Configura el entorno de datos

# Crear buckets de GCS y un conjunto de datos de BigQuery cd $BASE_DIR/../../ source create_streaming_sinks.sh # Cambiar al directorio que contiene la versión de práctica del código cd $BASE_DIR

Haz clic en Revisar mi progreso para verificar el objetivo. Preparar el entorno

Tarea 2. Configura el retraso permitido

  1. En el explorador de archivos, ve a training-data-analyst/quests/dataflow_python/7_Advanced_Streaming_Analytics/lab y abre el archivo streaming_minute_traffic_pipeline.py.

En Apache Beam, el retraso permitido se establece con el argumento de palabra clave allowed_lateness con el activador AfterWatermark() dentro de la PTransform WindowInto, como en el siguiente ejemplo:

items = p | ... Windowed_items = items | beam.WindowInto(beam.window.FixedWindows(60), # 1 minute trigger=AfterWatermark(), allowed_lateness=60*60*24) # 1 day
  1. Para completar esta tarea, examina la transformación del sistema de ventanas y el retraso permitido por el argumento de línea de comandos allowed_lateness. Usa tu criterio para decidir cuál sería un valor razonable y actualiza la línea de comandos para que refleje las unidades correctas.

Activadores

Los diseñadores de canalizaciones también pueden elegir a su criterio cuándo emitir resultados preliminares. En el paso anterior, usamos el activador AfterWatermark() con un retraso permitido especificado. Por ejemplo, supongamos que aún no se alcanzó la marca de agua del final de una ventana, pero ya llegó el 75% de los datos esperados. En muchos casos, se puede suponer que esta muestra es representativa, por lo que tiene sentido mostrar un resultado a los usuarios finales.

Los Triggers determinan en qué momento del tiempo de procesamiento se materializarán los resultados. Cada resultado específico de una ventana se conoce como un panel de la ventana. Los activadores activan los paneles cuando se cumplen las condiciones de activación. En Apache Beam, esas condiciones incluyen el progreso de la marca de agua, el progreso del tiempo de procesamiento (que avanza de manera uniforme, sin importar la cantidad de datos que realmente lleguen), el recuento de elementos (como cuando llega una cantidad determinada de datos nuevos) y los activadores dependientes de los datos (como cuando se llega al final de un archivo).

Las condiciones de un activador pueden provocar que este active un panel muchas veces. Por lo tanto, también es necesario especificar cómo acumular estos resultados. Actualmente, Apache Beam admite dos modos de acumulación, uno que agrupa resultados y otro que solo devuelve las partes del resultado que son nuevas desde que se activó el último panel.

Tarea 3. Configura un activador

Cuando se configura una función de ventanas para una PCollection con la transformación Window, también se puede especificar un activador.

Para configurar los activadores de una PCollection, debes configurar el argumento de palabra clave trigger de la PTransform WindowInto. Apache Beam incluye varios activadores:

  • AfterWatermark, que realiza la activación cuando la marca de agua pasa una marca de tiempo determinada desde el final de la ventana o la llegada del primer elemento de un panel

  • AfterProcessingTime, que realiza la activación cuando transcurre determinado tiempo de procesamiento (por lo general, contado desde la llegada del primer elemento de un panel)

  • AfterCount, que realiza la activación cuando la cantidad de elementos de la ventana llega a un recuento determinado.

En esta muestra de código, se configura un activador basado en el tiempo para una PCollection que emite resultados un minuto después de que se procesa el primer elemento de la ventana. En la última línea de la muestra de código, configuramos el modo de acumulación de la ventana definiendo el argumento de palabra clave accumulation_mode en AccumulationMode.DISCARDING:

items = p | ... windowed_items = items | beam.WindowInto(FixedWindows(60), # 1 minute trigger=AfterProcessingTime(60), accumulation_mode=AccumulationMode.DISCARDING)
  1. Para completar esta tarea, agrega el argumento de palabra clave trigger a WindowInto pasando el activador AfterWatermark. Cuando diseñes tu activador, ten en cuenta este caso de uso, en el que los datos se organizan en ventanas de un minuto y los datos pueden llegar tarde. Además, como argumento del activador AfterWatermark, agrega un activador de retraso para cada elemento que llegue con retraso (dentro del plazo permitido). Si tienes dificultades para completar la tarea, consulta la solución.

  2. Completa el siguiente #TODO cerca de la línea 113 para establecer el activador y el modo de acumulación:

trigger=AfterProcessingTime(120), accumulation_mode=AccumulationMode.DISCARDING)
  1. Completa el siguiente #TODO cerca de la línea 119 para establecer el retraso permitido, el activador y el modo de acumulación:
trigger=AfterWatermark(late=AfterCount(1)), allowed_lateness=int(allowed_lateness), accumulation_mode=AccumulationMode.ACCUMULATING)

Parte 2 del lab: Cómo tratar los datos con errores de formato

Según cómo configures tu Trigger, si ejecutas la canalización en este momento y la comparas con la del lab anterior, tal vez notes que la canalización nueva presenta resultados antes. También es posible que sus resultados tengan una exactitud mayor si la heurística no predijo con mucha exactitud el comportamiento de la transmisión y si el retraso permitido es mejor.

Sin embargo, aunque la canalización actual es más sólida en lo que respecta a los retrasos, sigue siendo vulnerable a los datos con formato incorrecto. Si ejecutas la canalización y se publica un mensaje que contenga algo distinto de una cadena JSON con el formato correcto que pueda analizarse como un CommonLog, la canalización generará un error. Si bien existen herramientas, como Cloud Logging, que facilitan la lectura de esos errores, una canalización mejor diseñada puede almacenarlos en una ubicación predefinida para inspeccionarlos en otro momento.

En esta sección, agregarás componentes a la canalización que la harán más modular y sólida.

Tarea 1. Recopila datos con errores de formato

Para que la canalización sea más sólida ante los datos con formato incorrecto, necesita una forma de excluirlos y ramificarse para procesarlos de manera diferente. Ya viste una forma de generar una rama en una canalización: hacer que una PCollection sea la entrada de varias transformaciones.

Esta forma de ramificación es potente. Sin embargo, hay algunos casos de uso en los que esta estrategia es ineficiente. Por ejemplo, supongamos que quieres crear dos subconjuntos diferentes de la misma PCollection. Con el método de transformación múltiple, podrías crear una transformación de filtro para cada subconjunto y aplicar ambas a la PCollection original. Sin embargo, esto implicaría procesar cada elemento dos veces.

Otro método para producir una canalización con ramas es hacer que una única transformación produzca múltiples resultados, pero que procese la PCollection de entrada solo una vez. En esta tarea, escribirás una transformación que produzca dos resultados: uno de los datos con formato correcto y otro de los elementos con formato incorrecto del flujo de entrada original.

Para emitir varios resultados y crear una sola PCollection, Apache Beam usa una clase llamada TaggedOutput para asignar claves a las salidas de la DoFn con varias salidas (posiblemente heterogéneas).

A continuación, se muestra un ejemplo de TaggedOutput que se usa para etiquetar diferentes resultados de un DoFn. Esas PCollection se recuperan con el método with_outputs(), y se hace referencia a ellas con el nombre de etiqueta especificado en TaggedOutput.

class ConvertToCommonLogFn(beam.DoFn): def process(self, element): try: row = json.loads(element.decode('utf-8')) yield beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row)) except: yield beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8')) … rows = (p | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(input_topic) | 'ParseJson' >> beam.ParDo(ConvertToCommonLogFn()).with_outputs('parsed_row', 'unparsed_row') .with_output_types(CommonLog)) (rows.unparsed_row | … (rows.parsed_row | …

Para completar esta tarea, declara dos TaggedOutput que se muestran en la clase ConvertToCommonLogFn, como se muestra arriba. En la sentencia try, muestra la fila analizada como una instancia de la clase CommonLog; en la sentencia catch, muestra la fila sin analizar (decodificada).

  1. Completa el primer #TODO de la clase ConvertToCommonLogFn:
beam.pvalue.TaggedOutput('parsed_row', CommonLog(**row))
  1. Completa el segundo #TODO de la clase ConvertToCommonLogFn:
beam.pvalue.TaggedOutput('unparsed_row', element.decode('utf-8'))

Tarea 2. Escribe los datos con formato incorrecto para su análisis posterior

Para solucionar el problema de una etapa anterior que produce datos con formato incorrecto, es importante poder analizar esos datos. Para ello, es necesario materializarlos en alguna parte. En esta tarea, escribirás datos con formato incorrecto en Google Cloud Storage. Este patrón se llama mediante el almacenamiento de mensajes no entregados.

En labs anteriores, escribiste directamente desde una fuente limitada (por lotes) en Cloud Storage con beam.io.WriteToText(). Sin embargo, cuando se escribe desde una fuente no delimitada (de transmisión), este enfoque debe modificarse levemente.

En primer lugar, en un momento anterior a la transformación de escritura, es necesario usar un trigger para especificar en qué momento del procesamiento debe realizarse la escritura. De lo contrario, se aplica la configuración predeterminada y los datos no se escriben nunca. De forma predeterminada, cada evento pertenece a la ventana global. Cuando se trabaja por lotes, esto es correcto porque se conoce el conjunto de datos completo en el momento de la ejecución. Sin embargo, con las fuentes no delimitadas, el tamaño completo del conjunto de datos es desconocido, por lo que los paneles de la ventana global nunca se activan, ya que nunca están completos.

Debido a que usarás un Trigger, también deberás usar una Window. Sin embargo, no siempre es necesario cambiar esta última. En labs y tareas anteriores, utilizaste transformaciones de sistemas de ventanas para reemplazar la ventana global por una ventana de duración fija en el tiempo del evento. En este caso, determinar qué elementos se agrupan no es tan importante como el hecho de que los resultados se materialicen de forma útil y a una tasa útil.

En el siguiente ejemplo, la ventana activa el panel de la ventana global cada 10 segundos de tiempo de procesamiento, pero solo escribe los eventos nuevos:

pcollection | “FireEvery10s” >> WindowInto(FixedWindows(10) trigger=AfterProcessingTime(10)) accumulation_mode=AccumulationMode.DISCARDING

Una vez configurado un activador, tendremos que cambiar el receptor de beam.io.WriteToText() (que no admite datos de transmisión) a beam.io.fileio.WriteToFiles() para realizar las operaciones de escritura. Cuando se escribe después de una transformación de sistema de ventanas, es necesario especificar una cantidad de shards para que se pueda escribir en paralelo:

windowed_items = p | 'WriteWindowedPCollection' >> fileio.WriteToFiles("gs://path/to/somewhere", shards=int(num_shards), max_writers_per_bundle=0)
  1. Para completar esta tarea, crea una transformación nueva con rows.unparsed_row como entrada para recuperar los datos con formato incorrecto. Usa un activador de tiempo de procesamiento de 120 segundos para la ventana fija de 120 segundos de duración con el modo de acumulación configurado en AccumulationMode.DISCARDING.

  2. Completa el #TODO para usar beam.fileio.WriteToFiles y poder escribir en GCS:

fileio.WriteToFiles(output_path,shards=1,max_writers_per_bundle=0)

Tarea 3. Ejecuta tu canalización

Para ejecutar tu canalización, crea un comando similar al siguiente ejemplo. Ten en cuenta que deberás modificarlo para que refleje los nombres de las opciones de línea de comandos que incluiste:

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR python3 streaming_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_topic=${PUBSUB_TOPIC} \ --window_duration=${WINDOW_DURATION} \ --allowed_lateness=${ALLOWED_LATENESS} \ --table_name=${OUTPUT_TABLE_NAME} \ --dead_letter_bucket=${DEADLETTER_BUCKET} \ --allow_unsafe_triggers

El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub. Para completar esta tarea y comenzar a publicar mensajes, abre una nueva terminal al lado de la actual y ejecuta la siguiente secuencia de comandos. Se seguirán publicando mensajes hasta que finalices la secuencia de comandos. Asegúrate de estar en la carpeta training-data-analyst/quests/dataflow_python.

Nota: La marca true agrega eventos retrasados a la transmisión. cd /home/jupyter/training-data-analyst/quests/dataflow_python/ bash generate_streaming_events.sh true

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar tu canalización

Tarea 4. Prueba tu canalización

  1. En la barra de título de la consola de Google Cloud, escribe Pub/Sub en el campo de búsqueda y, luego, haz clic en Pub/Sub en la sección Productos y páginas.

  2. Haz clic en Temas y, luego, en el tema my_topic.

  3. Haz clic en Mensajes.

  4. Haz clic en Seleccionar una suscripción de Cloud Pub/Sub desde la que deseas extraer mensajes* y selecciona la suscripción My topic del menú desplegable.

Nota: Es posible que debas hacer clic en Actualizar para ver la suscripción.
  1. Haz clic en el menú Publicar mensaje.

  2. En la página siguiente, ingresa el mensaje que deseas publicar y, luego, haz clic en Publicar.

El mensaje debería llegar pronto al bucket de Cloud Storage destinado a los mensajes no entregados, siempre y cuando no se ajuste perfectamente a la especificación JSON CommonLog. Para hacer un seguimiento de su ruta en la canalización, regresa a la ventana de supervisión de la canalización y haz clic en un nodo de la rama responsable de manejar los mensajes no analizados. Cuando veas un elemento agregado a esta rama, podrás navegar a Cloud Storage y verificar que el mensaje se haya escrito en el disco:

export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID}/deadletter gcloud storage ls $BUCKET gcloud storage cat $BUCKET/*

Haz clic en Revisar mi progreso para verificar el objetivo. Probar tu canalización .

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2026 Google LLC. Todos los derechos reservados. Google y el logotipo de Google son marcas de Google LLC. El resto de los nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que están asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usar una ventana de incógnito o de navegación privada es la mejor forma de ejecutar este lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.