Écriture d’une UDTF en Python¶

Vous pouvez implĂ©menter un gestionnaire de fonction de table dĂ©finie par l’utilisateur (UDTF) en Python. Ce code de gestionnaire s’exĂ©cute lorsque l’UDTF est appelĂ©e. Cette rubrique dĂ©crit comment implĂ©menter un gestionnaire en Python et crĂ©er l’UDTF.

Une UDTF est une fonction dĂ©finie par l’utilisateur (UDF) qui renvoie des rĂ©sultats sous forme tabulaire. Pour en savoir plus sur les gestionnaires UDF implĂ©mentĂ©s en Python, voir CrĂ©ation d’UDFs Python. Pour plus d’informations gĂ©nĂ©rales sur les UDFs, voir Vue d’ensemble des fonctions dĂ©finies par l’utilisateur.

Dans le gestionnaire d’une UDTF, vous pouvez traiter les lignes d’entrĂ©e (voir Traitement des lignes dans cette rubrique). Vous pouvez Ă©galement avoir une logique qui s’exĂ©cute pour chaque partition d’entrĂ©e (voir Traitement des partitions dans cette rubrique).

Lorsque vous crĂ©ez une UDTF Python, vous effectuez les opĂ©rations suivantes :

  1. ImplĂ©mentez une classe avec des mĂ©thodes que Snowflake invoquera lorsque l’UDTF est appelĂ©e.

    Pour plus de dĂ©tails, voir ImplĂ©mentation d’un gestionnaire (handler) dans cette rubrique.

  2. CrĂ©ez l’UDTF en SQL avec la commande CREATE FUNCTION, en spĂ©cifiant votre classe comme gestionnaire. Quand vous crĂ©ez l’UDTF, vous spĂ©cifiez :

    • Les types de donnĂ©es des paramĂštres d’entrĂ©e de l’UDTF.

    • Les types de donnĂ©es des colonnes renvoyĂ©es par l’UDTF.

    • Le code Ă  exĂ©cuter en tant que gestionnaire lorsque l’UDTF est appelĂ©e.

    • La langue dans laquelle le gestionnaire est implĂ©mentĂ©.

    Pour en savoir plus sur la syntaxe, voir CrĂ©ation de l’UDTF avec CREATE FUNCTION dans cette rubrique.

Vous pouvez appeler une UDF ou une UDTF comme décrit dans Exécutez une UDF.

Note

Les fonctions de table (UDTFs) ont une limite de 500 arguments d’entrĂ©e et 500 colonnes de sortie.

Snowflake prend actuellement en charge l’écriture d’UDTFs Java dans les versions suivantes de Python :

  • 3,9

  • 3,10

  • 3,11

  • 3,12

Dans votre instruction CREATE FUNCTION, définissez runtime_version comme étant la version souhaitée.

ImplĂ©mentation d’un gestionnaire (handler)¶

Vous implĂ©mentez une classe de gestionnaire pour traiter les valeurs des arguments d’UDTF dans des rĂ©sultats tabulaires et gĂ©rer les entrĂ©es partitionnĂ©es. Pour un exemple de classe de gestionnaire, voir Exemple de classe de gestionnaire dans cette rubrique.

Lorsque vous crĂ©ez l’UDTF avec CREATE FUNCTION, vous spĂ©cifiez cette classe comme gestionnaire de l’UDTF. Pour en savoir plus sur le code SQL permettant de crĂ©er la fonction, voir CrĂ©ation de l’UDTF avec CREATE FUNCTION dans cette rubrique.

Une classe de gestionnaire implĂ©mente les mĂ©thodes que Snowflake invoquera lorsque l’UDTF est appelĂ©e. Cette classe contient la logique de l’UDTF.

Méthode

Exigence

Description

Méthode __init__

Facultatif

Initialise l’état pour le traitement d’état des partitions d’entrĂ©e. Pour plus d’informations, voir Initialisation du gestionnaire dans cette rubrique.

Méthode process

Obligatoire

Traite chaque ligne d’entrĂ©e, en retournant une valeur tabulaire sous forme de tuples. Snowflake invoque cette mĂ©thode, en transmettant les entrĂ©es des arguments d’UDTF. Pour plus d’informations, voir DĂ©finition d’une mĂ©thode process dans cette rubrique.

Méthode end_partition

Facultatif

Finalise le traitement des partitions d’entrĂ©e, en renvoyant une valeur tabulaire sous forme de tuples. Pour plus d’informations, voir Finalisation du traitement des partitions dans cette rubrique.

L’émission d’une exception Ă  partir de n’importe quelle mĂ©thode de la classe du gestionnaire entraĂźne l’arrĂȘt du traitement. La requĂȘte qui a appelĂ© l’UDTF Ă©choue avec un message d’erreur.

Note

Si votre code ne rĂ©pond pas aux exigences dĂ©crites ici, la crĂ©ation ou l’exĂ©cution de l’UDTF peut Ă©chouer. Snowflake dĂ©tectera les violations lorsque l’instruction CREATE FUNCTION sera exĂ©cutĂ©e.

Exemple de classe de gestionnaire¶

Le code de l’exemple suivant crĂ©e une UDTF dont la classe de gestionnaire traite les lignes d’une partition. La mĂ©thode process traite chaque ligne d’entrĂ©e et renvoie une ligne contenant le coĂ»t total d’une vente d’action. AprĂšs avoir traitĂ© les lignes de la partition, elle renvoie (Ă  partir de sa mĂ©thode end_partition) le total de toutes les ventes incluses dans la partition.

CREATE OR REPLACE FUNCTION stock_sale_sum(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
  RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'StockSaleSum'
AS $$
class StockSaleSum:
    def __init__(self):
        self._cost_total = 0
        self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      cost = quantity * price
      self._cost_total += cost
      yield (symbol, cost)

    def end_partition(self):
      yield (self._symbol, self._cost_total)
$$;
Copy

Le code de l’exemple suivant appelle l’UDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol, quantity, et price de la table stocks_table. Pour plus d’informations sur l’appel d’une UDTF, reportez-vous Ă  ExĂ©cutez une UDF.

SELECT stock_sale_sum.symbol, total
  FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
Copy

Initialisation du gestionnaire¶

Vous pouvez éventuellement implémenter une méthode __init__ dans votre classe de gestionnaire que Snowflake invoquera avant que le gestionnaire ne commence à traiter les lignes. Par exemple, vous pouvez utiliser cette méthode pour établir un état spécifique à la partition pour le gestionnaire. Votre méthode __init__ peut ne pas produire de lignes de sortie.

La signature de la mĂ©thode doit ĂȘtre de la forme suivante :

def __init__(self):
Copy

Par exemple, vous pourriez vouloir :

  • Initialise l’état d’une partition, puis utilise cet Ă©tat dans les mĂ©thodes process et end_partition.

  • ExĂ©cutez une initialisation de longue durĂ©e qui doit ĂȘtre effectuĂ©e une seule fois par partition plutĂŽt qu’une fois par ligne.

Note

Vous pouvez également exécuter la logique une fois avant le début du traitement des partitions en incluant ce code en dehors de la classe du gestionnaire, par exemple avant la déclaration de la classe.

Pour plus d’informations sur le traitement des partitions, voir Traitement des partitions dans cette rubrique.

Si vous utilisez une mĂ©thode __init__ n’oubliez pas que __init__ :

  • Ne peut prendre que self comme argument.

  • Impossible de produire de lignes de sortie. Utilisez l’implĂ©mentation de votre mĂ©thode process pour cela.

  • Est invoquĂ©e une fois pour chaque partition, et avant que la mĂ©thode process soit invoquĂ©e.

Traitement des lignes¶

ImplĂ©mentez une mĂ©thode process que Snowflake invoquera pour chaque ligne d’entrĂ©e.

DĂ©finition d’une mĂ©thode process¶

DĂ©finissez une mĂ©thode process qui reçoit comme valeurs les UDTF arguments convertis Ă  partir de types SQL, renvoyant des donnĂ©es que Snowflake utilisera pour crĂ©er la valeur de retour tabulaire de l’UDTF.

La signature de la mĂ©thode doit ĂȘtre de la forme suivante :

def process(self, *args):
Copy

Votre mĂ©thode process doit :

  • Avoir un paramĂštre self.

  • DĂ©clarer les paramĂštres de la mĂ©thode correspondant aux paramĂštres de l’UDTF.

    Les noms des paramĂštres des mĂ©thodes ne doivent pas nĂ©cessairement correspondre aux noms des paramĂštres de l’UDTF, mais les paramĂštres des mĂ©thodes doivent ĂȘtre dĂ©clarĂ©s dans le mĂȘme ordre que les paramĂštres de l’UDTF.

    Lorsque vous transmettez des valeurs d’arguments de l’UDTF Ă  votre mĂ©thode, Snowflake convertit les valeurs des types SQL en types Python que vous utilisez dans la mĂ©thode. Pour plus d’informations sur la maniĂšre dont Snowflake Ă©tablit une correspondance entre SQL et les types de donnĂ©es Python, voir Mappages des types de donnĂ©es SQL-Python.

  • Donne un ou plusieurs tuples (ou renvoie un itĂ©rable contenant des tuples), dans lequel la sĂ©quence de tuples correspond Ă  la sĂ©quence de colonnes de valeurs de retour de l’UDTF.

    Les Ă©lĂ©ments du tuple doivent apparaĂźtre dans le mĂȘme ordre que les colonnes de la valeur de retour UDTF sont dĂ©clarĂ©es. Pour plus d’informations, voir Renvoi d’une valeur dans cette rubrique.

    Snowflake convertira les valeurs des types Python en types SQL requis par la dĂ©claration UDTF. Pour plus d’informations sur la maniĂšre dont Snowflake Ă©tablit une correspondance entre SQL et les types de donnĂ©es Python, voir Mappages des types de donnĂ©es SQL-Python.

Si une mĂ©thode de la classe du gestionnaire lĂšve une exception, le traitement s’arrĂȘte. La requĂȘte qui a appelĂ© l’UDTF Ă©choue avec un message d’erreur. Si la mĂ©thode process renvoie None, le traitement s’arrĂȘte. (La mĂ©thode end_partition est toujours invoquĂ©e mĂȘme si la mĂ©thode process renvoie None).

Exemple de méthode de traitement

L’exemple suivant prĂ©sente une classe de gestionnaire StockSale avec une mĂ©thode process qui traite trois arguments UDTF (symbol, quantity, et price), et renvoie une seule ligne avec deux colonnes (symbol et total). Notez que les paramĂštres de la mĂ©thode process sont dĂ©clarĂ©s dans le mĂȘme ordre que les paramĂštres stock_sale. Les arguments de l’instruction yield de la mĂ©thode process sont dans le mĂȘme ordre que les colonnes dĂ©clarĂ©es dans la clause stock_sale RETURNS TABLE.

CREATE OR REPLACE FUNCTION stock_sale(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
  RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'StockSale'
AS $$
class StockSale:
    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
$$;
Copy

Le code de l’exemple suivant appelle l’UDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol, quantity, et price de la table stocks_table.

SELECT stock_sale.symbol, total
  FROM stocks_table, TABLE(stock_sale(symbol, quantity, price) OVER (PARTITION BY symbol));
Copy

Renvoi d’une valeur¶

Lorsque vous renvoyez des lignes de sortie, vous pouvez utiliser soit yield soit return (mais pas les deux) pour renvoyer des tuples avec la valeur tabulaire. Si la mĂ©thode renvoie ou produit None, le traitement de la ligne en cours s’arrĂȘte.

  • Lorsque vous utilisez yield, exĂ©cutez une instruction yield distincte pour chaque ligne de sortie. Il s’agit de la meilleure pratique, car l’évaluation paresseuse qui accompagne yield permet un traitement plus efficace et peut contribuer Ă  Ă©viter les dĂ©passements de dĂ©lai d’expiration.

    Chaque Ă©lĂ©ment du tuple devient une valeur de colonne dans le rĂ©sultat renvoyĂ© par l’UDTF. L’ordre des arguments yield doit correspondre Ă  l’ordre des colonnes dĂ©clarĂ©es pour la valeur de retour dans la clause RETURNS TABLE de CREATE FUNCTION.

    Le code de l’exemple suivant renvoie des valeurs reprĂ©sentant deux rangĂ©es.

    def process(self, symbol, quantity, price):
      cost = quantity * price
      yield (symbol, cost)
      yield (symbol, cost)
    
    Copy

    Notez que, comme l’argument de rendement est un tuple, vous devez inclure une virgule de fin lorsque vous passez une seule valeur dans le tuple, comme dans l’exemple suivant.

    yield (cost,)
    
    Copy
  • En utilisant return, renvoyez un itĂ©rable avec des tuples.

    Chaque valeur d’un tuple devient une valeur de colonne dans le rĂ©sultat renvoyĂ© par l’UDTF. L’ordre des valeurs des colonnes dans un tuple doit correspondre Ă  l’ordre des colonnes dĂ©clarĂ©es pour la valeur de retour dans la clause RETURNS TABLE de CREATE FUNCTION.

    Le code de l’exemple suivant renvoie deux lignes, chacune avec deux colonnes : symbole et total.

    def process(self, symbol, quantity, price):
      cost = quantity * price
      return [(symbol, cost), (symbol, cost)]
    
    Copy

Ignorer des lignes¶

Pour ignorer une ligne d’entrĂ©e et traiter la ligne suivante (par exemple, lorsque vous validez les lignes d’entrĂ©e), faites en sorte que la mĂ©thode process renvoie l’un des Ă©lĂ©ments suivants :

  • Lorsque vous utilisez return, vous devez retourner None, une liste contenant None, ou une liste vide pour ignorer la ligne.

  • Lorsque vous utilisez yield, retournez None pour ignorer une ligne.

    Notez que si vous avez plusieurs appels à yield, tout appel aprÚs un appel qui renvoie None sera ignoré par Snowflake.

Le code de l’exemple suivant renvoie uniquement les lignes pour lesquelles number est un nombre entier positif. Si number n’est pas positif, la mĂ©thode renvoie None vide pour ignorer la ligne actuelle et poursuivre le traitement de la ligne suivante.

def process(self, number):
  if number < 1:
    yield None
  else:
    yield (number)
Copy

Traitement avec et sans état¶

Vous pouvez implémenter le gestionnaire pour traiter les lignes en tenant compte des partitions ou pour les traiter simplement ligne par ligne.

  • Dans le traitement en tenant compte des partitions, le gestionnaire inclut du code pour gĂ©rer l’état spĂ©cifique Ă  la partition. Cela comprend une mĂ©thode __init__ qui s’exĂ©cute au dĂ©but du traitement de la partition et une mĂ©thode end_partition que Snowflake invoque aprĂšs avoir traitĂ© la derniĂšre ligne de la partition. Pour plus d’informations, voir Traitement des partitions dans cette rubrique.

  • Dans le traitement tenant compte de la partition, le gestionnaire s’exĂ©cute sans Ă©tat, en ignorant les limites des partitions.

    Pour que le gestionnaire s’exĂ©cute de cette façon, il ne faut pas inclure de mĂ©thode __init__ ou end_partition.

Traitement des partitions¶

Vous pouvez traiter les partitions en entrĂ©e avec du code qui s’exĂ©cute par partition (par exemple pour gĂ©rer l’état) ainsi que du code qui s’exĂ©cute pour chaque ligne de la partition.

Note

Pour plus d’informations sur la spĂ©cification des partitions lors de l’appel d’une UDTF, reportez-vous Ă  Fonctions et partitions des tables.

Lorsqu’une requĂȘte inclut des partitions, elle agrĂšge les lignes en utilisant une valeur spĂ©cifiĂ©e, telle que la valeur d’une colonne. Les lignes agrĂ©gĂ©es que votre gestionnaire reçoit sont dites partitionnĂ©es par cette valeur. Votre code peut traiter ces partitions et leurs lignes de façon Ă  ce que le traitement de chaque partition comprenne un Ă©tat spĂ©cifique Ă  la partition.

Le code de l’exemple SQL suivant demande des informations sur les ventes d’actions. Il exĂ©cute une UDTF stock_sale_sum dont l’entrĂ©e est partitionnĂ©e par la valeur de la colonne symbol.

SELECT stock_sale_sum.symbol, total
  FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
Copy

Gardez Ă  l’esprit que mĂȘme lorsque les lignes entrantes sont partitionnĂ©es, votre code peut ignorer la sĂ©paration des partitions et traiter simplement les lignes. Par exemple, vous pouvez omettre le code conçu pour gĂ©rer l’état de la partition, tel que la mĂ©thode __init__ et la mĂ©thode end_partition d’une classe de gestionnaire, et n’implĂ©menter que la mĂ©thode process. Pour plus d’informations, voir Traitement avec et sans Ă©tat dans cette rubrique.

Pour traiter chaque partition en tant qu’unitĂ©, vous devez :

  • ImplĂ©mentez une mĂ©thode __init__ de classe de gestionnaire dans laquelle vous initialiserez le traitement de la partition.

    Pour plus d’informations, voir Initialisation du gestionnaire dans cette rubrique.

  • Incluez un code tenant compte des partitions lors du traitement de chaque ligne avec la mĂ©thode process.

    Pour plus d’informations sur le traitement des lignes, voir Traitement des lignes dans cette rubrique.

  • ImplĂ©mentez une mĂ©thode end_partition pour finaliser le traitement de la partition.

    Pour plus d’informations, voir Finalisation du traitement des partitions dans cette rubrique.

Les paragraphes suivants dĂ©crivent la sĂ©quence des invocations de votre gestionnaire lorsque vous avez inclus du code conçu pour ĂȘtre exĂ©cutĂ© par partition.

  1. Lorsque le traitement d’une partition commence, et avant que la premiĂšre ligne n’ait Ă©tĂ© traitĂ©e, Snowflake utilise la mĂ©thode __init__ de votre classe de gestionnaire pour crĂ©er une instance de la classe.

    Ici, vous pouvez Ă©tablir l’état de la partition spĂ©cifique. Par exemple, vous pouvez initialiser une variable d’instance pour contenir une valeur calculĂ©e Ă  partir de lignes de la partition.

  2. Pour chaque ligne de la partition, Snowflake invoque la méthode process.

    Chaque fois que la mĂ©thode s’exĂ©cute, elle peut apporter des modifications aux valeurs de l’état. Par exemple, vous pouvez demander Ă  la mĂ©thode process de mettre Ă  jour la valeur de la variable d’instance.

  3. Une fois que votre code a traité la derniÚre ligne de la partition, Snowflake invoque votre méthode end_partition.

    À partir de cette mĂ©thode, vous pouvez renvoyer des lignes de sortie contenant une valeur de niveau de partition que vous souhaitez renvoyer. Par exemple, vous pouvez renvoyer la valeur de la variable d’instance que vous avez mise Ă  jour au fur et Ă  mesure que vous traitez les lignes de la partition.

    Votre mĂ©thode end_partition ne recevra aucun argument de Snowflake, qui l’invoquera simplement aprĂšs avoir traitĂ© la derniĂšre ligne de la partition.

Finalisation du traitement des partitions¶

Vous pouvez Ă©ventuellement implĂ©menter une mĂ©thode end_partition dans votre classe de gestionnaire que Snowflake invoquera aprĂšs avoir traitĂ© toutes les lignes d’une partition. Avec cette mĂ©thode, vous pouvez exĂ©cuter le code pour une partition aprĂšs que toutes les lignes de la partition aient Ă©tĂ© traitĂ©es. Votre mĂ©thode end_partition peut produire des lignes de sortie, par exemple pour renvoyer les rĂ©sultats d’un calcul Ă  l’échelle de la partition. Pour plus d’informations, voir Traitement des partitions dans cette rubrique.

La signature de la mĂ©thode doit ĂȘtre de la forme suivante :

def end_partition(self):
Copy

Snowflake attend de l’implĂ©mentation de votre mĂ©thode end_partition ce qui suit :

  • Elle ne doit pas ĂȘtre statique.

  • Elle ne peut avoir d’autres paramĂštres que self.

  • Au lieu de renvoyer une valeur tabulaire, elle peut produire une liste vide ou None.

Note

Bien que Snowflake prenne en charge les grandes partitions avec des dĂ©lais d’expiration dĂ©finis pour les traiter avec succĂšs, les partitions particuliĂšrement grandes peuvent entraĂźner des expirations (par exemple lorsque end_partition prend trop de temps Ă  se terminer). Veuillez contacter le support Snowflake si vous avez besoin d’ajuster le seuil d’expiration pour des scĂ©narios d’utilisation spĂ©cifiques.

Exemple de traitement des partitions¶

Le code de l’exemple suivant calcule le coĂ»t total payĂ© sur l’ensemble des achats pour une action en calculant d’abord le coĂ»t par achat et en additionnant les achats (dans la mĂ©thode process). Le code renvoie le total dans la mĂ©thode end_partition.

Pour un exemple d’une UDTF qui inclut ce gestionnaire, ainsi que l’appel de l’UDTF, reportez-vous à Exemple de classe de gestionnaire.

class StockSaleSum:
  def __init__(self):
    self._cost_total = 0
    self._symbol = ""

  def process(self, symbol, quantity, price):
    self._symbol = symbol
    cost = quantity * price
    self._cost_total += cost
    yield (symbol, cost)

  def end_partition(self):
    yield (self._symbol, self._cost_total)
Copy

Lors du traitement des partitions, gardez Ă  l’esprit les points suivants :

  • Votre code peut gĂ©rer des partitions qui ne sont pas explicitement spĂ©cifiĂ©es dans un appel Ă  l’UDTF. MĂȘme lorsqu’un appel Ă  l’UDTF n’inclut pas de clause PARTITION BY, Snowflake partitionne les donnĂ©es de maniĂšre implicite.

  • Votre mĂ©thode process recevra les donnĂ©es des lignes dans l’ordre spĂ©cifiĂ© par la clause ORDER BY de la partition, le cas Ă©chĂ©ant.

Exemples¶

Utilisation d’un paquet importé¶

Vous pouvez utiliser les paquets Python qui sont inclus dans une liste de packages tiers d’Anaconda disponibles dans Snowflake. Pour spĂ©cifier ces paquets en tant que dĂ©pendances dans l’UDTF, utilisez la clause PACKAGES dans CREATE FUNCTION.

Vous pouvez dĂ©couvrir la liste des paquets inclus en exĂ©cutant la commande SQL suivante dans Snowflake :

SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Copy

Pour plus d’informations, voir Utilisation de paquets tiers et CrĂ©ation d’UDFs Python.

Le code de l’exemple suivant utilise une fonction du paquet NumPy (Numerical Python) pour calculer le prix moyen par action Ă  partir d’un tableau d’achats d’actions, chacune ayant un prix par action diffĂ©rent.

CREATE OR REPLACE FUNCTION stock_sale_average(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
  RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('numpy')
  HANDLER = 'StockSaleAverage'
AS $$
import numpy as np

class StockSaleAverage:
    def __init__(self):
      self._price_array = []
      self._quantity_total = 0
      self._symbol = ""

    def process(self, symbol, quantity, price):
      self._symbol = symbol
      self._price_array.append(float(price))
      cost = quantity * price
      yield (symbol, cost)

    def end_partition(self):
      np_array = np.array(self._price_array)
      avg = np.average(np_array)
      yield (self._symbol, avg)
$$;
Copy

Le code de l’exemple suivant appelle l’UDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol, quantity, et price de la table stocks_table. Pour plus d’informations sur l’appel d’une UDTF, reportez-vous Ă  ExĂ©cutez une UDF.

SELECT stock_sale_average.symbol, total
  FROM stocks_table,
  TABLE(stock_sale_average(symbol, quantity, price)
    OVER (PARTITION BY symbol));
Copy

ExĂ©cution de tĂąches simultanĂ©es Ă  l’aide de processus de tĂąches worker¶

Vous pouvez exĂ©cuter des tĂąches simultanĂ©es Ă  l’aide de processus de travail Python. Cela peut s’avĂ©rer utile lorsque vous devez exĂ©cuter des tĂąches parallĂšles qui tirent parti de plusieurs cƓurs CPU sur les nƓuds de l’entrepĂŽt.

Note

Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.

Pour contourner les cas oĂč le Python Global Interpreter Lock empĂȘche une approche multitĂąche de s’étendre Ă  tous les cƓurs CPU, vous pouvez exĂ©cuter des tĂąches concurrentes Ă  l’aide de processus de travail distincts, plutĂŽt que de threads.

Vous pouvez le faire sur des entrepîts Snowflake en utilisant la classe Parallel de la bibliothùque joblib comme dans l’exemple suivant.

CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
  RETURNS TABLE (result INT)
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'JoblibMultiprocessing'
  PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt

class JoblibMultiprocessing:
  def process(self, i):
    pass

  def end_partition(self):
    result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
    for r in result:
      yield (r, )
$$;
Copy

Note

Le backend par défaut utilisé pour joblib.Parallel diffÚre entre les entrepÎts standards de Snowflake et les entrepÎts optimisés pour Snowpark.

  • Valeur par dĂ©faut de l’entrepĂŽt standard : threading

  • Valeur par dĂ©faut de l’entrepĂŽt optimisĂ© pour Snowpark : loky (multitraitement)

Vous pouvez remplacer le paramĂštre de backend par dĂ©faut en appelant la fonction joblib.parallel_backend comme dans l’exemple suivant.

import joblib
joblib.parallel_backend('loky')
Copy

CrĂ©ation de l’UDTF avec CREATE FUNCTION¶

Vous créez une UDTF avec SQL en utilisant la commande CREATE FUNCTION, en spécifiant le code que vous avez écrit comme gestionnaire. Pour la référence de la commande, voir CREATE FUNCTION.

Utilisez la syntaxe suivante pour créer une UDTF.

CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
  RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
  LANGUAGE PYTHON
  [ IMPORTS = ( '<imports>' ) ]
  RUNTIME_VERSION = 3.9
  [ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
  [ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
  HANDLER = '<handler_class>'
  [ AS '<python_code>' ]
Copy

Pour associer le code du gestionnaire que vous avez Ă©crit avec l’UDTF, vous faites ce qui suit lors de l’exĂ©cution de CREATE FUNCTION :

  • Dans RETURNS TABLE, spĂ©cifiez les colonnes de sortie dans les paires nom de colonne et type.

  • DĂ©finissez LANGUAGE sur PYTHON.

  • DĂ©finissez la valeur de la clause IMPORTS sur le chemin et le nom de la classe du gestionnaire si la classe se trouve dans un emplacement externe, par exemple sur une zone de prĂ©paration.

    Pour plus d’informations, voir CrĂ©ation d’UDFs Python.

  • DĂ©finissez RUNTIME_VERSION sur la version de l’environnement d’exĂ©cution Python requise par votre code. Les versions de Python prises en charge sont les suivantes :

    • 3,9

    • 3,10

    • 3,11

    • 3,12

  • DĂ©finissez la valeur de la clause PACKAGES sur le nom d’un ou plusieurs paquets, le cas Ă©chĂ©ant, requis par la classe du gestionnaire.

    Pour plus d’informations, voir Utilisation de paquets tiers et CrĂ©ation d’UDFs Python.

  • DĂ©finissez la valeur de la clause HANDLER comme Ă©tant le nom de la classe du gestionnaire.

    Lorsque vous associez un code de gestionnaire Python à une UDTF, vous pouvez soit inclure le code en ligne, soit y faire référence à un emplacement sur une zone de préparation Snowflake. La valeur de HANDLER est sensible à la casse et doit correspondre au nom de la classe Python.

    Pour plus d’informations, voir Les UDFs avec du code en ligne vs. les UDFs avec du code tĂ©lĂ©chargĂ© Ă  partir d’une zone de prĂ©paration.

    Important

    Pour une UDF scalaire Python, la valeur de la clause HANDLER contient le nom de la méthode.

    Pour une UDTF Python, la valeur de la clause HANDLER contient le nom de la classe, mais pas un nom de méthode.

    La raison de cette diffĂ©rence est que pour une UDF scalaire Python, le nom de la mĂ©thode de gestion est choisi par l’utilisateur et n’est donc pas connu Ă  l’avance par Snowflake, mais pour une UDTF Python, les noms des mĂ©thodes (par exemple end_partition) sont connus, car ils doivent correspondre aux noms spĂ©cifiĂ©s par Snowflake.

  • La clause AS '<python_code>' est requise si le code du gestionnaire est spĂ©cifiĂ© en ligne avec CREATE FUNCTION.