Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este instructivo es una modificación de Ejecuta un DAG de análisis de datos en Google Cloud, en el que se muestra cómo conectar tu entorno de Cloud Composer a Amazon Web Services para utilizar los datos almacenados allí. En este instructivo, se muestra cómo usar Cloud Composer para crear un DAG de Apache Airflow. El DAG une datos de un conjunto de datos públicos de BigQuery y un archivo CSV almacenado en un bucket de Amazon Web Services (AWS) S3 y, luego, ejecuta un trabajo por lotes de Dataproc Serverless para procesar los datos unidos.
El conjunto de datos públicos de BigQuery que se usa en este instructivo es ghcn_d, una base de datos integrada de resúmenes climáticos de todo el mundo. El archivo CSV contiene información sobre las fechas y los nombres de los días festivos de EE.UU. desde 1997 hasta 2021.
La pregunta que queremos responder con el DAG es: "¿Qué temperatura hizo en Chicago el Día de Acción de Gracias durante los últimos 25 años?".
Objetivos
- Crea un entorno de Cloud Composer con la configuración predeterminada
- Crea un bucket en AWS S3
- Crea un conjunto de datos vacío de BigQuery
- Cree un nuevo bucket de Cloud Storage
- Crea y ejecuta un DAG que incluya las siguientes tareas:
- Carga un conjunto de datos externo de S3 a Cloud Storage
- Carga un conjunto de datos externo de Cloud Storage a BigQuery
- Cómo unir dos conjuntos de datos en BigQuery
- Ejecuta un trabajo de PySpark de análisis de datos
Antes de comenzar
Administra permisos en AWS
Sigue la sección "Creación de políticas con el editor visual" del instructivo de AWS sobre la creación de políticas de IAM para crear una política de IAM personalizada para AWS S3 con la siguiente configuración:
- Servicio: S3
- ListAllMyBuckets (
s3:ListAllMyBuckets
), para ver tu bucket de S3 - CreateBucket (
s3:CreateBucket
), para crear un bucket - PutBucketOwnershipControls (
s3:PutBucketOwnershipControls
), para crear un bucket - ListBucket (
s3:ListBucket
), para otorgar permiso para enumerar objetos en un bucket de S3 - PutObject (
s3:PutObject
), para subir archivos a un bucket - GetBucketVersioning (
s3:GetBucketVersioning
), para borrar un objeto en un bucket - DeleteObject (
s3:DeleteObject
), para borrar un objeto en un bucket - ListBucketVersions (
s3:ListBucketVersions
), para borrar un bucket - DeleteBucket (
s3:DeleteBucket
), para borrar un bucket - Recursos: Elige "Cualquiera" junto a "bucket" y "Objeto" para otorgar permisos a cualquier recurso de ese tipo.
- Etiqueta: Ninguna
- Nombre: TutorialPolicy
Consulta la lista de acciones admitidas en Amazon S3 para obtener más información sobre cada configuración que se encuentra más arriba.
Habilita las APIs
Habilita las siguientes APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Otorgar permisos
Otorga los siguientes roles y permisos a tu cuenta de usuario:
Otorga roles para administrar entornos y buckets de entornos de Cloud Composer.
Otorga el rol de propietario de datos de BigQuery (
roles/bigquery.dataOwner
) para crear un conjunto de datos de BigQuery.Otorga el rol de administrador de almacenamiento (
roles/storage.admin
) para crear un bucket de Cloud Storage.
Crea y prepara tu entorno de Cloud Composer
Crea un entorno de Cloud Composer con parámetros predeterminados:
- Elige una región ubicada en EE.UU.
- Elige la versión de Cloud Composer más reciente.
Otorga los siguientes roles a la cuenta de servicio que se usa en tu entorno de Cloud Composer para que los trabajadores de Airflow ejecuten correctamente las tareas del DAG:
- Usuario de BigQuery (
roles/bigquery.user
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Dataproc (
roles/dataproc.editor
) - Trabajador de Dataproc (
roles/dataproc.worker
)
- Usuario de BigQuery (
Crea y modifica recursos relacionados en Google Cloud
Instala el
apache-airflow-providers-amazon
paquete de PyPI en tu entorno de Cloud Composer.Crea un conjunto de datos vacío de BigQuery con los siguientes parámetros:
- Nombre:
holiday_weather
- Región:
US
- Nombre:
Crea un bucket nuevo de Cloud Storage en la multirregión
US
.Ejecuta el siguiente comando para habilitar el Acceso privado a Google en la subred predeterminada de la región en la que deseas ejecutar Dataproc Serverless para cumplir con los requisitos de redes. Te recomendamos que uses la misma región que tu entorno de Cloud Composer.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
Crea recursos relacionados en AWS
Crea un bucket de S3 con la configuración predeterminada en la región que prefieras.
Conéctate a AWS desde Cloud Composer
- Obtén tu ID de clave de acceso de AWS y tu clave de acceso secreta
Agrega tu conexión de AWS S3 con la IU de Airflow:
- Ve a Administrador > Conexiones.
Crea una conexión nueva con la siguiente configuración:
- ID de conexión:
aws_s3_connection
- Tipo de conexión:
Amazon S3
- Extras:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- ID de conexión:
Procesamiento de datos con Dataproc Serverless
Explora el trabajo de PySpark de ejemplo
El siguiente código es un ejemplo de un trabajo de PySpark que convierte la temperatura de décimas de grado Celsius a grados Celsius. Este trabajo convierte los datos de temperatura del conjunto de datos en un formato diferente.
Sube el archivo de PySpark a Cloud Storage
Para subir el archivo de PySpark a Cloud Storage, haz lo siguiente:
Guarda data_analytics_process.py en tu máquina local.
En la Google Cloud consola, ve a la página del navegador de Cloud Storage:
Haz clic en el nombre del bucket que creaste anteriormente.
En la pestaña Objetos del bucket, haz clic en el botón Subir archivos, selecciona
data_analytics_process.py
en el diálogo que aparece y haz clic en Abrir.
Sube el archivo CSV a AWS S3
Para subir el archivo holidays.csv
, haz lo siguiente:
- Guarda
holidays.csv
en tu máquina local. - Sigue la guía de AWS para subir el archivo a tu bucket.
DAG de análisis de datos
Explora el DAG de ejemplo
El DAG usa varios operadores para transformar y unificar los datos:
El comando
S3ToGCSOperator
transfiere el archivo holidays.csv de tu bucket de AWS S3 a tu bucket de Cloud Storage.El comando
GCSToBigQueryOperator
transfiere el archivo holidays.csv de Cloud Storage a una tabla nueva en el conjunto de datosholidays_weather
de BigQuery que creaste anteriormente.El comando
DataprocCreateBatchOperator
crea y ejecuta un trabajo por lotes de PySpark con Dataproc Serverless.El
BigQueryInsertJobOperator
une los datos de holidays.csv en la columna "Date" con los datos meteorológicos del conjunto de datos públicos de BigQuery ghcn_d. Las tareasBigQueryInsertJobOperator
se generan de forma dinámica con un bucle for, y estas tareas se encuentran en unTaskGroup
para mejorar la legibilidad en la vista de gráfico de la IU de Airflow.
Usa la IU de Airflow para agregar variables
En Airflow, las variables son una forma universal de almacenar y recuperar parámetros de configuración o configuraciones arbitrarias como un simple almacén de pares clave-valor. Este DAG usa variables de Airflow para almacenar valores comunes. Para agregarlos a tu entorno, haz lo siguiente:
Accede a la IU de Airflow desde la consola de Cloud Composer.
Ve a Administrador > Variables.
Agrega las siguientes variables:
s3_bucket
: Es el nombre del bucket de S3 que creaste antes.gcp_project
: el ID de tu proyectogcs_bucket
: Es el nombre del bucket que creaste antes (sin el prefijogs://
).gce_region
: Es la región en la que deseas que se ejecute tu trabajo de Dataproc que cumpla con los requisitos de redes de Dataproc Serverless. Esta es la región en la que habilitaste el Acceso privado a Google anteriormente.dataproc_service_account
: Es la cuenta de servicio de tu entorno de Cloud Composer. Puedes encontrar esta cuenta de servicio en la pestaña de configuración del entorno de tu entorno de Cloud Composer.
Sube el DAG al bucket de tu entorno
Cloud Composer programa los DAG que se encuentran en la carpeta /dags
del bucket de tu entorno. Sigue estos pasos para subir el DAG con la consola deGoogle Cloud :
En tu máquina local, guarda s3togcsoperator_tutorial.py.
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, en la columna Carpeta de DAG, haz clic en el vínculo DAGs. Se abrirá la carpeta de DAGs de tu entorno.
Haz clic en Subir archivos.
Selecciona
s3togcsoperator_tutorial.py
en tu máquina local y haz clic en Abrir.
Activa el DAG
En tu entorno de Cloud Composer, haz clic en la pestaña DAGs.
Haz clic en el ID del DAG
s3_to_gcs_dag
.Haz clic en Trigger DAG.
Espera entre cinco y diez minutos hasta que veas una marca de verificación verde que indique que las tareas se completaron correctamente.
Valida el éxito del DAG
En la consola de Google Cloud , ve a la página BigQuery.
En el panel Explorador, haz clic en el nombre de tu proyecto.
Haz clic en
holidays_weather_joined
.Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en décimas de grado Celsius.
Haz clic en
holidays_weather_normalized
.Haz clic en Vista previa para ver la tabla resultante. Ten en cuenta que los números de la columna de valores están en grados Celsius.
Limpieza
Borra los recursos individuales que creaste para este instructivo:
Borra el archivo
holidays.csv
en tu bucket de AWS S3.Borra el bucket de AWS S3 que creaste.
Borra el bucket de Cloud Storage que creaste para este instructivo.
Borra el entorno de Cloud Composer, lo que incluye borrar manualmente el bucket del entorno.