Gestion des tùches et des graphiques de tùches Snowflake avec Python¶
Vous pouvez utiliser Python pour gĂ©rer les tĂąches Snowflake, avec lesquelles vous pouvez exĂ©cuter des instructionsSQL, des appels de procĂ©dure et une logique dans ExĂ©cution de scripts Snowflake. Pour une vue dâensemble des tĂąches, voir Introduction aux tĂąches.
Les Snowflake Python APIs représentent des tùches avec deux types distincts :
Task
: Expose les propriĂ©tĂ©s dâune tĂąche telles que sa planification, ses paramĂštres et ses prĂ©dĂ©cesseurs.TaskResource
: Expose des méthodes que vous pouvez utiliser pour récupérer un objetTask
correspondant, exécuter la tùche et modifier la tùche.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
Ă partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramÚtres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant lâobjet Session
obtenu, le code crée un objet Root
pour utiliser les types et les mĂ©thodes de lâAPI. Pour plus dâinformations, voir Connexion Ă Snowflake avec Snowflake Python APIs.
CrĂ©ation dâune tĂąche¶
Pour créer une tùche, commencez par créer un objet Task
. Ensuite, en spécifiant la base de données et le schéma dans lesquels créer la tùche, créez un objet TaskCollection
. Via TaskCollection.create
, ajoutez la nouvelle tĂąche Ă Snowflake.
Le code de lâexemple suivant crĂ©e un objet Task
représentant une tùche nommée my_task
qui exĂ©cute une requĂȘte SQL spĂ©cifiĂ©e dans le paramĂštre definition
:
from datetime import timedelta
from snowflake.core.task import Task
my_task = Task(name="my_task", definition="<sql query>", schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Ce code crée une variable TaskCollection
tasks
à partir de la base de données my_db
et du schéma my_schema
. Via TaskCollection.create
, il crée une nouvelle tùche dans Snowflake.
Cet exemple de code spécifie également une valeur timedelta
dâune heure pour la planification de la tĂąche. Vous pouvez dĂ©finir la planification dâune tĂąche via une valeur timedelta
ou une expression Cron
.
Vous pouvez Ă©galement crĂ©er une tĂąche qui exĂ©cute une fonction Python ou une procĂ©dure stockĂ©e. Le code de lâexemple suivant crĂ©e une tĂąche nommĂ©e my_task2
qui exécute une fonction représentée par un objet StoredProcedureCall
:
from snowflake.core.task import StoredProcedureCall, Task
my_task2 = Task(
"my_task2",
StoredProcedureCall(
dosomething, stage_location="@mystage"
),
warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Cet objet spécifie une fonction nommée dosomething
situĂ©e dans lâemplacement de zone de prĂ©paration @mystage
. Vous devez également spécifier un warehouse
lorsque vous créez une tùche avec un objet StoredProcedureCall
.
CrĂ©ation ou modification dâune tĂąche¶
Vous pouvez dĂ©finir les propriĂ©tĂ©s dâun objet Task
et le transmettre à la méthode TaskResource.create_or_alter
pour crĂ©er une tĂąche si elle nâexiste pas, ou la modifier selon la dĂ©finition de la tĂąche si elle existe. Le comportement de create_or_alter
est censĂ© ĂȘtre idempotent, ce qui signifie que lâobjet de tĂąche rĂ©sultant sera le mĂȘme, que la tĂąche existe ou non avant lâappel de la mĂ©thode.
Note
La méthode create_or_alter
utilise des valeurs par défaut pour toute propriété de tùche que vous ne définissez pas explicitement. Par exemple, si vous ne définissez pas schedule
, sa valeur par défaut est None
mĂȘme si la tĂąche existait auparavant avec une valeur diffĂ©rente.
Le code de lâexemple suivant met Ă jour la dĂ©finition et la planification de la tĂąche my_task
, puis modifie la tĂąche sur Snowflake :
from datetime import timedelta
from snowflake.core.task import Task
my_task = root.databases['my_db'].schemas['my_schema'].tasks['my_task'].fetch()
my_task.definition = "<sql query 2>"
my_task.schedule = timedelta(hours=2)
my_task_res = root.databases['my_db'].schemas['my_schema'].tasks['my_task']
my_task_res.create_or_alter(my_task)
Répertorier les tùches¶
Vous pouvez répertorier les tùches via la méthode TaskCollection.iter
. La méthode renvoie un itérateur PagedIter
dâobjets Task
.
Le code de lâexemple suivant rĂ©pertorie les tĂąches dont le nom commence par my :
from snowflake.core.task import TaskCollection
tasks: TaskCollection = root.databases['my_db'].schemas['my_schema'].tasks
task_iter = tasks.iter(like="my%") # returns a PagedIter[Task]
for task_obj in task_iter:
print(task_obj.name)
Effectuer des opérations de tùche¶
Vous pouvez effectuer des opĂ©rations de tĂąche courantesâcomme exĂ©cuter, suspendre et reprendre des tĂąchesâavec un objet TaskResource
.
Le code de lâexemple suivant exĂ©cute, suspend, reprend et supprime la tĂąche my_task
:
tasks = root.databases['my_db'].schemas['my_schema'].tasks
task_res = tasks['my_task']
task_res.execute()
task_res.suspend()
task_res.resume()
task_res.drop()
Gestion des tùches dans un graphique de tùches¶
Vous pouvez gĂ©rer des tĂąches rassemblĂ©es dans un graphique de tĂąches. Un graphique de tĂąches est une sĂ©rie de tĂąches composĂ©e dâune seule tĂąche racine et de tĂąches supplĂ©mentaires, organisĂ©es en fonction de leurs dĂ©pendances.
Pour en savoir plus sur les tĂąches dans un graphique de tĂąches, voir CrĂ©er une sĂ©quence de tĂąches Ă lâaide dâun graphique de tĂąches.
CrĂ©ation dâun graphique de tĂąches¶
Pour créer un graphique de tùches, commencez par créer un objet DAG
qui spĂ©cifie son nom et dâautres propriĂ©tĂ©s facultatives telles que sa planification. Vous pouvez dĂ©finir la planification dâun graphique de tĂąches via une valeur timedelta
ou une expression Cron
.
Le code de lâexemple suivant dĂ©finit une fonction Python dosomething
, puis spĂ©cifie la fonction sous forme dâobjet DAGTask
nommé dag_task2
dans le graphique de tĂąches.
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_
def dosomething(session: Session) -> None:
df = session.table("target")
df.group_by("a").agg(sum_("b")).save_as_table("agg_table")
with DAG("my_dag", schedule=timedelta(days=1)) as dag:
# Create a task that runs some SQL.
dag_task1 = DAGTask(
"dagtask1",
"MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
)
# Create a task that runs a Python function.
dag_task2 = DAGTask(
StoredProcedureCall(
dosomething, stage_location="@mystage",
packages=["snowflake-snowpark-python"]
),
warehouse="test_warehouse"
)
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Ce code dĂ©finit Ă©galement une instruction SQL sous la forme dâun autre objet DAGTask
nommé dag_task1
, puis spécifie dag_task1
comme prédécesseur de dag_task2
. Pour finir, il déploie le graphique de tùches dans Snowflake dans la base de données my_db
et le schéma my_schema
.
CrĂ©ation dâun graphique de tĂąches avec une planification cron, des branches de tĂąches et des valeurs de renvoi de fonctions.¶
Vous pouvez également créer un graphique de tùches avec une planification cron spécifique, des branches de tùches et des valeurs de renvoi de fonctions qui sont utilisées comme valeurs de renvoi de tùches.
Le code de lâexemple suivant crĂ©e un objet DAG
avec un objet Cron
spécifiant sa planification. Il définit un objet DAGTaskBranch
nommé task1_branch
ainsi que dâautres objets DAGTask
et spécifie leurs dépendances les uns par rapport aux autres :
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
pass # do something
def task_branch_handler(session: Session) -> str:
# do something
return "task3"
try:
with DAG(
"my_dag",
schedule=Cron("10 * * * *", "America/Los_Angeles"),
stage_location="@mystage",
packages=["snowflake-snowpark-python"],
use_func_return_value=True,
) as dag:
task1 = DAGTask(
"task1",
task_handler,
warehouse=test_warehouse,
)
task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
task1 >> task1_branch
task1_branch >> [task2, task3]
schema = root.databases["my_db"].schemas["my_schema"]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)
finally:
session.close()
Cet exemple de code définit également les fonctions de gestion des tùches et crée chaque objet DAGTask
et DAGTaskBranch
avec un gestionnaire de tùches spécifique affecté à la tùche. Le code définit le paramÚtre use_func_return_value
du DAG sur True
, qui indique quâil convient dâutiliser la valeur de renvoi de la fonction Python comme valeur de renvoi de la tĂąche correspondante. Sinon, la valeur par dĂ©faut de use_func_return_value
est False
.
DĂ©finition et obtention de la valeur de retour dâune tĂąche dans un graphique de tĂąches¶
Lorsque la dĂ©finition dâune tĂąche est un objet StoredProcedureCall
, le gestionnaire de la procédure stockée (ou de la fonction) peut explicitement définir la valeur de renvoi de la tùche via un objet TaskContext
.
Pour plus dâinformations, voir SYSTEM$SET_RETURN_VALUE.
Le code de lâexemple suivant dĂ©finit une fonction de gestion des tĂąches qui crĂ©e un objet TaskContext
nommé context
à partir de la session en cours. Il utilise ensuite la méthode TaskContext.set_return_value
pour définir explicitement la valeur de renvoi sur une chaßne spécifiée :
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
# this return value can be retrieved by successor Tasks.
context.set_return_value("predecessor_return_value")
Dans un graphique de tùches, une tùche de successeur immédiate qui identifie la tùche précédente comme son prédécesseur peut alors récupérer la valeur de renvoi explicitement définie par la tùche de prédécesseur.
Pour plus dâinformations, voir SYSTEM$GET_PREDECESSOR_RETURN_VALUE.
Le code de lâexemple suivant dĂ©finit une fonction de gestionnaire de tĂąches qui utilise la mĂ©thode TaskContext.get_predecessor_return_value
pour obtenir la valeur de renvoi de la tùche de prédécesseur nommée pred_task_name
:
from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session
def task_handler(session: Session) -> None:
context = TaskContext(session)
pred_return_value = context.get_predecessor_return_value("pred_task_name")