Descripción general
En este lab, tomarás muchos de los conceptos que se presentaron en el contexto de lotes y los aplicarás en un contexto de transmisión para crear una canalización similar a batch_minute_traffic_pipeline, pero que opera en tiempo real. La canalización finalizada primero leerá los mensajes JSON de Pub/Sub y los analizará antes de la ramificación. Una rama escribe algunos datos sin procesar en BigQuery y toma nota del evento y del tiempo de procesamiento. La otra rama agrupa en ventanas y agrega los datos y, luego, escribe los resultados en BigQuery.
Objetivos
- Leer datos de una fuente de transmisión
- Escribir datos en un receptor de transmisión
- Agrupar los datos en ventanas en un contexto de transmisión
- Verificar de forma experimental los efectos del retraso
Crearás la siguiente canalización:

Configuración y requisitos
Antes de hacer clic en el botón Comenzar lab
Nota: Lee estas instrucciones.
Los labs son cronometrados y no se pueden pausar. El cronómetro, que comienza a funcionar cuando haces clic en Comenzar lab, indica por cuánto tiempo tendrás a tu disposición los recursos de Google Cloud.
En este lab práctico de Qwiklabs, se te proporcionarán credenciales temporales nuevas para acceder a Google Cloud y realizar las actividades en un entorno de nube real, no en uno de simulación o demostración.
Requisitos
Para completar este lab, necesitarás lo siguiente:
- Acceso a un navegador de Internet estándar (se recomienda el navegador Chrome)
- Tiempo para completar el lab
Nota: Si ya tienes un proyecto o una cuenta personal de Google Cloud, no los uses para el lab.
Nota: Si usas una Pixelbook, abre una ventana de incógnito para ejecutar el lab.
Cómo iniciar tu lab y acceder a la consola
-
Haz clic en el botón Comenzar lab. Si debes pagar por el lab, se abrirá una ventana emergente para que selecciones tu forma de pago.
A la izquierda, verás un panel con las credenciales temporales que debes usar para este lab.

-
Copia el nombre de usuario y, luego, haz clic en Abrir la consola de Google.
El lab inicia los recursos y abre otra pestaña que muestra la página Elige una cuenta.
Sugerencia: Abre las pestañas en ventanas separadas, una junto a la otra.
-
En la página Elige una cuenta, haz clic en Usar otra cuenta. Se abrirá la página de acceso.

-
Pega el nombre de usuario que copiaste del panel Detalles de la conexión. Luego, copia y pega la contraseña.
Nota: Debes usar las credenciales del panel Detalles de la conexión. No uses tus credenciales de Google Cloud Skills Boost. Si tienes una cuenta propia de Google Cloud, no la utilices para este lab para no incurrir en cargos.
- Haz clic para avanzar por las páginas siguientes:
- Acepta los Términos y Condiciones.
- No agregues opciones de recuperación o autenticación de dos factores (esta es una cuenta temporal).
- No te registres para obtener pruebas gratuitas.
Después de un momento, se abrirá la consola de Cloud en esta pestaña.
Nota: Para ver el menú con una lista de los productos y servicios de Google Cloud, haz clic en el menú de navegación que se encuentra en la parte superior izquierda de la pantalla.
Configuración del entorno de desarrollo basado en notebooks de Jupyter
En este lab, ejecutarás todos los comandos en una terminal del notebook.
-
En el menú de navegación de la consola de Google Cloud, haz clic en Vertex AI > Workbench.
-
Haz clic en Habilitar API de Notebooks.
-
En la página de Workbench, selecciona NOTEBOOKS ADMINISTRADOS POR EL USUARIO y haz clic en CREAR NUEVO.
-
En el cuadro de diálogo Instancia nueva que se muestra, establece la región en y la zona en .
-
En Entorno, selecciona Apache Beam.
-
Haz clic en CREAR en la parte inferior del cuadro de diálogo.
Nota: El aprovisionamiento completo del entorno tarda de 3 a 5 minutos. Espera hasta que se complete este paso.
Nota: Haz clic en Habilitar API de Notebooks para habilitarla.
- Cuando el entorno esté listo, haz clic en el vínculo ABRIR JUPYTERLAB que se encuentra junto al nombre del notebook. Esto abrirá tu entorno en una nueva pestaña del navegador.

- Luego, haz clic en Terminal. Esto abrirá una terminal en la que podrás ejecutar todos los comandos del lab.

Descarga el repositorio de código
A continuación, descargarás un repositorio de código que usarás en este lab.
- En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.
-
Navega al repo clonado /training-data-analyst/quests/dataflow_python/
. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab
con un código que debes completar y una subcarpeta solution
con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.
Abre el lab adecuado
- En la terminal, ejecuta los siguientes comandos para cambiar al directorio que usarás en este lab:
# Change directory into the lab
cd 5_Streaming_Analytics/lab
export BASE_DIR=$(pwd)
Configura el entorno virtual y las dependencias
Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.
- Ejecuta el siguiente comando a fin de crear un entorno virtual para tu trabajo en este lab:
sudo apt-get install -y python3-venv
## Create and activate virtual environment
python3 -m venv df-env
source df-env/bin/activate
- Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Asegúrate de que la API de Dataflow esté habilitada:
gcloud services enable dataflow.googleapis.com
- Por último, otorga el rol
dataflow.worker
a la cuenta de servicio predeterminada de Compute Engine:
PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)")
export serviceAccount=""$PROJECT_NUMBER"-compute@developer.gserviceaccount.com"
-
En la consola de Cloud, ve a IAM Y ADMINISTRACIÓN > IAM y haz clic en el ícono Editar principal de Cuenta de servicio predeterminada de Compute Engine
.
-
Agrega Dataflow Worker como otro rol y haz clic en Guardar.
Configura el entorno de datos
# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_streaming_sinks.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR
Haz clic en Revisar mi progreso para verificar el objetivo.
Configurar el entorno de datos
Tarea 1. Lee desde una fuente de transmisión
En los labs anteriores, usaste beam.io.ReadFromText
para leer desde Google Cloud Storage. En este lab, en lugar de Google Cloud Storage, usarás Pub/Sub. Pub/Sub es un servicio de mensajería en tiempo real completamente administrado que permite a los publicadores enviar mensajes a un “tema”, al que los usuarios se pueden unir con una “suscripción”.

La canalización que crea se suscribe a un tema llamado my_topic
que acaba de crear mediante la secuencia de comandos create_streaming_sinks.sh
. En una situación de producción, es el equipo de publicación el que suele crear este tema. Puedes verlo en la sección de Pub/Sub de la consola.
- En el explorador de archivos, ve a
training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/
y abre el archivo streaming_minute_traffic_pipeline.py
.
- Para leer desde Pub/Sub mediante los conectores de IO de Apache Beam, agrega una transformación a la canalización que usa la clase
beam.io.ReadFromPubSub()
. Esta clase tiene atributos para especificar el tema de origen, así como el timestamp_attribute
. De forma predeterminada, este atributo se establece en la hora de publicación del mensaje.
Nota:
La hora de publicación es la hora en que el servicio de Pub/Sub recibe el mensaje por primera vez. Hay sistemas en los que puede haber un retraso entre la hora real del evento y la de publicación (es decir, datos tardíos). Si deseas tener esto en cuenta, el código del cliente que publica el mensaje debe establecer un atributo de metadatos “timestamp” en el mensaje y proporcionar la marca de tiempo real del evento, ya que Pub/Sub no sabrá de forma nativa cómo extraer la marca de tiempo del evento incorporada en la carga útil. Puedes ver aquí el código del cliente que genera los mensajes que utilizarás.
Para completar esta tarea, sigue estos pasos:
- Agrega una transformación que lea desde el tema de Pub/Sub especificado en el parámetro de la línea de comandos
input_topic
.
- Luego, usa la función proporcionada,
parse_json
con beam.Map
para convertir cada cadena JSON en una instancia de CommonLog
.
- Recopila los resultados de esta transformación en una
PCollection
de instancias de CommonLog
mediante with_output_types()
- En el primer
#TODO
, agrega el siguiente código:
beam.io.ReadFromPubSub(input_topic)
Tarea 2. Agrupa los datos en ventanas
En el lab anterior que no es de SQL, implementaste un sistema de ventanas de tiempo fijo a fin de agrupar eventos por tiempo de evento en ventanas de tamaño fijo mutuamente excluyentes. Haz lo mismo aquí con las entradas de transmisión. Si tienes algún problema, no dudes en consultar el código del lab anterior o la solución.
Define ventanas de un minuto
Para completar esta tarea, sigue estos pasos:
- Agrega una transformación a tu canalización que acepte la
PCollection
de los datos de CommonLog
y agrupa los elementos en ventanas de segundos window_duration
, con window_duration
como otro parámetro de la línea de comandos.
- Usa el siguiente código para agregar una transformación a tu canalización que agrupe los elementos en ventanas de un minuto:
"WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60))
Tarea 3. Agrega los datos
En el lab anterior, utilizaste el combinador CountCombineFn()
para calcular la cantidad de eventos por ventana. Haz lo mismo aquí.
Registra eventos por ventana
Para completar esta tarea, sigue estos pasos:
- Pasa la
PCollection
con ventanas como entrada a una transformación que calcule la cantidad de eventos por ventana.
- Luego, usa los
DoFn
y GetTimestampFn
proporcionados con beam.ParDo
para incluir la marca de tiempo del inicio de la ventana.
- Usa el siguiente código para agregar una transformación a tu canalización que calcule la cantidad de eventos por ventana:
"CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()
Tarea 4. Escribe en BigQuery
Esta canalización escribe en BigQuery en dos ramas distintas. La primera rama escribe los datos agregados en BigQuery. La segunda rama, que ya creaste, escribe algunos metadatos sobre cada evento sin procesar, incluida la marca de tiempo del evento y la marca de tiempo de procesamiento real. Ambas escriben directamente en BigQuery a través de inserciones de transmisión.
Escribe datos agregados en BigQuery
La escritura en BigQuery ya se abordó ampliamente en labs anteriores, por lo que la mecánica básica no se tratará en profundidad en esta sección.
Para completar esta tarea, sigue estos pasos:
- Crea un nuevo parámetro de línea de comandos llamado
agg_table_name
para la tabla destinada a alojar datos agregados.
- Tal como lo hiciste antes, agrega una transformación que escriba en BigQuery.
Nota:
En un contexto de transmisión, beam.io.WriteToBigQuery()
no admite write_disposition
de WRITE_TRUNCATE
en la que la tabla se descarta y se vuelve a crear. En este ejemplo, usa WRITE_APPEND
.
Método de inserción de BigQuery
beam.io.WriteToBigQuery
se configurará de forma predeterminada como inserciones de transmisión para PCollections no delimitadas o como trabajos de carga de archivos por lotes para PCollections delimitadas. Las inserciones de transmisión pueden ser particularmente útiles cuando deseas que los datos se muestren en agregaciones de forma inmediata; sin embargo, se generan cargos adicionales. En los casos de uso de transmisión en los que no tienes problemas con las cargas periódicas por lotes cada par de minutos, puedes especificar este comportamiento a través del argumento de palabra clave del method
, así como configurar la frecuencia con el argumento de palabras clave triggering_frequency
. Obtén más información en la sección Escribe datos en BigQuery de la documentación del módulo apache_beam.io.gcp.bigquery.
- Usa el siguiente código para agregar una transformación a tu canalización que escriba datos agregados en la tabla de BigQuery.
'WriteAggToBQ' >> beam.io.WriteToBigQuery(
agg_table_name,
schema=agg_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
Tarea 5. Ejecuta tu canalización
- Vuelve a la terminal y ejecuta el siguiente código para ejecutar la canalización:
export PROJECT_ID=$(gcloud config get-value project)
export REGION='us-central1'
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic
export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw
python3 streaming_minute_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_topic=${PUBSUB_TOPIC} \
--window_duration=${WINDOW_DURATION} \
--agg_table_name=${AGGREGATE_TABLE_NAME} \
--raw_table_name=${RAW_TABLE_NAME}
Nota: Si recibes un error en la canalización de Dataflow en el que se indica que no es posible abrir el archivo pipeline.py
, vuelve a ejecutar la canalización para solucionar el problema.
En la IU de Dataflow, asegúrate de que se ejecute de forma correcta sin errores. Ten en cuenta que la canalización aún no está creando ni transfiriendo datos, por lo que se ejecutará, pero no procesará nada. Ingresarás datos en el siguiente paso.
Haz clic en Revisar mi progreso para verificar el objetivo.
Ejecutar tu canalización
Tarea 6. Genera entradas de transmisión sin retrasos
Como se trata de una canalización de transmisión, se suscribe a una fuente de transmisión y espera la entrada; sin embargo, no hay ninguna en este momento. En esta sección, generarás datos sin retraso. Los datos reales casi siempre contendrán un retraso. Sin embargo, es útil entender las entradas de transmisión sin retrasos.
El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub.
- Para completar esta tarea y comenzar a publicar mensajes, abre una nueva terminal al lado de la actual y ejecuta la siguiente secuencia de comandos. Se seguirán publicando mensajes hasta que finalices la secuencia de comandos. Asegúrate de estar en la carpeta
training-data-analyst/quests/dataflow_python
.
bash generate_streaming_events.sh
Haz clic en Revisar mi progreso para verificar el objetivo.
Generar entradas de transmisión sin retrasos
Analiza los resultados
- Espera unos minutos para que los datos comiencen a propagarse. Luego, navega a BigQuery y realiza una consulta en la tabla
logs.minute_traffic
:
SELECT timestamp, page_views
FROM `logs.windowed_traffic`
ORDER BY timestamp ASC
Deberías ver que la cantidad de páginas vistas se mantuvo cerca de las 100 vistas por minuto.
- Como alternativa, puedes usar la herramienta de línea de comandos de BigQuery como una forma rápida de confirmar que los resultados se escriben:
bq head logs.raw
bq head logs.windowed_traffic
- Ahora, ingresa la siguiente consulta:
SELECT
UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS event_millis,
UNIX_MILLIS(TIMESTAMP(processing_timestamp)) - min_millis.min_event_millis AS processing_millis,
user_id,
-- added as unique label so we see all the points
CAST(UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS STRING) AS label
FROM
`logs.raw`
CROSS JOIN (
SELECT
MIN(UNIX_MILLIS(TIMESTAMP(event_timestamp))) AS min_event_millis
FROM
`logs.raw`) min_millis
WHERE
event_timestamp IS NOT NULL
ORDER BY
event_millis ASC
En esta consulta, se ilustra la brecha entre la hora del evento y la de procesamiento. Sin embargo, puede ser difícil ver el panorama completo si solo observas los datos tabulares sin procesar. Utilizaremos Data Studio, un motor de IE y visualización de datos básicos.
- Para habilitar Data Studio, sigue estos pasos:
- Visita https://datastudio.google.com.
- En la esquina superior izquierda, haz clic en Crear.
- Haz clic en Informe.
- Marca la casilla de las Condiciones del Servicio y, luego, haz clic en Listo.
- Regresa a la IU de BigQuery.
- En la IU de BigQuery, haz clic en el botón Explorar datos y selecciona Explorar con Data Studio.
Se abrirá una ventana nueva.
-
En el panel ubicado en el lado derecho de esta ventana, selecciona el tipo de diagrama de dispersión.
-
Configura los siguientes valores en la columna Datos del panel del lado derecho:
- Dimensión: etiqueta
- Jerarquía: inhabilitada
- Métrica X: event_millis
- Métrica Y: processing_millis
El gráfico se transformará para ser un diagrama de dispersión, en el que todos los puntos estarán en la diagonal. Esto se debe a que, en los datos de transmisión que se generan actualmente, los eventos se procesan inmediatamente después de que se generan, sin retraso. Si iniciaste rápidamente la secuencia de comandos de generación de datos, es decir, antes de que el trabajo de Dataflow estuviera completamente en funcionamiento, es posible que veas un palo de hockey, ya que había mensajes en cola en Pub/Sub que se procesaron más o menos a la vez.
Sin embargo, en el mundo real, los retrasos son algo con lo que las canalizaciones deben lidiar.

Tarea 7. Demora la entrada de transmisión
La secuencia de comandos del evento de transmisión es capaz de generar eventos con retraso simulado.
Esto representa situaciones en las que hay una demora entre los momentos en que se generan y se publican los eventos en Pub/Sub. Por ejemplo, cuando un cliente de un dispositivo móvil pasa al modo sin conexión porque no tiene servicio, pero los eventos se recopilan en el dispositivo y se publican todos a la vez cuando el dispositivo vuelve a estar en línea.
Genera entradas de transmisión con retraso
-
Primero, cierra la ventana de Data Studio.
-
Luego, para activar el retraso, regresa a la terminal y detén la secuencia de comandos en ejecución con CTRL+C
.
-
Después, ejecuta el siguiente comando:
bash generate_streaming_events.sh true
Analiza los resultados
- Regresa a la IU de BigQuery, vuelve a ejecutar la consulta y, luego, vuelve a crear la vista de Data Studio como antes. Los datos nuevos que lleguen, que deberían aparecer en el lado derecho del gráfico, ya no deberían ser perfectos. En cambio, algunos aparecerán sobre la diagonal, lo que indica que se procesaron luego de que ocurrieron los eventos.
Tipo de gráfico: dispersión
- Dimensión: etiqueta
- Jerarquía: inhabilitada
- Métrica X: event_millis
- Métrica Y: processing_millis

Finalice su lab
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
- 1 estrella = Muy insatisfecho
- 2 estrellas = Insatisfecho
- 3 estrellas = Neutral
- 4 estrellas = Satisfecho
- 5 estrellas = Muy satisfecho
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.