arrow_back

Procesamiento de datos sin servidores con Dataflow: cómo usar Dataflow para hacer análisis de transmisiones (Java)

Acceder Unirse
Obtén acceso a más de 700 labs y cursos

Procesamiento de datos sin servidores con Dataflow: cómo usar Dataflow para hacer análisis de transmisiones (Java)

Lab 2 horas universal_currency_alt 5 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Obtén acceso a más de 700 labs y cursos

Descripción general

En este lab, tomarás muchos de los conceptos que se presentaron en el contexto de lotes y los aplicarás en un contexto de transmisión para crear una canalización similar a BatchMinuteTrafficPipeline, pero que opera en tiempo real. La canalización finalizada primero leerá los mensajes JSON de Pub/Sub y los analizará antes de la ramificación. Una rama escribe algunos datos sin procesar en BigQuery y toma nota del evento y del tiempo de procesamiento. La otra rama agrupa en ventanas y agrega los datos y, luego, escribe los resultados en BigQuery.

Objetivos

  • Leer datos de una fuente de transmisión
  • Escribir datos en un receptor de transmisión
  • Agrupar los datos en ventanas en un contexto de transmisión
  • Verificar de forma experimental los efectos del retraso

Crearás la siguiente canalización:

El diagrama de canalizaciones que se ramifica desde el tema ReadMessage de Pub/Sub.

En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Qwiklabs desde una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usa otras credenciales, se generarán errores o incurrirá en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Activa Google Cloud Shell

Google Cloud Shell es una máquina virtual que cuenta con herramientas para desarrolladores. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud.

Google Cloud Shell proporciona acceso de línea de comandos a tus recursos de Google Cloud.

  1. En la consola de Cloud, en la barra de herramientas superior derecha, haz clic en el botón Abrir Cloud Shell.

    Ícono de Cloud Shell destacado

  2. Haz clic en Continuar.

El aprovisionamiento y la conexión al entorno demorarán unos minutos. Cuando te conectes, habrás completado la autenticación, y el proyecto estará configurado con tu PROJECT_ID. Por ejemplo:

ID del proyecto destacado en la terminal de Cloud Shell

gcloud es la herramienta de línea de comandos de Google Cloud. Viene preinstalada en Cloud Shell y es compatible con el completado de línea de comando.

  • Puedes solicitar el nombre de la cuenta activa con este comando:
gcloud auth list

Resultado:

Credentialed accounts: - @.com (active)

Resultado de ejemplo:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Puedes solicitar el ID del proyecto con este comando:
gcloud config list project

Resultado:

[core] project =

Resultado de ejemplo:

[core] project = qwiklabs-gcp-44776a13dea667a6 Nota: La documentación completa de gcloud está disponible en la guía de descripción general de gcloud CLI .

Verifica los permisos del proyecto

Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).

  1. En la consola de Google Cloud, en el Menú de navegación (Ícono del menú de navegación), selecciona IAM y administración > IAM.

  2. Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com, y que tenga asignado el rol Editor. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.

El nombre de la cuenta de servicio predeterminada de Compute Engine y el estado del editor destacados en la página de pestañas Permisos

Nota: Si la cuenta no aparece en IAM o no tiene asignado el rol Editor, sigue los pasos que se indican a continuación para asignar el rol necesario.
  1. En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
  2. Copia el número del proyecto (p. ej., 729328892908).
  3. En el Menú de navegación, selecciona IAM y administración > IAM.
  4. En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
  5. En Principales nuevas, escribe lo siguiente:
{project-number}-compute@developer.gserviceaccount.com
  1. Reemplaza {número-del-proyecto} por el número de tu proyecto.
  2. En Rol, selecciona Proyecto (o Básico) > Editor.
  3. Haz clic en Guardar.

Configura tu IDE

Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud, similar a Cloud Shell.

Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.

NOTA: Es posible que deba esperar entre 3 y 5 minutos para que se aprovisione por completo el entorno, incluso después de que aparezca la URL. Hasta ese momento, se mostrará un error en el navegador.

ide_url

El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs con un código que debe completar y una carpeta solution con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer para ver lo siguiente:

file_explorer

También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:

new_terminal

Para verificarlo, ejecute gcloud auth list en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:

gcloud_auth

Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:

gce_reset

Abre el lab adecuado

  • Si aún no lo has hecho, crea una terminal nueva en tu IDE y, luego, copia y pega el siguiente comando:
# Change directory into the lab cd 5_Streaming_Analytics/labs # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

Configura el entorno de datos

# Create GCS buckets, BQ dataset, and Pubsub Topic cd $BASE_DIR/../.. source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Haz clic en Revisar mi progreso para verificar el objetivo. Configurar el entorno de datos

Tarea 1. Lee desde una fuente de transmisión

En labs anteriores, usaste TextIO.read() para leer desde Google Cloud Storage. En este lab, en lugar de Google Cloud Storage, usarás Pub/Sub. Pub/Sub es un servicio de mensajería en tiempo real completamente administrado que permite a los publicadores enviar mensajes a un “tema”, al que los usuarios se pueden unir con una “suscripción”.

La sección Cloud Pub/Sub destacada del diagrama en el que se representa el servicio de mensajería.

La canalización que creas se suscribe a un tema llamado my_topic que acabas de crear mediante la secuencia de comandos create_streaming_sinks.sh. En una situación de producción, es el equipo de publicación el que suele crear este tema. Puedes verlo en la sección de Pub/Sub de la consola.

Para leer desde Pub/Sub con los conectores de IO de Apache Beam, abre el archivo StreamingMinuteTrafficPipeline.java y agrega una transformación a la canalización que usa la función PubsubIO.readStrings(). Esta función muestra una instancia de PubsubIO.Read, que tiene un método para especificar el tema de origen y el atributo de marca de tiempo. Ten en cuenta que ya existe una opción de línea de comandos para el nombre del tema de Pub/Sub. Configura el atributo de marca de tiempo en “timestamp”, que corresponde a un atributo que se agregará a cada mensaje de Pub/Sub. Si la hora de publicación del mensaje es suficiente, este paso no sería necesario.

Nota: La hora de publicación es la hora en que el servicio de Pub/Sub recibe el mensaje por primera vez. Hay sistemas en los que puede haber un retraso entre la hora real del evento y la de publicación (es decir, datos tardíos). Si deseas tener esto en cuenta, el código del cliente que publica el mensaje debe establecer un atributo de metadatos “timestamp” en el mensaje y proporcionar la marca de tiempo real del evento, ya que Pub/Sub no sabrá de forma nativa cómo extraer la marca de tiempo del evento incorporada en la carga útil.

Puedes ver el código del cliente que genera los mensajes que usarás en GitHub.

Para completar esta tarea, agrega una transformación que lea desde el tema de Pub/Sub especificado en el parámetro de la línea de comandos inputTopic. Luego, usa el DoFn proporcionado, JsonToCommonLog, para convertir cada string JSON en una instancia de CommonLog. Recopila los resultados de esta transformación en una PCollection de instancias de CommonLog.

Tarea 2. Agrupa los datos en ventanas

En el lab anterior que no es de SQL, implementaste un sistema de ventanas de tiempo fijo para agrupar eventos por tiempo de evento en ventanas de tamaño fijo mutuamente excluyentes. Haz lo mismo aquí con las entradas de transmisión. Si tienes algún problema, no dudes en consultar el código del lab anterior o la solución.

Define períodos de un minuto

  1. Implementa una ventana de tiempo fija con una duración de un minuto, como se indica a continuación:
PCollection<String> pColl= ...; PCollection<String> windowedPCollection = pColl.apply( Window.<String>into(FixedWindows.of(Duration.standardSeconds(60))));
  1. Para completar esta tarea, agrega una transformación a tu canalización que acepte la PCollection de los datos de CommonLog y agrupa los elementos en ventanas de windowDuration segundos, con windowDuration como otro parámetro de la línea de comandos.

Tarea 3. Agrega los datos

En el lab anterior, utilizaste la transformación Count para contar la cantidad de eventos por ventana. Haz lo mismo aquí.

Registra eventos por ventana

  1. Para contar la cantidad de eventos en una ventana no global, puedes escribir código como el siguiente:
PCollection<Long> counts = pColl.apply("CountPerWindow", Combine.globally(Count.<MyClass>combineFn()).withoutDefaults());
  1. Al igual que antes, debes volver a convertir el valor <Long> en un valor <Row> y extraer la marca de tiempo como se indica a continuación:
@ProcessElement public void processElement(@Element T l, OutputReceiver<T> r, IntervalWindow window) { Instant i = Instant.ofEpochMilli(window.end().getMillis()); Row row = Row.withSchema(appSchema) .addValues(.......) .build() ... r.output(...); }
  1. Recuerda indicar el esquema para serialización, como este:
apply().setRowSchema(appSchema)
  1. Para completar esta tarea, pasa la PCollection con ventanas como entrada a una transformación que cuente la cantidad de eventos por ventana.

  2. Agrega una transformación adicional para volver a convertir los resultados en una PCollection de Row con el esquema pageviewsSchema que se te proporcionó.

Tarea 4. Escribe en BigQuery

Esta canalización escribe en BigQuery en dos ramas distintas. La primera rama escribe los datos agregados en BigQuery. La segunda rama, que tú ya creaste, escribe algunos metadatos sobre cada evento sin procesar, incluida la marca de tiempo del evento y la marca de tiempo de procesamiento real. Ambas escriben directamente en BigQuery a través de inserciones de transmisión.

Escribe datos agregados en BigQuery

La escritura en BigQuery ya se abordó ampliamente en labs anteriores, por lo que la mecánica básica no se tratará en profundidad en esta sección.

Esta tarea utiliza el código que se proporcionó.

Para completar esta tarea, sigue estos pasos:

  1. Crea un nuevo parámetro de línea de comandos llamado aggregateTableName para la tabla destinada a alojar datos agregados.

  2. Tal como lo hiciste antes, agrega una transformación que escriba en BigQuery y use .useBeamSchema().

Nota: En un contexto de transmisión, BigQueryIO.write() no admite WriteDisposition de WRITE_TRUNCATE en la que la tabla se descarta y se vuelve a crear. En este ejemplo, usa WRITE_APPEND.

Método de inserción de BigQuery

BigQueryIO.Write se configurará de forma predeterminada como inserciones de transmisión para PCollections no delimitadas o como trabajos de carga de archivos por lotes para PCollections delimitadas. Las inserciones de transmisión pueden ser particularmente útiles cuando deseas que los datos se muestren en agregaciones de inmediato, pero se generan cargos adicionales.

En los casos de uso de transmisión en los que sea aceptable utilizar cargas periódicas por lotes en el orden de cada par de minutos, puedes especificar este comportamiento a través de .withMethod() y también establecer la frecuencia con .withTriggeringFrequency(org.joda.time.Duration).

Consulta la documentación de BigQueryIO.Write.Method para obtener más información.

rowsPCollection.apply("WriteToBQ", BigQueryIO.<Row>write().to(myTableName).useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

Escribe datos sin procesar en BigQuery

Para completar esta tarea, sigue estos pasos:

  1. Busca el parámetro de la línea de comandos para el nombre de la tabla destinada a alojar los datos sin procesar.

  2. Examina la rama de la canalización que creaste anteriormente. Está capturando el tiempo de procesamiento además del tiempo del evento.

Tarea 5. Ejecuta tu canalización

  1. Para ejecutar tu canalización, crea un comando similar al siguiente ejemplo:
Nota: Ten en cuenta que tal vez debas modificarlo para que refleje los nombres de las opciones de línea de comandos que incluiste. export PROJECT_ID=$(gcloud config get-value project) export REGION=us-central1 export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --aggregateTableName=${AGGREGATE_TABLE_NAME} \ --rawTableName=${RAW_TABLE_NAME}"
  1. En la IU de Dataflow, asegúrate de que se ejecute de forma correcta sin errores.
Nota: Ten en cuenta que la canalización aún no está creando ni transfiriendo datos, por lo que se ejecutará, pero no procesará nada. Ingresarás datos en el siguiente paso.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar tu canalización

Tarea 6. Genera entradas de transmisión sin retrasos

Como se trata de una canalización de transmisión, se suscribe a una fuente de transmisión y espera la entrada; sin embargo, no hay ninguna en este momento. En esta sección, generarás datos sin retraso. Los datos reales casi siempre contendrán un retraso. Sin embargo, es útil entender las entradas de transmisión sin retrasos.

El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub.

  • Para completar esta tarea y comenzar a publicar mensajes, abre una nueva terminal al lado de la actual y ejecuta la siguiente secuencia de comandos. Se seguirán publicando mensajes hasta que finalices la secuencia de comandos. Asegúrate de estar en la carpeta training-data-analyst/quests/dataflow.
bash generate_streaming_events.sh

Haz clic en Revisar mi progreso para verificar el objetivo. Generar entradas de transmisión sin retrasos

Analiza los resultados

  1. Espera unos minutos para que los datos comiencen a propagarse. Luego, navega a BigQuery y consulta la tabla logs.minute_traffic con una consulta similar a la siguiente:
SELECT minute, pageviews FROM `logs.windowed_traffic` ORDER BY minute ASC

Deberías ver que la cantidad de vistas de página se mantuvo cerca de las 100 vistas por minuto.

Como alternativa, puedes usar la herramienta de línea de comandos de BigQuery como una forma rápida de confirmar que los resultados se escriben:

bq head logs.raw bq head logs.windowed_traffic
  1. Ahora, ingresa la siguiente consulta:
SELECT UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS event_millis, UNIX_MILLIS(processing_timestamp) - min_millis.min_event_millis AS processing_millis, user_id, -- added as unique label so we see all the points CAST(UNIX_MILLIS(event_timestamp) - min_millis.min_event_millis AS STRING) AS label FROM `logs.raw` CROSS JOIN ( SELECT MIN(UNIX_MILLIS(event_timestamp)) AS min_event_millis FROM `logs.raw`) min_millis WHERE event_timestamp IS NOT NULL ORDER BY event_millis ASC

En esta consulta, se ilustra la brecha entre la hora del evento y la hora de procesamiento. Sin embargo, puede ser difícil ver el panorama completo si solo observas los datos tabulares sin procesar. Utilizaremos Data Studio, un motor ligero de IE y visualización de datos.

  1. Para habilitar Data Studio, sigue estos pasos:
  • Visita Data Studio.
  • En la esquina superior izquierda, haz clic en Crear.
  • Haz clic en Informe.
  • Marca la casilla de las Condiciones del Servicio y, luego, haz clic en Listo.
  • Regresa a la IU de BigQuery.
  1. En la IU de BigQuery, haz clic en el botón Explorar datos y selecciona Explorar con Data Studio.

Se abrirá una ventana nueva.

  1. En el panel ubicado en el lado derecho de esta ventana, selecciona el tipo de diagrama de dispersión.

  2. Configura los siguientes valores en la columna Datos del panel del lado derecho:

  • Dimensión: etiqueta
  • Métrica X: event_millis
  • Métrica Y: processing_millis

El gráfico se transformará para ser un diagrama de dispersión, en el que todos los puntos estarán en la diagonal. Esto se debe a que, en los datos de transmisión que se generan actualmente, los eventos se procesan inmediatamente después de que se generan, sin retraso. Si iniciaste rápidamente la secuencia de comandos de generación de datos, es decir, antes de que el trabajo de Dataflow estuviera completamente en funcionamiento, es posible que veas un palo de hockey, ya que había mensajes en cola en Pub/Sub que se procesaron más o menos a la vez.

Sin embargo, en el mundo real, los retrasos son algo con lo que las canalizaciones deben lidiar.

El recién transformado diagrama de dispersión.

Tarea 7. Demora la entrada de transmisión

La secuencia de comandos del evento de transmisión es capaz de generar eventos con retraso simulado.

Esto representa situaciones en las que hay una demora entre los momentos en que se generan y se publican los eventos en Pub/Sub. Por ejemplo, cuando un cliente de un dispositivo móvil pasa al modo sin conexión porque no tiene servicio, pero los eventos se recopilan en el dispositivo y se publican todos a la vez cuando el dispositivo vuelve a estar en línea.

Genera entradas de transmisión con retraso

  1. Primero, cierra la ventana de Data Studio.

  2. Luego, para activar el retraso, regresa a la ventana que contiene la terminal del IDE.

  3. A continuación, detén la secuencia de comandos en ejecución con CTRL + C en la Terminal.

  4. Después, ejecuta el siguiente comando:

bash generate_streaming_events.sh true

Analiza los resultados

  • Regresa a la IU de BigQuery, vuelve a ejecutar la consulta y, luego, vuelve a crear la vista de Data Studio como antes.

Los datos nuevos que lleguen, que deberían aparecer en el lado derecho del gráfico, ya no deberían ser perfectos. En cambio, algunos aparecerán sobre la diagonal, lo que indica que se procesaron luego de que ocurrieron los eventos.

Tipo de gráfico: dispersión

  • Dimensión: etiqueta
  • Métrica X: event_millis
  • Métrica Y: processing_millis

El diagrama de dispersión actualizado.

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Antes de comenzar

  1. Los labs crean un proyecto de Google Cloud y recursos por un tiempo determinado
  2. .
  3. Los labs tienen un límite de tiempo y no tienen la función de pausa. Si finalizas el lab, deberás reiniciarlo desde el principio.
  4. En la parte superior izquierda de la pantalla, haz clic en Comenzar lab para empezar

Usa la navegación privada

  1. Copia el nombre de usuario y la contraseña proporcionados para el lab
  2. Haz clic en Abrir la consola en modo privado

Accede a la consola

  1. Accede con tus credenciales del lab. Si usas otras credenciales, se generarán errores o se incurrirá en cargos.
  2. Acepta las condiciones y omite la página de recursos de recuperación
  3. No hagas clic en Finalizar lab, a menos que lo hayas terminado o quieras reiniciarlo, ya que se borrará tu trabajo y se quitará el proyecto

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible

Un lab a la vez

Confirma para finalizar todos los labs existentes y comenzar este

Usa la navegación privada para ejecutar el lab

Usa una ventana de navegación privada o de Incógnito para ejecutar el lab. Así evitarás cualquier conflicto entre tu cuenta personal y la cuenta de estudiante, lo que podría generar cargos adicionales en tu cuenta personal.