Ăcriture dâune UDTF en Python¶
Vous pouvez implĂ©menter un gestionnaire de fonction de table dĂ©finie par lâutilisateur (UDTF) en Python. Ce code de gestionnaire sâexĂ©cute lorsque lâUDTF est appelĂ©e. Cette rubrique dĂ©crit comment implĂ©menter un gestionnaire en Python et crĂ©er lâUDTF.
Une UDTF est une fonction dĂ©finie par lâutilisateur (UDF) qui renvoie des rĂ©sultats sous forme tabulaire. Pour en savoir plus sur les gestionnaires UDF implĂ©mentĂ©s en Python, voir CrĂ©ation dâUDFs Python. Pour plus dâinformations gĂ©nĂ©rales sur les UDFs, voir Vue dâensemble des fonctions dĂ©finies par lâutilisateur.
Dans le gestionnaire dâune UDTF, vous pouvez traiter les lignes dâentrĂ©e (voir Traitement des lignes dans cette rubrique). Vous pouvez Ă©galement avoir une logique qui sâexĂ©cute pour chaque partition dâentrĂ©e (voir Traitement des partitions dans cette rubrique).
Lorsque vous créez une UDTF Python, vous effectuez les opérations suivantes :
ImplĂ©mentez une classe avec des mĂ©thodes que Snowflake invoquera lorsque lâUDTF est appelĂ©e.
Pour plus de dĂ©tails, voir ImplĂ©mentation dâun gestionnaire (handler) dans cette rubrique.
CrĂ©ez lâUDTF en SQL avec la commande CREATE FUNCTION, en spĂ©cifiant votre classe comme gestionnaire. Quand vous crĂ©ez lâUDTF, vous spĂ©cifiez :
Les types de donnĂ©es des paramĂštres dâentrĂ©e de lâUDTF.
Les types de donnĂ©es des colonnes renvoyĂ©es par lâUDTF.
Le code Ă exĂ©cuter en tant que gestionnaire lorsque lâUDTF est appelĂ©e.
La langue dans laquelle le gestionnaire est implémenté.
Pour en savoir plus sur la syntaxe, voir CrĂ©ation de lâUDTF avec CREATE FUNCTION dans cette rubrique.
Vous pouvez appeler une UDF ou une UDTF comme décrit dans Exécutez une UDF.
Note
Les fonctions de table (UDTFs) ont une limite de 500 arguments dâentrĂ©e et 500 colonnes de sortie.
Snowflake prend actuellement en charge lâĂ©criture dâUDTFs Java dans les versions suivantes de Python :
3,9
3,10
3,11
3,12
Dans votre instruction CREATE FUNCTION, définissez runtime_version
comme étant la version souhaitée.
ImplĂ©mentation dâun gestionnaire (handler)¶
Vous implĂ©mentez une classe de gestionnaire pour traiter les valeurs des arguments dâUDTF dans des rĂ©sultats tabulaires et gĂ©rer les entrĂ©es partitionnĂ©es. Pour un exemple de classe de gestionnaire, voir Exemple de classe de gestionnaire dans cette rubrique.
Lorsque vous crĂ©ez lâUDTF avec CREATE FUNCTION, vous spĂ©cifiez cette classe comme gestionnaire de lâUDTF. Pour en savoir plus sur le code SQL permettant de crĂ©er la fonction, voir CrĂ©ation de lâUDTF avec CREATE FUNCTION dans cette rubrique.
Une classe de gestionnaire implĂ©mente les mĂ©thodes que Snowflake invoquera lorsque lâUDTF est appelĂ©e. Cette classe contient la logique de lâUDTF.
Méthode |
Exigence |
Description |
---|---|---|
Méthode |
Facultatif |
Initialise lâĂ©tat pour le traitement dâĂ©tat des partitions dâentrĂ©e. Pour plus dâinformations, voir Initialisation du gestionnaire dans cette rubrique. |
Méthode |
Obligatoire |
Traite chaque ligne dâentrĂ©e, en retournant une valeur tabulaire sous forme de tuples. Snowflake invoque cette mĂ©thode, en transmettant les entrĂ©es des arguments dâUDTF. Pour plus dâinformations, voir DĂ©finition dâune mĂ©thode process dans cette rubrique. |
Méthode |
Facultatif |
Finalise le traitement des partitions dâentrĂ©e, en renvoyant une valeur tabulaire sous forme de tuples. Pour plus dâinformations, voir Finalisation du traitement des partitions dans cette rubrique. |
LâĂ©mission dâune exception Ă partir de nâimporte quelle mĂ©thode de la classe du gestionnaire entraĂźne lâarrĂȘt du traitement. La requĂȘte qui a appelĂ© lâUDTF Ă©choue avec un message dâerreur.
Note
Si votre code ne rĂ©pond pas aux exigences dĂ©crites ici, la crĂ©ation ou lâexĂ©cution de lâUDTF peut Ă©chouer. Snowflake dĂ©tectera les violations lorsque lâinstruction CREATE FUNCTION sera exĂ©cutĂ©e.
Exemple de classe de gestionnaire¶
Le code de lâexemple suivant crĂ©e une UDTF dont la classe de gestionnaire traite les lignes dâune partition. La mĂ©thode process
traite chaque ligne dâentrĂ©e et renvoie une ligne contenant le coĂ»t total dâune vente dâaction. AprĂšs avoir traitĂ© les lignes de la partition, elle renvoie (Ă partir de sa mĂ©thode end_partition
) le total de toutes les ventes incluses dans la partition.
CREATE OR REPLACE FUNCTION stock_sale_sum(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'StockSaleSum'
AS $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
Le code de lâexemple suivant appelle lâUDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol
, quantity
, et price
de la table stocks_table
. Pour plus dâinformations sur lâappel dâune UDTF, reportez-vous Ă ExĂ©cutez une UDF.
SELECT stock_sale_sum.symbol, total
FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
Initialisation du gestionnaire¶
Vous pouvez éventuellement implémenter une méthode __init__
dans votre classe de gestionnaire que Snowflake invoquera avant que le gestionnaire ne commence à traiter les lignes. Par exemple, vous pouvez utiliser cette méthode pour établir un état spécifique à la partition pour le gestionnaire. Votre méthode __init__
peut ne pas produire de lignes de sortie.
La signature de la mĂ©thode doit ĂȘtre de la forme suivante :
def __init__(self):
Par exemple, vous pourriez vouloir :
Initialise lâĂ©tat dâune partition, puis utilise cet Ă©tat dans les mĂ©thodes
process
etend_partition
.ExĂ©cutez une initialisation de longue durĂ©e qui doit ĂȘtre effectuĂ©e une seule fois par partition plutĂŽt quâune fois par ligne.
Note
Vous pouvez également exécuter la logique une fois avant le début du traitement des partitions en incluant ce code en dehors de la classe du gestionnaire, par exemple avant la déclaration de la classe.
Pour plus dâinformations sur le traitement des partitions, voir Traitement des partitions dans cette rubrique.
Si vous utilisez une méthode __init__
nâoubliez pas que __init__
:
Ne peut prendre que
self
comme argument.Impossible de produire de lignes de sortie. Utilisez lâimplĂ©mentation de votre mĂ©thode
process
pour cela.Est invoquée une fois pour chaque partition, et avant que la méthode
process
soit invoquée.
Traitement des lignes¶
Implémentez une méthode process
que Snowflake invoquera pour chaque ligne dâentrĂ©e.
DĂ©finition dâune mĂ©thode process
¶
Définissez une méthode process
qui reçoit comme valeurs les UDTF arguments convertis Ă partir de types SQL, renvoyant des donnĂ©es que Snowflake utilisera pour crĂ©er la valeur de retour tabulaire de lâUDTF.
La signature de la mĂ©thode doit ĂȘtre de la forme suivante :
def process(self, *args):
Votre méthode process
doit :
Avoir un paramĂštre
self
.DĂ©clarer les paramĂštres de la mĂ©thode correspondant aux paramĂštres de lâUDTF.
Les noms des paramĂštres des mĂ©thodes ne doivent pas nĂ©cessairement correspondre aux noms des paramĂštres de lâUDTF, mais les paramĂštres des mĂ©thodes doivent ĂȘtre dĂ©clarĂ©s dans le mĂȘme ordre que les paramĂštres de lâUDTF.
Lorsque vous transmettez des valeurs dâarguments de lâUDTF Ă votre mĂ©thode, Snowflake convertit les valeurs des types SQL en types Python que vous utilisez dans la mĂ©thode. Pour plus dâinformations sur la maniĂšre dont Snowflake Ă©tablit une correspondance entre SQL et les types de donnĂ©es Python, voir Mappages des types de donnĂ©es SQL-Python.
Donne un ou plusieurs tuples (ou renvoie un itĂ©rable contenant des tuples), dans lequel la sĂ©quence de tuples correspond Ă la sĂ©quence de colonnes de valeurs de retour de lâUDTF.
Les Ă©lĂ©ments du tuple doivent apparaĂźtre dans le mĂȘme ordre que les colonnes de la valeur de retour UDTF sont dĂ©clarĂ©es. Pour plus dâinformations, voir Renvoi dâune valeur dans cette rubrique.
Snowflake convertira les valeurs des types Python en types SQL requis par la dĂ©claration UDTF. Pour plus dâinformations sur la maniĂšre dont Snowflake Ă©tablit une correspondance entre SQL et les types de donnĂ©es Python, voir Mappages des types de donnĂ©es SQL-Python.
Si une mĂ©thode de la classe du gestionnaire lĂšve une exception, le traitement sâarrĂȘte. La requĂȘte qui a appelĂ© lâUDTF Ă©choue avec un message dâerreur. Si la mĂ©thode process
renvoie None
, le traitement sâarrĂȘte. (La mĂ©thode end_partition
est toujours invoquĂ©e mĂȘme si la mĂ©thode process
renvoie None
).
Exemple de méthode de traitement
Lâexemple suivant prĂ©sente une classe de gestionnaire StockSale
avec une méthode process
qui traite trois arguments UDTF (symbol
, quantity
, et price
), et renvoie une seule ligne avec deux colonnes (symbol
et total
). Notez que les paramÚtres de la méthode process
sont dĂ©clarĂ©s dans le mĂȘme ordre que les paramĂštres stock_sale
. Les arguments de lâinstruction yield
de la méthode process
sont dans le mĂȘme ordre que les colonnes dĂ©clarĂ©es dans la clause stock_sale
RETURNS TABLE.
CREATE OR REPLACE FUNCTION stock_sale(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'StockSale'
AS $$
class StockSale:
def process(self, symbol, quantity, price):
cost = quantity * price
yield (symbol, cost)
$$;
Le code de lâexemple suivant appelle lâUDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol
, quantity
, et price
de la table stocks_table
.
SELECT stock_sale.symbol, total
FROM stocks_table, TABLE(stock_sale(symbol, quantity, price) OVER (PARTITION BY symbol));
Renvoi dâune valeur¶
Lorsque vous renvoyez des lignes de sortie, vous pouvez utiliser soit yield
soit return
(mais pas les deux) pour renvoyer des tuples avec la valeur tabulaire. Si la méthode renvoie ou produit None
, le traitement de la ligne en cours sâarrĂȘte.
Lorsque vous utilisez
yield
, exécutez une instructionyield
distincte pour chaque ligne de sortie. Il sâagit de la meilleure pratique, car lâĂ©valuation paresseuse qui accompagneyield
permet un traitement plus efficace et peut contribuer Ă Ă©viter les dĂ©passements de dĂ©lai dâexpiration.Chaque Ă©lĂ©ment du tuple devient une valeur de colonne dans le rĂ©sultat renvoyĂ© par lâUDTF. Lâordre des arguments
yield
doit correspondre Ă lâordre des colonnes dĂ©clarĂ©es pour la valeur de retour dans la clause RETURNS TABLE de CREATE FUNCTION.Le code de lâexemple suivant renvoie des valeurs reprĂ©sentant deux rangĂ©es.
def process(self, symbol, quantity, price): cost = quantity * price yield (symbol, cost) yield (symbol, cost)
Notez que, comme lâargument de rendement est un tuple, vous devez inclure une virgule de fin lorsque vous passez une seule valeur dans le tuple, comme dans lâexemple suivant.
yield (cost,)
En utilisant
return
, renvoyez un itĂ©rable avec des tuples.Chaque valeur dâun tuple devient une valeur de colonne dans le rĂ©sultat renvoyĂ© par lâUDTF. Lâordre des valeurs des colonnes dans un tuple doit correspondre Ă lâordre des colonnes dĂ©clarĂ©es pour la valeur de retour dans la clause RETURNS TABLE de CREATE FUNCTION.
Le code de lâexemple suivant renvoie deux lignes, chacune avec deux colonnes : symbole et total.
def process(self, symbol, quantity, price): cost = quantity * price return [(symbol, cost), (symbol, cost)]
Ignorer des lignes¶
Pour ignorer une ligne dâentrĂ©e et traiter la ligne suivante (par exemple, lorsque vous validez les lignes dâentrĂ©e), faites en sorte que la mĂ©thode process
renvoie lâun des Ă©lĂ©ments suivants :
Lorsque vous utilisez
return
, vous devez retournerNone
, une liste contenantNone
, ou une liste vide pour ignorer la ligne.Lorsque vous utilisez
yield
, retournezNone
pour ignorer une ligne.Notez que si vous avez plusieurs appels Ă
yield
, tout appel aprĂšs un appel qui renvoieNone
sera ignoré par Snowflake.
Le code de lâexemple suivant renvoie uniquement les lignes pour lesquelles number
est un nombre entier positif. Si number
nâest pas positif, la mĂ©thode renvoie None
vide pour ignorer la ligne actuelle et poursuivre le traitement de la ligne suivante.
def process(self, number):
if number < 1:
yield None
else:
yield (number)
Traitement avec et sans état¶
Vous pouvez implémenter le gestionnaire pour traiter les lignes en tenant compte des partitions ou pour les traiter simplement ligne par ligne.
Dans le traitement en tenant compte des partitions, le gestionnaire inclut du code pour gĂ©rer lâĂ©tat spĂ©cifique Ă la partition. Cela comprend une mĂ©thode
__init__
qui sâexĂ©cute au dĂ©but du traitement de la partition et une mĂ©thodeend_partition
que Snowflake invoque aprĂšs avoir traitĂ© la derniĂšre ligne de la partition. Pour plus dâinformations, voir Traitement des partitions dans cette rubrique.Dans le traitement tenant compte de la partition, le gestionnaire sâexĂ©cute sans Ă©tat, en ignorant les limites des partitions.
Pour que le gestionnaire sâexĂ©cute de cette façon, il ne faut pas inclure de mĂ©thode
__init__
ouend_partition
.
Traitement des partitions¶
Vous pouvez traiter les partitions en entrĂ©e avec du code qui sâexĂ©cute par partition (par exemple pour gĂ©rer lâĂ©tat) ainsi que du code qui sâexĂ©cute pour chaque ligne de la partition.
Note
Pour plus dâinformations sur la spĂ©cification des partitions lors de lâappel dâune UDTF, reportez-vous Ă Fonctions et partitions des tables.
Lorsquâune requĂȘte inclut des partitions, elle agrĂšge les lignes en utilisant une valeur spĂ©cifiĂ©e, telle que la valeur dâune colonne. Les lignes agrĂ©gĂ©es que votre gestionnaire reçoit sont dites partitionnĂ©es par cette valeur. Votre code peut traiter ces partitions et leurs lignes de façon Ă ce que le traitement de chaque partition comprenne un Ă©tat spĂ©cifique Ă la partition.
Le code de lâexemple SQL suivant demande des informations sur les ventes dâactions. Il exĂ©cute une UDTF stock_sale_sum
dont lâentrĂ©e est partitionnĂ©e par la valeur de la colonne symbol
.
SELECT stock_sale_sum.symbol, total
FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
Gardez Ă lâesprit que mĂȘme lorsque les lignes entrantes sont partitionnĂ©es, votre code peut ignorer la sĂ©paration des partitions et traiter simplement les lignes. Par exemple, vous pouvez omettre le code conçu pour gĂ©rer lâĂ©tat de la partition, tel que la mĂ©thode __init__
et la méthode end_partition
dâune classe de gestionnaire, et nâimplĂ©menter que la mĂ©thode process
. Pour plus dâinformations, voir Traitement avec et sans Ă©tat dans cette rubrique.
Pour traiter chaque partition en tant quâunitĂ©, vous devez :
Implémentez une méthode
__init__
de classe de gestionnaire dans laquelle vous initialiserez le traitement de la partition.Pour plus dâinformations, voir Initialisation du gestionnaire dans cette rubrique.
Incluez un code tenant compte des partitions lors du traitement de chaque ligne avec la méthode
process
.Pour plus dâinformations sur le traitement des lignes, voir Traitement des lignes dans cette rubrique.
Implémentez une méthode
end_partition
pour finaliser le traitement de la partition.Pour plus dâinformations, voir Finalisation du traitement des partitions dans cette rubrique.
Les paragraphes suivants dĂ©crivent la sĂ©quence des invocations de votre gestionnaire lorsque vous avez inclus du code conçu pour ĂȘtre exĂ©cutĂ© par partition.
Lorsque le traitement dâune partition commence, et avant que la premiĂšre ligne nâait Ă©tĂ© traitĂ©e, Snowflake utilise la mĂ©thode
__init__
de votre classe de gestionnaire pour crĂ©er une instance de la classe.Ici, vous pouvez Ă©tablir lâĂ©tat de la partition spĂ©cifique. Par exemple, vous pouvez initialiser une variable dâinstance pour contenir une valeur calculĂ©e Ă partir de lignes de la partition.
Pour chaque ligne de la partition, Snowflake invoque la méthode
process
.Chaque fois que la mĂ©thode sâexĂ©cute, elle peut apporter des modifications aux valeurs de lâĂ©tat. Par exemple, vous pouvez demander Ă la mĂ©thode
process
de mettre Ă jour la valeur de la variable dâinstance.Une fois que votre code a traitĂ© la derniĂšre ligne de la partition, Snowflake invoque votre mĂ©thode
end_partition
.Ă partir de cette mĂ©thode, vous pouvez renvoyer des lignes de sortie contenant une valeur de niveau de partition que vous souhaitez renvoyer. Par exemple, vous pouvez renvoyer la valeur de la variable dâinstance que vous avez mise Ă jour au fur et Ă mesure que vous traitez les lignes de la partition.
Votre méthode
end_partition
ne recevra aucun argument de Snowflake, qui lâinvoquera simplement aprĂšs avoir traitĂ© la derniĂšre ligne de la partition.
Finalisation du traitement des partitions¶
Vous pouvez éventuellement implémenter une méthode end_partition
dans votre classe de gestionnaire que Snowflake invoquera aprĂšs avoir traitĂ© toutes les lignes dâune partition. Avec cette mĂ©thode, vous pouvez exĂ©cuter le code pour une partition aprĂšs que toutes les lignes de la partition aient Ă©tĂ© traitĂ©es. Votre mĂ©thode end_partition
peut produire des lignes de sortie, par exemple pour renvoyer les rĂ©sultats dâun calcul Ă lâĂ©chelle de la partition. Pour plus dâinformations, voir Traitement des partitions dans cette rubrique.
La signature de la mĂ©thode doit ĂȘtre de la forme suivante :
def end_partition(self):
Snowflake attend de lâimplĂ©mentation de votre mĂ©thode end_partition
ce qui suit :
Elle ne doit pas ĂȘtre statique.
Elle ne peut avoir dâautres paramĂštres que
self
.Au lieu de renvoyer une valeur tabulaire, elle peut produire une liste vide ou
None
.
Note
Bien que Snowflake prenne en charge les grandes partitions avec des dĂ©lais dâexpiration dĂ©finis pour les traiter avec succĂšs, les partitions particuliĂšrement grandes peuvent entraĂźner des expirations (par exemple lorsque end_partition
prend trop de temps Ă se terminer). Veuillez contacter le support Snowflake si vous avez besoin dâajuster le seuil dâexpiration pour des scĂ©narios dâutilisation spĂ©cifiques.
Exemple de traitement des partitions¶
Le code de lâexemple suivant calcule le coĂ»t total payĂ© sur lâensemble des achats pour une action en calculant dâabord le coĂ»t par achat et en additionnant les achats (dans la mĂ©thode process
). Le code renvoie le total dans la méthode end_partition
.
Pour un exemple dâune UDTF qui inclut ce gestionnaire, ainsi que lâappel de lâUDTF, reportez-vous Ă Exemple de classe de gestionnaire.
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
Lors du traitement des partitions, gardez Ă lâesprit les points suivants :
Votre code peut gĂ©rer des partitions qui ne sont pas explicitement spĂ©cifiĂ©es dans un appel Ă lâUDTF. MĂȘme lorsquâun appel Ă lâUDTF nâinclut pas de clause PARTITION BY, Snowflake partitionne les donnĂ©es de maniĂšre implicite.
Votre méthode
process
recevra les donnĂ©es des lignes dans lâordre spĂ©cifiĂ© par la clause ORDER BY de la partition, le cas Ă©chĂ©ant.
Exemples¶
Utilisation dâun paquet importé¶
Vous pouvez utiliser les paquets Python qui sont inclus dans une liste de packages tiers dâAnaconda disponibles dans Snowflake. Pour spĂ©cifier ces paquets en tant que dĂ©pendances dans lâUDTF, utilisez la clause PACKAGES dans CREATE FUNCTION.
Vous pouvez découvrir la liste des paquets inclus en exécutant la commande SQL suivante dans Snowflake :
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
Pour plus dâinformations, voir Utilisation de paquets tiers et CrĂ©ation dâUDFs Python.
Le code de lâexemple suivant utilise une fonction du paquet NumPy (Numerical Python) pour calculer le prix moyen par action Ă partir dâun tableau dâachats dâactions, chacune ayant un prix par action diffĂ©rent.
CREATE OR REPLACE FUNCTION stock_sale_average(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('numpy')
HANDLER = 'StockSaleAverage'
AS $$
import numpy as np
class StockSaleAverage:
def __init__(self):
self._price_array = []
self._quantity_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
self._price_array.append(float(price))
cost = quantity * price
yield (symbol, cost)
def end_partition(self):
np_array = np.array(self._price_array)
avg = np.average(np_array)
yield (self._symbol, avg)
$$;
Le code de lâexemple suivant appelle lâUDF prĂ©cĂ©dente, en transmettant les valeurs des colonnes symbol
, quantity
, et price
de la table stocks_table
. Pour plus dâinformations sur lâappel dâune UDTF, reportez-vous Ă ExĂ©cutez une UDF.
SELECT stock_sale_average.symbol, total
FROM stocks_table,
TABLE(stock_sale_average(symbol, quantity, price)
OVER (PARTITION BY symbol));
ExĂ©cution de tĂąches simultanĂ©es Ă lâaide de processus de tĂąches worker¶
Vous pouvez exĂ©cuter des tĂąches simultanĂ©es Ă lâaide de processus de travail Python. Cela peut sâavĂ©rer utile lorsque vous devez exĂ©cuter des tĂąches parallĂšles qui tirent parti de plusieurs cĆurs CPU sur les nĆuds de lâentrepĂŽt.
Note
Snowflake vous recommande de ne pas utiliser le module de multitraitement intégré à Python.
Pour contourner les cas oĂč le Python Global Interpreter Lock empĂȘche une approche multitĂąche de sâĂ©tendre Ă tous les cĆurs CPU, vous pouvez exĂ©cuter des tĂąches concurrentes Ă lâaide de processus de travail distincts, plutĂŽt que de threads.
Vous pouvez le faire sur des entrepĂŽts Snowflake en utilisant la classe Parallel
de la bibliothĂšque joblib
comme dans lâexemple suivant.
CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
RETURNS TABLE (result INT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'JoblibMultiprocessing'
PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt
class JoblibMultiprocessing:
def process(self, i):
pass
def end_partition(self):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
for r in result:
yield (r, )
$$;
Note
Le backend par défaut utilisé pour joblib.Parallel
diffÚre entre les entrepÎts standards de Snowflake et les entrepÎts optimisés pour Snowpark.
Valeur par dĂ©faut de lâentrepĂŽt standard :
threading
Valeur par dĂ©faut de lâentrepĂŽt optimisĂ© pour Snowpark :
loky
(multitraitement)
Vous pouvez remplacer le paramÚtre de backend par défaut en appelant la fonction joblib.parallel_backend
comme dans lâexemple suivant.
import joblib
joblib.parallel_backend('loky')
CrĂ©ation de lâUDTF avec CREATE FUNCTION
¶
Vous créez une UDTF avec SQL en utilisant la commande CREATE FUNCTION, en spécifiant le code que vous avez écrit comme gestionnaire. Pour la référence de la commande, voir CREATE FUNCTION.
Utilisez la syntaxe suivante pour créer une UDTF.
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
LANGUAGE PYTHON
[ IMPORTS = ( '<imports>' ) ]
RUNTIME_VERSION = 3.9
[ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
[ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
HANDLER = '<handler_class>'
[ AS '<python_code>' ]
Pour associer le code du gestionnaire que vous avez Ă©crit avec lâUDTF, vous faites ce qui suit lors de lâexĂ©cution de CREATE FUNCTION :
Dans RETURNS TABLE, spécifiez les colonnes de sortie dans les paires nom de colonne et type.
Définissez LANGUAGE sur PYTHON.
Définissez la valeur de la clause IMPORTS sur le chemin et le nom de la classe du gestionnaire si la classe se trouve dans un emplacement externe, par exemple sur une zone de préparation.
Pour plus dâinformations, voir CrĂ©ation dâUDFs Python.
DĂ©finissez RUNTIME_VERSION sur la version de lâenvironnement dâexĂ©cution Python requise par votre code. Les versions de Python prises en charge sont les suivantes :
3,9
3,10
3,11
3,12
DĂ©finissez la valeur de la clause PACKAGES sur le nom dâun ou plusieurs paquets, le cas Ă©chĂ©ant, requis par la classe du gestionnaire.
Pour plus dâinformations, voir Utilisation de paquets tiers et CrĂ©ation dâUDFs Python.
Définissez la valeur de la clause HANDLER comme étant le nom de la classe du gestionnaire.
Lorsque vous associez un code de gestionnaire Python à une UDTF, vous pouvez soit inclure le code en ligne, soit y faire référence à un emplacement sur une zone de préparation Snowflake. La valeur de HANDLER est sensible à la casse et doit correspondre au nom de la classe Python.
Pour plus dâinformations, voir Les UDFs avec du code en ligne vs. les UDFs avec du code tĂ©lĂ©chargĂ© Ă partir dâune zone de prĂ©paration.
Important
Pour une UDF scalaire Python, la valeur de la clause HANDLER contient le nom de la méthode.
Pour une UDTF Python, la valeur de la clause HANDLER contient le nom de la classe, mais pas un nom de méthode.
La raison de cette diffĂ©rence est que pour une UDF scalaire Python, le nom de la mĂ©thode de gestion est choisi par lâutilisateur et nâest donc pas connu Ă lâavance par Snowflake, mais pour une UDTF Python, les noms des mĂ©thodes (par exemple
end_partition
) sont connus, car ils doivent correspondre aux noms spécifiés par Snowflake.La clause
AS '<python_code>'
est requise si le code du gestionnaire est spécifié en ligne avec CREATE FUNCTION.