arrow_back

Procesamiento de datos sin servidores con Dataflow: cómo usar Dataflow para hacer análisis de transmisiones (Python)

Acceder Unirse
Obtén acceso a más de 700 labs y cursos

Procesamiento de datos sin servidores con Dataflow: cómo usar Dataflow para hacer análisis de transmisiones (Python)

Lab 2 horas universal_currency_alt 5 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Obtén acceso a más de 700 labs y cursos

Descripción general

En este lab, tomarás muchos de los conceptos que se presentaron en el contexto de lotes y los aplicarás en un contexto de transmisión para crear una canalización similar a batch_minute_traffic_pipeline, pero que opera en tiempo real. La canalización finalizada primero leerá los mensajes JSON de Pub/Sub y los analizará antes de la ramificación. Una rama escribe algunos datos sin procesar en BigQuery y toma nota del evento y del tiempo de procesamiento. La otra rama agrupa en ventanas y agrega los datos y, luego, escribe los resultados en BigQuery.

Objetivos

  • Leer datos de una fuente de transmisión
  • Escribir datos en un receptor de transmisión
  • Agrupar los datos en ventanas en un contexto de transmisión
  • Verificar de forma experimental los efectos del retraso

Crearás la siguiente canalización:

Una canalización en ejecución que se inicia en ReadMessage y finaliza en dos instancias de WriteAggregateToBQ

Configuración y requisitos

Antes de hacer clic en el botón Comenzar lab

Nota: Lee estas instrucciones.

Los labs son cronometrados y no se pueden pausar. El cronómetro, que comienza a funcionar cuando haces clic en Comenzar lab, indica por cuánto tiempo tendrás a tu disposición los recursos de Google Cloud.

En este lab práctico de Qwiklabs, se te proporcionarán credenciales temporales nuevas para acceder a Google Cloud y realizar las actividades en un entorno de nube real, no en uno de simulación o demostración.

Requisitos

Para completar este lab, necesitarás lo siguiente:

  • Acceso a un navegador de Internet estándar (se recomienda el navegador Chrome)
  • Tiempo para completar el lab
Nota: Si ya tienes un proyecto o una cuenta personal de Google Cloud, no los uses para el lab. Nota: Si usas una Pixelbook, abre una ventana de incógnito para ejecutar el lab.

Cómo iniciar tu lab y acceder a la consola

  1. Haz clic en el botón Comenzar lab. Si debes pagar por el lab, se abrirá una ventana emergente para que selecciones tu forma de pago. A la izquierda, verás un panel con las credenciales temporales que debes usar para este lab.

    Panel de credenciales

  2. Copia el nombre de usuario y, luego, haz clic en Abrir la consola de Google. El lab inicia los recursos y abre otra pestaña que muestra la página Elige una cuenta.

    Sugerencia: Abre las pestañas en ventanas separadas, una junto a la otra.
  3. En la página Elige una cuenta, haz clic en Usar otra cuenta. Se abrirá la página de acceso.

    Cuadro de diálogo Elige una cuenta el que se destaca la opción Usar otra cuenta

  4. Pega el nombre de usuario que copiaste del panel Detalles de la conexión. Luego, copia y pega la contraseña.

Nota: Debes usar las credenciales del panel Detalles de la conexión. No uses tus credenciales de Google Cloud Skills Boost. Si tienes una cuenta propia de Google Cloud, no la utilices para este lab para no incurrir en cargos.
  1. Haz clic para avanzar por las páginas siguientes:
  • Acepta los Términos y Condiciones.
  • No agregues opciones de recuperación o autenticación de dos factores (esta es una cuenta temporal).
  • No te registres para obtener pruebas gratuitas.

Después de un momento, se abrirá la consola de Cloud en esta pestaña.

Nota: Para ver el menú con una lista de los productos y servicios de Google Cloud, haz clic en el menú de navegación que se encuentra en la parte superior izquierda de la pantalla. Menú de la consola de Cloud

Configuración del entorno de desarrollo basado en notebooks de Jupyter

En este lab, ejecutarás todos los comandos en una terminal del notebook.

  1. En el menú de navegación de la consola de Google Cloud, haz clic en Vertex AI > Workbench.

  2. Haz clic en Habilitar API de Notebooks.

  3. En la página de Workbench, selecciona NOTEBOOKS ADMINISTRADOS POR EL USUARIO y haz clic en CREAR NUEVO.

  4. En el cuadro de diálogo Instancia nueva que se muestra, establece la región en y la zona en .

  5. En Entorno, selecciona Apache Beam.

  6. Haz clic en CREAR en la parte inferior del cuadro de diálogo.

Nota: El aprovisionamiento completo del entorno tarda de 3 a 5 minutos. Espera hasta que se complete este paso. Nota: Haz clic en Habilitar API de Notebooks para habilitarla.
  1. Cuando el entorno esté listo, haz clic en el vínculo ABRIR JUPYTERLAB que se encuentra junto al nombre del notebook. Esto abrirá tu entorno en una nueva pestaña del navegador.

IDE_link

  1. Luego, haz clic en Terminal. Esto abrirá una terminal en la que podrás ejecutar todos los comandos del lab.

Abre la terminal

Descarga el repositorio de código

A continuación, descargarás un repositorio de código que usarás en este lab.

  1. En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.

  2. Navega al repo clonado /training-data-analyst/quests/dataflow_python/. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab con un código que debes completar y una subcarpeta solution con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Opción Explorador destacada en el menú Ver expandido

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.

Abre el lab adecuado

  • En la terminal, ejecuta los siguientes comandos para cambiar al directorio que usarás en este lab:
# Change directory into the lab cd 5_Streaming_Analytics/lab export BASE_DIR=$(pwd)

Configura el entorno virtual y las dependencias

Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.

  1. Ejecuta el siguiente comando a fin de crear un entorno virtual para tu trabajo en este lab:
sudo apt-get install -y python3-venv ## Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Asegúrate de que la API de Dataflow esté habilitada:
gcloud services enable dataflow.googleapis.com
  1. Por último, otorga el rol dataflow.worker a la cuenta de servicio predeterminada de Compute Engine:
PROJECT_ID=$(gcloud config get-value project) export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)") export serviceAccount=""$PROJECT_NUMBER"-compute@developer.gserviceaccount.com"
  1. En la consola de Cloud, ve a IAM Y ADMINISTRACIÓN > IAM y haz clic en el ícono Editar principal de Cuenta de servicio predeterminada de Compute Engine.

  2. Agrega Dataflow Worker como otro rol y haz clic en Guardar.

Configura el entorno de datos

# Create GCS buckets and BQ dataset cd $BASE_DIR/../.. source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Haz clic en Revisar mi progreso para verificar el objetivo. Configurar el entorno de datos

Tarea 1. Lee desde una fuente de transmisión

En los labs anteriores, usaste beam.io.ReadFromText para leer desde Google Cloud Storage. En este lab, en lugar de Google Cloud Storage, usarás Pub/Sub. Pub/Sub es un servicio de mensajería en tiempo real completamente administrado que permite a los publicadores enviar mensajes a un “tema”, al que los usuarios se pueden unir con una “suscripción”.

La canalización de cinco puntos desde el publicador al suscriptor, en la que el punto 2, “Almacenamiento del mensaje”, y el punto 3, “Suscripción”, están destacados

La canalización que crea se suscribe a un tema llamado my_topic que acaba de crear mediante la secuencia de comandos create_streaming_sinks.sh. En una situación de producción, es el equipo de publicación el que suele crear este tema. Puedes verlo en la sección de Pub/Sub de la consola.

  1. En el explorador de archivos, ve a training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/ y abre el archivo streaming_minute_traffic_pipeline.py.
  • Para leer desde Pub/Sub mediante los conectores de IO de Apache Beam, agrega una transformación a la canalización que usa la clase beam.io.ReadFromPubSub(). Esta clase tiene atributos para especificar el tema de origen, así como el timestamp_attribute. De forma predeterminada, este atributo se establece en la hora de publicación del mensaje.
Nota: La hora de publicación es la hora en que el servicio de Pub/Sub recibe el mensaje por primera vez. Hay sistemas en los que puede haber un retraso entre la hora real del evento y la de publicación (es decir, datos tardíos). Si deseas tener esto en cuenta, el código del cliente que publica el mensaje debe establecer un atributo de metadatos “timestamp” en el mensaje y proporcionar la marca de tiempo real del evento, ya que Pub/Sub no sabrá de forma nativa cómo extraer la marca de tiempo del evento incorporada en la carga útil. Puedes ver aquí el código del cliente que genera los mensajes que utilizarás.

Para completar esta tarea, sigue estos pasos:

  • Agrega una transformación que lea desde el tema de Pub/Sub especificado en el parámetro de la línea de comandos input_topic.
  • Luego, usa la función proporcionada, parse_json con beam.Map para convertir cada cadena JSON en una instancia de CommonLog.
  • Recopila los resultados de esta transformación en una PCollection de instancias de CommonLog mediante with_output_types()
  1. En el primer #TODO, agrega el siguiente código:
beam.io.ReadFromPubSub(input_topic)

Tarea 2. Agrupa los datos en ventanas

En el lab anterior que no es de SQL, implementaste un sistema de ventanas de tiempo fijo a fin de agrupar eventos por tiempo de evento en ventanas de tamaño fijo mutuamente excluyentes. Haz lo mismo aquí con las entradas de transmisión. Si tienes algún problema, no dudes en consultar el código del lab anterior o la solución.

Define ventanas de un minuto

Para completar esta tarea, sigue estos pasos:

  1. Agrega una transformación a tu canalización que acepte la PCollection de los datos de CommonLog y agrupa los elementos en ventanas de segundos window_duration, con window_duration como otro parámetro de la línea de comandos.
  2. Usa el siguiente código para agregar una transformación a tu canalización que agrupe los elementos en ventanas de un minuto:
"WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60))

Tarea 3. Agrega los datos

En el lab anterior, utilizaste el combinador CountCombineFn() para calcular la cantidad de eventos por ventana. Haz lo mismo aquí.

Registra eventos por ventana

Para completar esta tarea, sigue estos pasos:

  1. Pasa la PCollection con ventanas como entrada a una transformación que calcule la cantidad de eventos por ventana.
  2. Luego, usa los DoFn y GetTimestampFn proporcionados con beam.ParDo para incluir la marca de tiempo del inicio de la ventana.
  3. Usa el siguiente código para agregar una transformación a tu canalización que calcule la cantidad de eventos por ventana:
"CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()

Tarea 4. Escribe en BigQuery

Esta canalización escribe en BigQuery en dos ramas distintas. La primera rama escribe los datos agregados en BigQuery. La segunda rama, que ya creaste, escribe algunos metadatos sobre cada evento sin procesar, incluida la marca de tiempo del evento y la marca de tiempo de procesamiento real. Ambas escriben directamente en BigQuery a través de inserciones de transmisión.

Escribe datos agregados en BigQuery

La escritura en BigQuery ya se abordó ampliamente en labs anteriores, por lo que la mecánica básica no se tratará en profundidad en esta sección.

Para completar esta tarea, sigue estos pasos:

  • Crea un nuevo parámetro de línea de comandos llamado agg_table_name para la tabla destinada a alojar datos agregados.
  • Tal como lo hiciste antes, agrega una transformación que escriba en BigQuery.
Nota: En un contexto de transmisión, beam.io.WriteToBigQuery() no admite write_disposition de WRITE_TRUNCATE en la que la tabla se descarta y se vuelve a crear. En este ejemplo, usa WRITE_APPEND.

Método de inserción de BigQuery

beam.io.WriteToBigQuery se configurará de forma predeterminada como inserciones de transmisión para PCollections no delimitadas o como trabajos de carga de archivos por lotes para PCollections delimitadas. Las inserciones de transmisión pueden ser particularmente útiles cuando deseas que los datos se muestren en agregaciones de forma inmediata; sin embargo, se generan cargos adicionales. En los casos de uso de transmisión en los que no tienes problemas con las cargas periódicas por lotes cada par de minutos, puedes especificar este comportamiento a través del argumento de palabra clave del method, así como configurar la frecuencia con el argumento de palabras clave triggering_frequency. Obtén más información en la sección Escribe datos en BigQuery de la documentación del módulo apache_beam.io.gcp.bigquery.

  • Usa el siguiente código para agregar una transformación a tu canalización que escriba datos agregados en la tabla de BigQuery.
'WriteAggToBQ' >> beam.io.WriteToBigQuery( agg_table_name, schema=agg_table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND )

Tarea 5. Ejecuta tu canalización

  • Vuelve a la terminal y ejecuta el siguiente código para ejecutar la canalización:
export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw python3 streaming_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_topic=${PUBSUB_TOPIC} \ --window_duration=${WINDOW_DURATION} \ --agg_table_name=${AGGREGATE_TABLE_NAME} \ --raw_table_name=${RAW_TABLE_NAME} Nota: Si recibes un error en la canalización de Dataflow en el que se indica que no es posible abrir el archivo pipeline.py, vuelve a ejecutar la canalización para solucionar el problema.

En la IU de Dataflow, asegúrate de que se ejecute de forma correcta sin errores. Ten en cuenta que la canalización aún no está creando ni transfiriendo datos, por lo que se ejecutará, pero no procesará nada. Ingresarás datos en el siguiente paso.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar tu canalización

Tarea 6. Genera entradas de transmisión sin retrasos

Como se trata de una canalización de transmisión, se suscribe a una fuente de transmisión y espera la entrada; sin embargo, no hay ninguna en este momento. En esta sección, generarás datos sin retraso. Los datos reales casi siempre contendrán un retraso. Sin embargo, es útil entender las entradas de transmisión sin retrasos.

El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub.

  • Para completar esta tarea y comenzar a publicar mensajes, abre una nueva terminal al lado de la actual y ejecuta la siguiente secuencia de comandos. Se seguirán publicando mensajes hasta que finalices la secuencia de comandos. Asegúrate de estar en la carpeta training-data-analyst/quests/dataflow_python.
bash generate_streaming_events.sh

Haz clic en Revisar mi progreso para verificar el objetivo. Generar entradas de transmisión sin retrasos

Analiza los resultados

  1. Espera unos minutos para que los datos comiencen a propagarse. Luego, navega a BigQuery y realiza una consulta en la tabla logs.minute_traffic:
SELECT timestamp, page_views FROM `logs.windowed_traffic` ORDER BY timestamp ASC

Deberías ver que la cantidad de páginas vistas se mantuvo cerca de las 100 vistas por minuto.

  1. Como alternativa, puedes usar la herramienta de línea de comandos de BigQuery como una forma rápida de confirmar que los resultados se escriben:
bq head logs.raw bq head logs.windowed_traffic
  1. Ahora, ingresa la siguiente consulta:
SELECT UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS event_millis, UNIX_MILLIS(TIMESTAMP(processing_timestamp)) - min_millis.min_event_millis AS processing_millis, user_id, -- added as unique label so we see all the points CAST(UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS STRING) AS label FROM `logs.raw` CROSS JOIN ( SELECT MIN(UNIX_MILLIS(TIMESTAMP(event_timestamp))) AS min_event_millis FROM `logs.raw`) min_millis WHERE event_timestamp IS NOT NULL ORDER BY event_millis ASC

En esta consulta, se ilustra la brecha entre la hora del evento y la de procesamiento. Sin embargo, puede ser difícil ver el panorama completo si solo observas los datos tabulares sin procesar. Utilizaremos Data Studio, un motor de IE y visualización de datos básicos.

  1. Para habilitar Data Studio, sigue estos pasos:
  • Visita https://datastudio.google.com.
  • En la esquina superior izquierda, haz clic en Crear.
  • Haz clic en Informe.
  • Marca la casilla de las Condiciones del Servicio y, luego, haz clic en Listo.
  • Regresa a la IU de BigQuery.
  1. En la IU de BigQuery, haz clic en el botón Explorar datos y selecciona Explorar con Data Studio.

Se abrirá una ventana nueva.

  1. En el panel ubicado en el lado derecho de esta ventana, selecciona el tipo de diagrama de dispersión.

  2. Configura los siguientes valores en la columna Datos del panel del lado derecho:

  • Dimensión: etiqueta
  • Jerarquía: inhabilitada
  • Métrica X: event_millis
  • Métrica Y: processing_millis

El gráfico se transformará para ser un diagrama de dispersión, en el que todos los puntos estarán en la diagonal. Esto se debe a que, en los datos de transmisión que se generan actualmente, los eventos se procesan inmediatamente después de que se generan, sin retraso. Si iniciaste rápidamente la secuencia de comandos de generación de datos, es decir, antes de que el trabajo de Dataflow estuviera completamente en funcionamiento, es posible que veas un palo de hockey, ya que había mensajes en cola en Pub/Sub que se procesaron más o menos a la vez.

Sin embargo, en el mundo real, los retrasos son algo con lo que las canalizaciones deben lidiar.

Un diagrama de dispersión en el que todos los puntos son diagonales

Tarea 7. Demora la entrada de transmisión

La secuencia de comandos del evento de transmisión es capaz de generar eventos con retraso simulado.

Esto representa situaciones en las que hay una demora entre los momentos en que se generan y se publican los eventos en Pub/Sub. Por ejemplo, cuando un cliente de un dispositivo móvil pasa al modo sin conexión porque no tiene servicio, pero los eventos se recopilan en el dispositivo y se publican todos a la vez cuando el dispositivo vuelve a estar en línea.

Genera entradas de transmisión con retraso

  1. Primero, cierra la ventana de Data Studio.

  2. Luego, para activar el retraso, regresa a la terminal y detén la secuencia de comandos en ejecución con CTRL+C.

  3. Después, ejecuta el siguiente comando:

bash generate_streaming_events.sh true

Analiza los resultados

  • Regresa a la IU de BigQuery, vuelve a ejecutar la consulta y, luego, vuelve a crear la vista de Data Studio como antes. Los datos nuevos que lleguen, que deberían aparecer en el lado derecho del gráfico, ya no deberían ser perfectos. En cambio, algunos aparecerán sobre la diagonal, lo que indica que se procesaron luego de que ocurrieron los eventos.

Tipo de gráfico: dispersión

  • Dimensión: etiqueta
  • Jerarquía: inhabilitada
  • Métrica X: event_millis
  • Métrica Y: processing_millis

Un diagrama de dispersión en el que algunos datos se encuentran sobre la línea diagonal

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usa una ventana de navegación privada o de Incógnito para ejecutar el lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.