Gestion des tùches et des graphiques de tùches Snowflake avec Python¶

Vous pouvez utiliser Python pour gĂ©rer les tĂąches Snowflake, avec lesquelles vous pouvez exĂ©cuter des instructionsSQL, des appels de procĂ©dure et une logique dans ExĂ©cution de scripts Snowflake. Pour une vue d’ensemble des tĂąches, voir Introduction aux tĂąches.

Les Snowflake Python APIs reprĂ©sentent des tĂąches avec deux types distincts :

  • Task : Expose les propriĂ©tĂ©s d’une tĂąche telles que sa planification, ses paramĂštres et ses prĂ©dĂ©cesseurs.

  • TaskResource : Expose des mĂ©thodes que vous pouvez utiliser pour rĂ©cupĂ©rer un objet Task correspondant, exĂ©cuter la tĂąche et modifier la tĂąche.

Conditions préalables¶

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramÚtres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crĂ©e un objet Root pour utiliser les types et les mĂ©thodes de l’API. Pour plus d’informations, voir Connexion Ă  Snowflake avec Snowflake Python APIs.

CrĂ©ation d’une tĂąche¶

Pour créer une tùche, commencez par créer un objet Task. Ensuite, en spécifiant la base de données et le schéma dans lesquels créer la tùche, créez un objet TaskCollection. Via TaskCollection.create, ajoutez la nouvelle tùche à Snowflake.

Le code de l’exemple suivant crĂ©e un objet Task reprĂ©sentant une tĂąche nommĂ©e my_task qui exĂ©cute une requĂȘte SQL spĂ©cifiĂ©e dans le paramĂštre definition :

from datetime import timedelta
from snowflake.core.task import Task

my_task = Task(name="my_task", definition="<sql query>", schedule=timedelta(hours=1))
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task)
Copy

Ce code crée une variable TaskCollection tasks à partir de la base de données my_db et du schéma my_schema. Via TaskCollection.create, il crée une nouvelle tùche dans Snowflake.

Cet exemple de code spĂ©cifie Ă©galement une valeur timedelta d’une heure pour la planification de la tĂąche. Vous pouvez dĂ©finir la planification d’une tĂąche via une valeur timedelta ou une expression Cron.

Vous pouvez Ă©galement crĂ©er une tĂąche qui exĂ©cute une fonction Python ou une procĂ©dure stockĂ©e. Le code de l’exemple suivant crĂ©e une tĂąche nommĂ©e my_task2 qui exĂ©cute une fonction reprĂ©sentĂ©e par un objet StoredProcedureCall :

from snowflake.core.task import StoredProcedureCall, Task

my_task2 = Task(
  "my_task2",
  StoredProcedureCall(
      dosomething, stage_location="@mystage"
  ),
  warehouse="test_warehouse"
)
tasks = root.databases['my_db'].schemas['my_schema'].tasks
tasks.create(my_task2)
Copy

Cet objet spĂ©cifie une fonction nommĂ©e dosomething situĂ©e dans l’emplacement de zone de prĂ©paration @mystage. Vous devez Ă©galement spĂ©cifier un warehouse lorsque vous crĂ©ez une tĂąche avec un objet StoredProcedureCall.

CrĂ©ation ou modification d’une tĂąche¶

Vous pouvez dĂ©finir les propriĂ©tĂ©s d’un objet Task et le transmettre Ă  la mĂ©thode TaskResource.create_or_alter pour crĂ©er une tĂąche si elle n’existe pas, ou la modifier selon la dĂ©finition de la tĂąche si elle existe. Le comportement de create_or_alter est censĂ© ĂȘtre idempotent, ce qui signifie que l’objet de tĂąche rĂ©sultant sera le mĂȘme, que la tĂąche existe ou non avant l’appel de la mĂ©thode.

Note

La mĂ©thode create_or_alter utilise des valeurs par dĂ©faut pour toute propriĂ©tĂ© de tĂąche que vous ne dĂ©finissez pas explicitement. Par exemple, si vous ne dĂ©finissez pas schedule, sa valeur par dĂ©faut est None mĂȘme si la tĂąche existait auparavant avec une valeur diffĂ©rente.

Le code de l’exemple suivant met Ă  jour la dĂ©finition et la planification de la tĂąche my_task, puis modifie la tĂąche sur Snowflake :

from datetime import timedelta
from snowflake.core.task import Task

my_task = root.databases['my_db'].schemas['my_schema'].tasks['my_task'].fetch()
my_task.definition = "<sql query 2>"
my_task.schedule = timedelta(hours=2)

my_task_res = root.databases['my_db'].schemas['my_schema'].tasks['my_task']
my_task_res.create_or_alter(my_task)
Copy

Répertorier les tùches¶

Vous pouvez rĂ©pertorier les tĂąches via la mĂ©thode TaskCollection.iter. La mĂ©thode renvoie un itĂ©rateur PagedIter d’objets Task.

Le code de l’exemple suivant rĂ©pertorie les tĂąches dont le nom commence par my :

from snowflake.core.task import TaskCollection

tasks: TaskCollection = root.databases['my_db'].schemas['my_schema'].tasks
task_iter = tasks.iter(like="my%")  # returns a PagedIter[Task]
for task_obj in task_iter:
  print(task_obj.name)
Copy

Effectuer des opérations de tùche¶

Vous pouvez effectuer des opĂ©rations de tĂąche courantes—comme exĂ©cuter, suspendre et reprendre des tĂąches—avec un objet TaskResource.

Le code de l’exemple suivant exĂ©cute, suspend, reprend et supprime la tĂąche my_task :

tasks = root.databases['my_db'].schemas['my_schema'].tasks
task_res = tasks['my_task']

task_res.execute()
task_res.suspend()
task_res.resume()
task_res.drop()
Copy

Gestion des tùches dans un graphique de tùches¶

Vous pouvez gĂ©rer des tĂąches rassemblĂ©es dans un graphique de tĂąches. Un graphique de tĂąches est une sĂ©rie de tĂąches composĂ©e d’une seule tĂąche racine et de tĂąches supplĂ©mentaires, organisĂ©es en fonction de leurs dĂ©pendances.

Pour en savoir plus sur les tĂąches dans un graphique de tĂąches, voir CrĂ©er une sĂ©quence de tĂąches Ă  l’aide d’un graphique de tĂąches.

CrĂ©ation d’un graphique de tĂąches¶

Pour crĂ©er un graphique de tĂąches, commencez par crĂ©er un objet DAG qui spĂ©cifie son nom et d’autres propriĂ©tĂ©s facultatives telles que sa planification. Vous pouvez dĂ©finir la planification d’un graphique de tĂąches via une valeur timedelta ou une expression Cron.

Le code de l’exemple suivant dĂ©finit une fonction Python dosomething, puis spĂ©cifie la fonction sous forme d’objet DAGTask nommĂ© dag_task2 dans le graphique de tĂąches.

from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sum as sum_

def dosomething(session: Session) -> None:
  df = session.table("target")
  df.group_by("a").agg(sum_("b")).save_as_table("agg_table")

with DAG("my_dag", schedule=timedelta(days=1)) as dag:
  # Create a task that runs some SQL.
  dag_task1 = DAGTask(
    "dagtask1",
    "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v"
  )
  # Create a task that runs a Python function.
  dag_task2 = DAGTask(
    StoredProcedureCall(
      dosomething, stage_location="@mystage",
      packages=["snowflake-snowpark-python"]
    ),
    warehouse="test_warehouse"
  )
# Shift right and left operators can specify task relationships.
dag_task1 >> dag_task2  # dag_task1 is a predecessor of dag_task2
schema = root.databases["my_db"].schemas["my_schema"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag)
Copy

Ce code dĂ©finit Ă©galement une instruction SQL sous la forme d’un autre objet DAGTask nommĂ© dag_task1, puis spĂ©cifie dag_task1 comme prĂ©dĂ©cesseur de dag_task2. Pour finir, il dĂ©ploie le graphique de tĂąches dans Snowflake dans la base de donnĂ©es my_db et le schĂ©ma my_schema.

CrĂ©ation d’un graphique de tĂąches avec une planification cron, des branches de tĂąches et des valeurs de renvoi de fonctions.¶

Vous pouvez également créer un graphique de tùches avec une planification cron spécifique, des branches de tùches et des valeurs de renvoi de fonctions qui sont utilisées comme valeurs de renvoi de tùches.

Le code de l’exemple suivant crĂ©e un objet DAG avec un objet Cron spĂ©cifiant sa planification. Il dĂ©finit un objet DAGTaskBranch nommĂ© task1_branch ainsi que d’autres objets DAGTask et spĂ©cifie leurs dĂ©pendances les uns par rapport aux autres :

from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  pass  # do something

def task_branch_handler(session: Session) -> str:
  # do something
  return "task3"

try:
  with DAG(
    "my_dag",
    schedule=Cron("10 * * * *", "America/Los_Angeles"),
    stage_location="@mystage",
    packages=["snowflake-snowpark-python"],
    use_func_return_value=True,
  ) as dag:
    task1 = DAGTask(
      "task1",
      task_handler,
      warehouse=test_warehouse,
    )
    task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse)
    task2 = DAGTask("task2", task_handler, warehouse=test_warehouse)
    task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1")
    task1 >> task1_branch
    task1_branch >> [task2, task3]
  schema = root.databases["my_db"].schemas["my_schema"]
  op = DAGOperation(schema)
  op.deploy(dag, mode=CreateMode.or_replace)
finally:
  session.close()
Copy

Cet exemple de code dĂ©finit Ă©galement les fonctions de gestion des tĂąches et crĂ©e chaque objet DAGTask et DAGTaskBranch avec un gestionnaire de tĂąches spĂ©cifique affectĂ© Ă  la tĂąche. Le code dĂ©finit le paramĂštre use_func_return_value du DAG sur True, qui indique qu’il convient d’utiliser la valeur de renvoi de la fonction Python comme valeur de renvoi de la tĂąche correspondante. Sinon, la valeur par dĂ©faut de use_func_return_value est False.

DĂ©finition et obtention de la valeur de retour d’une tĂąche dans un graphique de tĂąches¶

Lorsque la dĂ©finition d’une tĂąche est un objet StoredProcedureCall, le gestionnaire de la procĂ©dure stockĂ©e (ou de la fonction) peut explicitement dĂ©finir la valeur de renvoi de la tĂąche via un objet TaskContext.

Pour plus d’informations, voir SYSTEM$SET_RETURN_VALUE.

Le code de l’exemple suivant dĂ©finit une fonction de gestion des tĂąches qui crĂ©e un objet TaskContext nommĂ© context Ă  partir de la session en cours. Il utilise ensuite la mĂ©thode TaskContext.set_return_value pour dĂ©finir explicitement la valeur de renvoi sur une chaĂźne spĂ©cifiĂ©e :

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  # this return value can be retrieved by successor Tasks.
  context.set_return_value("predecessor_return_value")
Copy

Dans un graphique de tùches, une tùche de successeur immédiate qui identifie la tùche précédente comme son prédécesseur peut alors récupérer la valeur de renvoi explicitement définie par la tùche de prédécesseur.

Pour plus d’informations, voir SYSTEM$GET_PREDECESSOR_RETURN_VALUE.

Le code de l’exemple suivant dĂ©finit une fonction de gestionnaire de tĂąches qui utilise la mĂ©thode TaskContext.get_predecessor_return_value pour obtenir la valeur de renvoi de la tĂąche de prĂ©dĂ©cesseur nommĂ©e pred_task_name :

from snowflake.core.task.context import TaskContext
from snowflake.snowpark import Session

def task_handler(session: Session) -> None:
  context = TaskContext(session)
  pred_return_value = context.get_predecessor_return_value("pred_task_name")
Copy