En este documento, se muestra cómo hacer lo siguiente en Dataform:
- Programa ejecuciones con parámetros de configuración del flujo de trabajo.
- Programa ejecuciones con Workflows y Cloud Scheduler.
- Programa ejecuciones con Cloud Composer.
Antes de comenzar
Para programar ejecuciones con configuraciones de flujo de trabajo o programar ejecuciones con flujos de trabajo y Cloud Scheduler, haz lo siguiente:
En la consola de Google Cloud , ve a la página Dataform.
Selecciona o crea un repositorio.
Crea una configuración de lanzamiento.
Para programar ejecuciones con Cloud Composer, haz lo siguiente:
- Selecciona o crea un repositorio de Dataform.
- Otorga acceso de Dataform a BigQuery.
- Selecciona o crea un espacio de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
Roles requeridos
Para obtener los permisos que necesitas para completar las tareas de este documento, pídele a tu administrador que te otorgue los siguientes roles de IAM:
-
Administrador de Dataform (
roles/dataform.admin
) en repositorios -
Trabajador de Composer (
roles/composer.worker
) en la cuenta de servicio del entorno de Cloud Composer
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.
Para usar una cuenta de servicio que no sea la cuenta de servicio predeterminada de Dataform, otorga acceso a la cuenta de servicio personalizada.
Para habilitar las ejecuciones programadas de una configuración de flujo de trabajo cuando se habilita el modo estricto de suplantación, debes otorgar el permiso iam.serviceAccounts.actAs
a la cuenta de servicio de Dataform para la cuenta de servicio que se usa en la configuración del flujo de trabajo. Este permiso está disponible en el rol de usuario de cuenta de servicio (roles/iam.serviceAccountUser
).
Para usar las credenciales de usuario de la Cuenta de Google cuando crees una configuración de flujo de trabajo (vista previa), otorga acceso a la Cuenta de Google.
Cómo programar ejecuciones con parámetros de configuración del flujo de trabajo
En esta sección, se muestra cómo crear una configuración de flujo de trabajo en Dataform para programar y configurar ejecuciones de flujos de trabajo. Puedes usar parámetros de configuración de flujo de trabajo para ejecutar flujos de trabajo de Dataform de manera programada.
Acerca de los parámetros de configuración del flujo de trabajo
Para programar ejecuciones de Dataform de todas las acciones del flujo de trabajo o de las acciones seleccionadas en BigQuery, puedes crear parámetros de configuración del flujo de trabajo. En la configuración de un flujo de trabajo, seleccionas una configuración de versión de compilación, eliges las acciones del flujo de trabajo para la ejecución y estableces la programación de ejecución.
Luego, durante una ejecución programada de la configuración de tu flujo de trabajo, Dataform implementa tu selección de acciones del resultado de compilación más reciente en la configuración de tu versión en BigQuery. También puedes activar manualmente la ejecución de un parámetro de configuración de flujo de trabajo con workflowConfigs de la API de Dataform.
Una configuración del flujo de trabajo de Dataform contiene los siguientes parámetros de configuración de ejecución:
- Es el ID de la configuración del flujo de trabajo.
- Es la configuración de lanzamiento.
Es la cuenta de servicio.
Esta es la cuenta de servicio asociada con la configuración del flujo de trabajo. Puedes seleccionar la cuenta de servicio predeterminada de Dataform o una cuenta de servicio asociada a tu proyecto de Google Cloud , o bien puedes ingresar manualmente otra cuenta de servicio. De forma predeterminada, las configuraciones de flujo de trabajo usan las mismas cuentas de servicio que sus repositorios.
Las credenciales de la cuenta de servicio son el método de autorización predeterminado para la creación y ejecución de configuraciones de flujos de trabajo programados.
Credenciales de usuario de la Cuenta de Google (vista previa)
Las credenciales de usuario de la Cuenta de Google son el método de autorización predeterminado para la creación y ejecución manuales y no programadas de la configuración del flujo de trabajo. Para obtener más información, consulta Autoriza tu Cuenta de Google.
Acciones del flujo de trabajo que se ejecutarán:
- Todas las acciones.
- Selección de acciones
- Selección de etiquetas
Esquema de ejecución y zona horaria.
Crea una configuración de flujo de trabajo
Para crear una configuración de flujo de trabajo de Dataform, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la sección Configuración del flujo de trabajo, haz clic en Crear.
En el panel Crear configuración de flujo de trabajo, en el campo ID de configuración, ingresa un ID único para la configuración del flujo de trabajo.
Los IDs solo pueden incluir números, letras, guiones y guiones bajos.
En el menú Configuración de lanzamiento, selecciona una configuración de lanzamiento de compilación.
En la sección Autenticación, autoriza la configuración del flujo de trabajo con las credenciales de usuario de tu Cuenta de Google o una cuenta de servicio.
- Para usar las credenciales de usuario de tu Cuenta de Google (versión preliminar), selecciona Ejecutar con mis credenciales de usuario.
- Para usar una cuenta de servicio, selecciona Ejecutar con la cuenta de servicio seleccionada y, luego, selecciona la cuenta de servicio predeterminada de Dataform o cualquier cuenta de servicio asociada con tu proyectoGoogle Cloud al que tengas acceso. Si no seleccionas una cuenta de servicio, la configuración del flujo de trabajo usa la cuenta de servicio del repositorio.
Opcional: En el campo Frecuencia de programación, ingresa la frecuencia de las ejecuciones en el formato unix-cron.
Para verificar que Dataform ejecute el resultado de compilación más reciente en la configuración de versión correspondiente, mantén un descanso de al menos una hora entre el momento de la creación del resultado de compilación y el momento de la ejecución programada.
Opcional: En el menú Zona horaria, selecciona la zona horaria para las ejecuciones.
La zona horaria predeterminada es UTC.
Selecciona las acciones del flujo de trabajo que se ejecutarán:
- Para ejecutar todo el flujo de trabajo, haz clic en Todas las acciones.
- Para ejecutar las acciones seleccionadas en el flujo de trabajo, haz clic en Selección de acciones y, luego, selecciona las acciones.
- Para ejecutar acciones con las etiquetas seleccionadas, haz clic en Selección de etiquetas y, luego, selecciona las etiquetas.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus elementos dependientes, selecciona la opción Incluir dependientes.
- Opcional: Para volver a compilar todas las tablas desde cero, selecciona la opción Ejecutar con actualización completa.
Sin esta opción, Dataform actualiza las tablas incrementales sin volver a compilarlas desde cero.
Haz clic en Crear. Si seleccionaste Ejecutar con mis credenciales de usuario como método de autenticación, debes autorizar tu Cuenta de Google (versión preliminar).
Por ejemplo, la siguiente configuración del flujo de trabajo ejecuta acciones con la etiqueta hourly
cada hora en la zona horaria CEST:
- ID de configuración:
production-hourly
- Configuración de lanzamiento: -
- Frecuencia:
0 * * * *
- Zona horaria:
Central European Summer Time (CEST)
- Selección de acciones del flujo de trabajo: selección de etiquetas, etiqueta
hourly
Autoriza tu Cuenta de Google
Para autenticar el recurso con las credenciales de usuario de tu Cuenta de Google, debes otorgar permiso de forma manual a las canalizaciones de BigQuery para obtener el token de acceso de tu Cuenta de Google y acceder a los datos de origen en tu nombre. Puedes otorgar la aprobación manual con la interfaz del diálogo de OAuth.
Solo debes otorgar permiso a las canalizaciones de BigQuery una vez.
Para revocar el permiso que otorgaste, sigue estos pasos:
- Ve a la página de tu Cuenta de Google.
- Haz clic en BigQuery Pipelines.
- Haga clic en Quitar acceso.
Cambiar el propietario de la configuración del flujo de trabajo actualizando las credenciales también requiere aprobación manual si el nuevo propietario de la Cuenta de Google nunca antes creó una configuración del flujo de trabajo.
Cómo editar la configuración de un flujo de trabajo
Para editar la configuración de un flujo de trabajo, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas editar, haz clic en el menú Más y, luego, en Editar.
- En el panel Editar configuración del flujo de trabajo, edita la configuración de lanzamiento y, luego, haz clic en Guardar.
Borra la configuración de un flujo de trabajo
Para borrar la configuración de un flujo de trabajo, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas borrar, haz clic en el menú Más y, luego, en Borrar.
- En el cuadro de diálogo Borrar configuración de lanzamiento, haz clic en Borrar.
Programa ejecuciones con Workflows y Cloud Scheduler
En esta sección, se muestra cómo programar ejecuciones de flujos de trabajo de Dataform con Workflows y Cloud Scheduler.
Acerca de las ejecuciones programadas del flujo de trabajo
Puedes configurar la frecuencia de las ejecuciones de tu flujo de trabajo de Dataform creando un trabajo de Cloud Scheduler que active un flujo de trabajo de Workflows. Workflows ejecuta servicios en un flujo de trabajo de organización que tú defines.
Workflows ejecuta tu flujo de trabajo de Dataform en un proceso de dos pasos. Primero, extrae el código de tu repositorio de Dataform de tu proveedor de Git y lo compila en un resultado de compilación. Luego, usa el resultado de la compilación para crear un flujo de trabajo de Dataform y lo ejecuta con la frecuencia que establezcas.
Crea un flujo de trabajo de coordinación programado
Para programar ejecuciones de tu flujo de trabajo de Dataform, usa Workflows para crear un flujo de trabajo de organización y agrega un trabajo de Cloud Scheduler como activador.
Workflows usa cuentas de servicio para otorgar acceso a los flujos de trabajo a los recursos deGoogle Cloud . Crea una cuenta de servicio y otórgale el rol de Identity and Access Management Editor de Dataform (
roles/dataform.editor
), así como los permisos mínimos necesarios para administrar tu flujo de trabajo de orquestación. Para obtener más información, consulta Otorga permiso a un flujo de trabajo para acceder a los recursos de Google Cloud .Crea un flujo de trabajo de organización y usa el siguiente código fuente en YAML como definición del flujo de trabajo:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Google Cloud .
- REPOSITORY_LOCATION: Es la ubicación de tu repositorio de Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- GIT_COMMITISH: Es la rama de Git desde la que deseas ejecutar el código de Dataform. Para un repositorio recién creado, reemplaza
main
.
Programa el flujo de trabajo de organización con Cloud Scheduler.
Personaliza la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform
Puedes actualizar el flujo de trabajo de orquestación existente y definir la configuración de la solicitud del resultado de la compilación de creación del flujo de trabajo de Dataform en formato YAML. Para obtener más información sobre la configuración, consulta la referencia del recurso projects.locations.repositories.compilationResults
de REST.
Por ejemplo, para agregar un parámetro de configuración de _dev
schemaSuffix
a todas las acciones durante la compilación, reemplaza el cuerpo del paso createCompilationResult
con el siguiente fragmento de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Personaliza la solicitud de invocación del flujo de trabajo de Dataform
Puedes actualizar el flujo de trabajo de orquestación existente y definir la configuración de la solicitud de invocación del flujo de trabajo de Dataform en formato YAML. Para obtener más información sobre la configuración de la solicitud de invocación, consulta la referencia del recurso projects.locations.repositories.workflowInvocations
de REST.
Por ejemplo, para ejecutar solo acciones con la etiqueta hourly
con todas las dependencias transitivas incluidas, reemplaza el cuerpo de createWorkflowInvocation
por el siguiente fragmento de código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Programa ejecuciones con Cloud Composer
Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no admite Cloud Composer 1.
Para administrar las programaciones de las ejecuciones de Dataform con Cloud Composer 2, puedes usar operadores de Dataform en grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow. Estos incluyen operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de los operadores de Airflow de Dataform disponibles, consulta Operadores de Google Dataform.
Instala el paquete google-cloud-dataform
de PyPI
Si usas las versiones 2.0.25
y posteriores de Cloud Composer 2, este paquete ya está instalado en tu entorno. No es necesario que la instales.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform
de PyPI.
En la sección Paquetes de PyPI, especifica la versión ==0.2.0
.
Crea un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform
Para administrar las ejecuciones programadas de flujos de trabajo de Dataform con Cloud Composer 2, escribe el DAG con los operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.
En la siguiente muestra de código, se muestra un DAG de Airflow que crea un resultado de compilación de Dataform y, luego, inicia una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform Google Cloud .
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación del flujo de trabajo.
- GIT_COMMITISH: Es el commitish de Git en el repositorio de Git remoto de la versión de tu código que deseas usar, por ejemplo, una rama o un SHA de Git.
En el siguiente muestra de código, se muestra un DAG de Airflow que realiza las siguientes acciones:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación asíncrona del flujo de trabajo de Dataform.
- Consulta el estado de tu flujo de trabajo hasta que entre en el estado esperado con
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform Google Cloud .
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación del flujo de trabajo.
- GIT_COMMITISH: Es el commitish de Git en el repositorio de Git remoto de la versión de tu código que deseas usar, por ejemplo, una rama o un SHA de Git.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación del flujo de trabajo.
Agrega parámetros de configuración de compilación
Puedes agregar parámetros de configuración de compilación adicionales al objeto create_compilation_result
del DAG de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de CodeCompilationConfig
de Dataform.
Para agregar parámetros de configuración de compilación al objeto DAG de Airflow
create_compilation_result
, agrega los parámetros seleccionados al campocode_compilation_config
con el siguiente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform Google Cloud .
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- GIT_COMMITISH: Es el commitish de Git en el repositorio de Git remoto de la versión de tu código que deseas usar, por ejemplo, una rama o un SHA de Git.
- PARAMETER: Es el parámetro
CodeCompilationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En el siguiente muestra de código, se muestra el parámetro defaultDatabase
agregado al objeto create_compilation_result
DAG de Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Agrega parámetros de configuración de invocación del flujo de trabajo
Puedes agregar parámetros de configuración adicionales de invocación del flujo de trabajo al objeto DAG de create_workflow_invocation
Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de InvocationConfig
de Dataform.
Para agregar parámetros de configuración de invocación del flujo de trabajo al objeto
create_workflow_invocation
del DAG de Airflow, agrega los parámetros seleccionados al campoinvocation_config
con el siguiente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu proyecto de Dataform Google Cloud .
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: Es la región en la que se encuentra el repositorio de Dataform.
- PARAMETER: Es el parámetro
InvocationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En el siguiente muestra de código, se muestran los parámetros includedTags[]
y transitiveDependenciesIncluded
agregados al objeto create_workflow_invocation
del DAG de Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
¿Qué sigue?
- Para obtener información sobre cómo configurar las configuraciones de lanzamiento de compilación de Dataform, consulta Crea una configuración de lanzamiento.
- Para obtener más información sobre el ciclo de vida del código en Dataform, consulta la Introducción al ciclo de vida del código en Dataform.
- Para obtener más información sobre la API de Dataform, consulta API de Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta la Descripción general de Cloud Composer.
- Para obtener más información sobre los precios de Workflows, consulta Precios de Workflows.