UDTFs Python vectorisées¶
Cette rubrique présente les UDTFs Python vectorisées.
Dans ce chapitre :
Vue dâensemble¶
Les UDTFs de Python (fonctions de table dĂ©finies par lâutilisateur) permettent dâopĂ©rer sur des lignes par lots.
Snowflake prend en charge deux types dâUDTFs vectorisĂ©es :
Les UDTFs avec une méthode vectorisée
end_partition
Les UDTFs avec une méthode vectorisée
process
Vous devez choisir un type, car une UDTF ne peut pas avoir à la fois une méthode process
vectorisée et une méthode end_partition
vectorisée.
Les UDTFs avec une méthode end_partition vectorisée¶
UDTFs with a vectorized end_partition
method enable seamless partition-by-partition processing by operating on
partitions as pandas DataFrames
and returning results as
pandas DataFrames
or lists of pandas arrays
or pandas Series.
This facilitates integration with libraries that operate on pandas DataFrames or pandas arrays.
Utilisez une méthode end_partition
vectorisée pour les tùches suivantes :
Traiter vos données partition par partition plutÎt que ligne par ligne.
Renvoyer plusieurs lignes ou colonnes pour chaque partition.
Utiliser des bibliothĂšques qui opĂšrent sur des DataFrames pandas pour lâanalyse des donnĂ©es.
UDTFs Ă lâaide dâune mĂ©thode de processus vectorisé¶
Les UDTFs avec une méthode process
vectorisĂ©e permettent dâopĂ©rer sur des lignes par lots, en supposant que lâopĂ©ration effectue un mappage de 1 Ă 1. En dâautres termes, la mĂ©thode renvoie une ligne de sortie pour chaque ligne dâentrĂ©e. Le nombre de colonnes nâest pas limitĂ©.
Utilisez une méthode process
vectorisée pour les tùches suivantes :
Appliquer une transformation 1 pour 1 avec un résultat multi-colonnes par lots.
Utiliser une bibliothÚque qui nécessite
pandas.DataFrame
.Traiter des lignes par lots, sans partitionnement explicite.
Tirer parti de la fonction to_pandas() API pour transformer le rĂ©sultat de la requĂȘte directement en DataFrame pandas.
Conditions préalables¶
La bibliothÚque Snowpark pour Python version 1.14.0 ou ultérieure est requise.
Créer une UDTF avec une méthode end_partition vectorisée¶
Si vous le souhaitez, définissez une classe de gestionnaire avec une méthode
__init__
qui sera invoquée avant le traitement de chaque partition.Remarque : ne définissez pas de méthode
process
.Définissez une méthode
end_partition
qui prend en argument DataFrame et renvoie ou produit unpandas.DataFrame
ou un tuple depandas.Series
oupandas.arrays
oĂč chaque tableau est une colonne.Les types de colonne du rĂ©sultat doivent correspondre aux types de colonne de la dĂ©finition dâune UDTF.
Pour marquer la méthode
end_partition
comme étant vectorisée, utilisez le décorateur@vectorized
ou lâattribut de fonction_sf_vectorized_input
.Pour plus dâinformations, consultez UDFs vectorisĂ©es Python. Le dĂ©corateur
@vectorized
ne peut ĂȘtre utilisĂ© que lorsque lâUDTF Python est exĂ©cutĂ©e dans Snowflake, par exemple, lors de lâutilisation dâune feuille de calcul SQL. Lorsque lâexĂ©cution se fait Ă lâaide du client ou dâune feuille de calcul Python, vous devez utiliser lâattribut function.
Note
Les noms de colonne par dĂ©faut pour le DataFrame dâentrĂ©e dans une UDTF avec une end_partition
vectorisĂ©e correspondent Ă la signature de la fonction SQL. Les noms des colonnes suivent les exigences de lâidentificateur SQL. Ă savoir, si un identificateur nâest pas entre guillemets, il sera mis en majuscules, et sâil est entre guillemets, il sera conservĂ© tel quel.
Le bloc de code suivant est un exemple de crĂ©ation dâune UDTF avec une mĂ©thode end_partition
vectorisĂ©e, Ă lâaide du dĂ©corateur @vectorized
:
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Le bloc de code suivant est un exemple de crĂ©ation dâune UDTF avec une mĂ©thode end_partition
vectorisĂ©e, Ă lâaide de lâattribut de fonction :
import pandas
class handler:
def __init__(self):
# initialize a state
def end_partition(self, df):
# process the DataFrame
return result_df
handler.end_partition._sf_vectorized_input = pandas.DataFrame
Note
Une UDTF avec une méthode end_partition
vectorisĂ©e doit ĂȘtre appelĂ©e avec une clause PARTITION BY pour construire les partitions.
Pour appeler lâUDTF avec toutes les donnĂ©es dans la mĂȘme partition :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY 1));
Pour appeler lâUDTF avec les donnĂ©es partitionnĂ©es par la colonne x :
SELECT * FROM table(udtf(x,y,z) OVER (PARTITION BY x));
Exemple : collecte de lignes Ă lâaide dâune UDTF classique ou dâune UDTF avec une mĂ©thode end_partition vectorisĂ©e¶
Collecte de lignes Ă lâaide dâune UDTF :
import pandas
class handler:
def __init__(self):
self.rows = []
def process(self, *row):
self.rows.append(row)
def end_partition(self):
df = pandas.DataFrame(self.rows)
# process the DataFrame
return result_df
Collecte de lignes Ă lâaide dâune UDTF avec une mĂ©thode end_partition
vectorisée :
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
self.rows = []
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Exemple : calculez la statistique récapitulative pour chaque colonne de la partition¶
Voici un exemple de calcul de la statistique rĂ©capitulative pour chaque colonne de la partition Ă lâaide de la mĂ©thode describe()
pandas.
Créez une table et générez trois partitions de cinq lignes chacune :
CREATE OR REPLACE TABLE test_values(id VARCHAR, col1 FLOAT, col2 FLOAT, col3 FLOAT, col4 FLOAT, col5 FLOAT); -- generate 3 partitions of 5 rows each INSERT INTO test_values SELECT 'x', UNIFORM(1.5,1000.5,RANDOM(1))::FLOAT col1, UNIFORM(1.5,1000.5,RANDOM(2))::FLOAT col2, UNIFORM(1.5,1000.5,RANDOM(3))::FLOAT col3, UNIFORM(1.5,1000.5,RANDOM(4))::FLOAT col4, UNIFORM(1.5,1000.5,RANDOM(5))::FLOAT col5 FROM TABLE(GENERATOR(ROWCOUNT => 5)); INSERT INTO test_values SELECT 'y', UNIFORM(1.5,1000.5,RANDOM(10))::FLOAT col1, UNIFORM(1.5,1000.5,RANDOM(20))::FLOAT col2, UNIFORM(1.5,1000.5,RANDOM(30))::FLOAT col3, UNIFORM(1.5,1000.5,RANDOM(40))::FLOAT col4, UNIFORM(1.5,1000.5,RANDOM(50))::FLOAT col5 FROM TABLE(GENERATOR(ROWCOUNT => 5)); INSERT INTO test_values SELECT 'z', UNIFORM(1.5,1000.5,RANDOM(100))::FLOAT col1, UNIFORM(1.5,1000.5,RANDOM(200))::FLOAT col2, UNIFORM(1.5,1000.5,RANDOM(300))::FLOAT col3, UNIFORM(1.5,1000.5,RANDOM(400))::FLOAT col4, UNIFORM(1.5,1000.5,RANDOM(500))::FLOAT col5 FROM TABLE(GENERATOR(ROWCOUNT => 5));
Jetez un coup dâĆil aux donnĂ©es :
SELECT * FROM test_values;
----------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" | ----------------------------------------------------- |x |8.0 |99.4 |714.6 |168.7 |397.2 | |x |106.4 |237.1 |971.7 |828.4 |988.2 | |x |741.3 |207.9 |32.6 |640.6 |63.2 | |x |541.3 |828.6 |844.9 |77.3 |403.1 | |x |4.3 |723.3 |924.3 |282.5 |158.1 | |y |976.1 |562.4 |968.7 |934.3 |977.3 | |y |390.0 |244.3 |952.6 |101.7 |24.9 | |y |599.7 |191.8 |90.2 |788.2 |761.2 | |y |589.5 |201.0 |863.4 |415.1 |696.1 | |y |46.7 |659.7 |571.1 |938.0 |513.7 | |z |313.9 |188.5 |964.6 |435.4 |519.6 | |z |328.3 |643.1 |766.4 |148.1 |596.4 | |z |929.0 |255.4 |915.9 |857.2 |425.5 | |z |612.8 |816.4 |220.2 |879.5 |331.4 | |z |487.1 |704.5 |471.5 |378.9 |481.2 | -----------------------------------------------------
Créez la fonction :
CREATE OR REPLACE FUNCTION summary_stats(id VARCHAR, col1 FLOAT, col2 FLOAT, col3 FLOAT, col4 FLOAT, col5 FLOAT) RETURNS TABLE (column_name VARCHAR, count INT, mean FLOAT, std FLOAT, min FLOAT, q1 FLOAT, median FLOAT, q3 FLOAT, max FLOAT) LANGUAGE PYTHON RUNTIME_VERSION = 3.9 PACKAGES = ('pandas') HANDLER = 'handler' AS $$ from _snowflake import vectorized import pandas class handler: @vectorized(input=pandas.DataFrame) def end_partition(self, df): # using describe function to get the summary statistics result = df.describe().transpose() # add a column at the beginning for column ids result.insert(loc=0, column='column_name', value=['col1', 'col2', 'col3', 'col4', 'col5']) return result $$;
Effectuez au choix une de ces étapes :
Appelez la fonction et partitionnez par
id
:-- partition by id SELECT * FROM test_values, TABLE(summary_stats(id, col1, col2, col3, col4, col5) OVER (PARTITION BY id)) ORDER BY id, column_name;
-------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |x |NULL |NULL |NULL |NULL |NULL |col1 |5 |280.25999999999993 |339.5609267863427 |4.3 |8.0 |106.4 |541.3 |741.3 | |x |NULL |NULL |NULL |NULL |NULL |col2 |5 |419.25999999999993 |331.72476995244114 |99.4 |207.9 |237.1 |723.3 |828.6 | |x |NULL |NULL |NULL |NULL |NULL |col3 |5 |697.62 |384.2964311569911 |32.6 |714.6 |844.9 |924.3 |971.7 | |x |NULL |NULL |NULL |NULL |NULL |col4 |5 |399.5 |321.2689294033894 |77.3 |168.7 |282.5 |640.6 |828.4 | |x |NULL |NULL |NULL |NULL |NULL |col5 |5 |401.96000000000004 |359.83584173897964 |63.2 |158.1 |397.2 |403.1 |988.2 | |y |NULL |NULL |NULL |NULL |NULL |col1 |5 |520.4 |339.16133329139984 |46.7 |390.0 |589.5 |599.7 |976.1 | |y |NULL |NULL |NULL |NULL |NULL |col2 |5 |371.84 |221.94799616126298 |191.8 |201.0 |244.3 |562.4 |659.7 | |y |NULL |NULL |NULL |NULL |NULL |col3 |5 |689.2 |371.01012789410476 |90.2 |571.1 |863.4 |952.6 |968.7 | |y |NULL |NULL |NULL |NULL |NULL |col4 |5 |635.46 |366.6140927460372 |101.7 |415.1 |788.2 |934.3 |938.0 | |y |NULL |NULL |NULL |NULL |NULL |col5 |5 |594.64 |359.0334218425911 |24.9 |513.7 |696.1 |761.2 |977.3 | |z |NULL |NULL |NULL |NULL |NULL |col1 |5 |534.22 |252.58182238633088 |313.9 |328.3 |487.1 |612.8 |929.0 | |z |NULL |NULL |NULL |NULL |NULL |col2 |5 |521.58 |281.4870103574941 |188.5 |255.4 |643.1 |704.5 |816.4 | |z |NULL |NULL |NULL |NULL |NULL |col3 |5 |667.72 |315.53336907528495 |220.2 |471.5 |766.4 |915.9 |964.6 | |z |NULL |NULL |NULL |NULL |NULL |col4 |5 |539.8199999999999 |318.73025742781306 |148.1 |378.9 |435.4 |857.2 |879.5 | |z |NULL |NULL |NULL |NULL |NULL |col5 |5 |470.82 |99.68626786072393 |331.4 |425.5 |481.2 |519.6 |596.4 | --------------------------------------------------------------------------------------------------------------------------------------------------------------------
Appelez la fonction et traitez lâensemble de la table comme une seule partition :
-- treat the whole table as one partition SELECT * FROM test_values, TABLE(summary_stats(id, col1, col2, col3, col4, col5) OVER (PARTITION BY 1)) ORDER BY id, column_name;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |"ID" |"COL1" |"COL2" |"COL3" |"COL4" |"COL5" |"COLUMN_NAME" |"COUNT" |"MEAN" |"STD" |"MIN" |"Q1" |"MEDIAN" |"Q3" |"MAX" | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |NULL |NULL |NULL |NULL |NULL |NULL |col1 |15 |444.96 |314.01110034974425 |4.3 |210.14999999999998 |487.1 |606.25 |976.1 | |NULL |NULL |NULL |NULL |NULL |NULL |col2 |15 |437.56 |268.95505944302295 |99.4 |204.45 |255.4 |682.1 |828.6 | |NULL |NULL |NULL |NULL |NULL |NULL |col3 |15 |684.8466666666667 |331.87254839915937 |32.6 |521.3 |844.9 |938.45 |971.7 | |NULL |NULL |NULL |NULL |NULL |NULL |col4 |15 |524.9266666666666 |327.074780585783 |77.3 |225.6 |435.4 |842.8 |938.0 | |NULL |NULL |NULL |NULL |NULL |NULL |col5 |15 |489.14 |288.9176669671038 |24.9 |364.29999999999995 |481.2 |646.25 |988.2 | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CrĂ©er une UDTF Ă lâaide dâune mĂ©thode de processus vectorisé¶
Définissez une classe de gestionnaire (handler), similaire aux UDTFs classiques, avec des méthodes
__init__
etend_partition
facultatives.Définissez une méthode
process
qui prend un argument DataFrame et renvoie unpandas.DataFrame
ou un tuple depandas.Series
oupandas.arrays
oĂč chaque tableau est une colonne.Les types de colonne du rĂ©sultat doivent correspondre aux types de colonne de la dĂ©finition dâune UDTF. Le rĂ©sultat renvoyĂ© doit ĂȘtre exactement un DataFrame ou un tuple. Cela diffĂšre dâune mĂ©thode
end_partition
vectorisĂ©e oĂč vous pouvez cĂ©der ou renvoyer une liste.Pour marquer la mĂ©thode
process
comme étant vectorisée, utilisez le décorateur@vectorized
ou lâattribut de fonction_sf_vectorized_input
.Pour plus dâinformations, consultez UDFs vectorisĂ©es Python. Le dĂ©corateur
@vectorized
ne peut ĂȘtre utilisĂ© que lorsque lâUDTF Python est exĂ©cutĂ©e dans Snowflake, par exemple, lors de lâutilisation dâune feuille de calcul SQL. Lorsque lâexĂ©cution se fait Ă lâaide du client ou dâune feuille de calcul Python, vous devez utiliser lâattribut function.En option : si votre fonction de gestionnaire Python dĂ©passe la limite de temps dâexĂ©cution, dĂ©finissez une taille de lot cible.
Note
Les noms de colonne par dĂ©faut pour le DataFrame dâentrĂ©e dans une UDTF avec une process
vectorisĂ©e correspondent Ă la signature de la fonction SQL. Les noms des colonnes suivent les exigences de lâidentificateur SQL. Ă savoir, si un identificateur nâest pas entre guillemets, il sera mis en majuscules, et sâil est entre guillemets, il sera conservĂ© tel quel.
Le gestionnaire (handler) dâune UDTF avec une mĂ©thode process
vectorisĂ©e peut ĂȘtre mis en Ćuvre pour traiter des lots en tenant compte de la partition ou pour les traiter simplement lot par lot. Pour plus dâinformations, voir Traitement avec et sans Ă©tat.
Exemple : utiliser une UDTF avec une méthode de traitement vectorisée pour appliquer un codage à chaud¶
Utilisez une UDTF avec une méthode process
vectorisée pour appliquer un codage à chaud sur une table de dix catégories :
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.types import PandasDataFrame
class one_hot_encode:
def process(self, df: PandasDataFrame[str]) -> PandasDataFrame[int,int,int,int,int,int,int,int,int,int]:
return pd.get_dummies(df)
process._sf_vectorized_input = pd.DataFrame
one_hot_encode_udtf = session.udtf.register(
one_hot_encode,
output_schema=["categ0", "categ1", "categ2", "categ3", "categ4", "categ5", "categ6", "categ7", "categ8", "categ9"],
input_names=['"categ"']
)
df_table = session.table("categories")
df_table.show()
Exemple de résultat :
-----------
|"CATEG" |
-----------
|categ1 |
|categ6 |
|categ8 |
|categ5 |
|categ7 |
|categ5 |
|categ1 |
|categ2 |
|categ2 |
|categ4 |
-----------
PrĂ©parez lâimpression de la table :
res = df_table.select("categ", one_hot_encode_udtf("categ")).to_pandas()
print(res.head())
Exemple de résultat :
CATEG CATEG0 CATEG1 CATEG2 CATEG3 CATEG4 CATEG5 CATEG6 CATEG7 CATEG8 CATEG9
0 categ0 1 0 0 0 0 0 0 0 0 0
1 categ0 1 0 0 0 0 0 0 0 0 0
2 categ5 0 0 0 0 0 1 0 0 0 0
3 categ3 0 0 0 1 0 0 0 0 0 0
4 categ8 0 0 0 0 0 0 0 0 1 0
Vous pouvez aussi obtenir le mĂȘme rĂ©sultat avec une UDF vectorisĂ©e, bien que cela soit moins pratique. Vous devez empaqueter les rĂ©sultats dans une colonne, puis dĂ©paqueter la colonne pour restaurer les rĂ©sultats dans un DataFrame pandas utilisable.
Exemples dâutilisation dâune UDF vectorisĂ©e :
def one_hot_encode(df: PandasSeries[str]) -> PandasSeries[Variant]:
return pd.get_dummies(df).to_dict('records')
one_hot_encode._sf_vectorized_input = pd.DataFrame
one_hot_encode_udf = session.udf.register(
one_hot_encode,
output_schema=["encoding"],
)
df_table = session.table("categories")
df_table.show()
res = df_table.select(one_hot_encode_udf("categ")).to_df("encoding").to_pandas()
print(res.head())
0 {\n "categ0": false,\n "categ1": false,\n "...
1 {\n "categ0": false,\n "categ1": true,\n "c...
2 {\n "categ0": false,\n "categ1": false,\n "...
3 {\n "categ0": false,\n "categ1": false,\n "...
4 {\n "categ0": true,\n "categ1": false,\n "c...
Prise en charge du type¶
Les UDTFs vectorisĂ©es prennent en charge les mĂȘmes types SQL que les UDFs vectorisĂ©es. Cependant, pour les UDTFs vectorisĂ©es, les arguments SQL NUMBER
avec une Ă©chelle de 0 qui tiennent tous dans un type dâentier de 64 bits ou plus petit seront toujours mappĂ©s avec Int16
, Int32
, ou Int64
. En dâautres termes, contrairement aux UDFs scalaires, si lâargument dâune UDTF ne peut pas devenir null, il ne sera pas converti en int16
, int32
ou int64
.
Pour afficher une table montrant comment les types SQL sont mappés aux types Pandas, consultez le tableau de prise en charge des types dans la rubrique sur les UDFs vectorisées Python.
Meilleures pratiques¶
Si un scalaire doit ĂȘtre renvoyĂ© avec chaque ligne, crĂ©ez une liste de valeurs rĂ©pĂ©tĂ©es au lieu de dĂ©compresser le tableau
numpy
pour créer des tuples. Par exemple, pour un résultat de deux colonnes, au lieu de :return tuple(map(lambda n: (scalar_value, n[0], n[1]), results))
Utilisez ceci :
return tuple([scalar_value] * len(results), results[:, 0], results[:, 1])
Pour améliorer les performances, décompressez les données semi-structurées en colonnes.
Par exemple, si vous avez une colonne de variante,
obj
, avec les élémentsx(int)
,y(float)
etz(string)
, alors au lieu de définir une UDTF avec une signature comme celle-ci, appelez-la avecvec_udtf(obj)
:CREATE FUNCTION vec_udtf(variant OBJ)
DĂ©finissez lâUDTF avec une signature comme celle-ci, et appelez-la en utilisant
vec_udtf(obj:x, obj:y, obj:z)
:CREATE FUNCTION vec_udtf(a INTEGER, b FLOAT, c STRING)
Par défaut, Snowflake code les entrées dans des types pandas qui prennent en charge les valeurs NULL (par exemple, Int64). Si vous utilisez une bibliothÚque qui nécessite un type primitif (tel que
numpy
) et que votre entrĂ©e nâa pas de valeurs NULL, vous devez convertir la colonne en type primitif avant dâutiliser la bibliothĂšque. Par exemple :input_df['y'] = input_df['y'].astype("int64")
Pour plus dâinformations, voir Prise en charge du type.
Lors de lâutilisation dâUDTFs avec une mĂ©thode
end_partition
vectorisĂ©e, pour amĂ©liorer les performances et Ă©viter les dĂ©lais dâattente, Ă©vitez dâutiliserpandas.concat
pour accumuler des rĂ©sultats partiels. Au lieu de cela, donnez le rĂ©sultat partiel chaque fois que vous ĂȘtes prĂȘt.Par exemple, au lieu de :
results = [] while(...): partial_result = pd.DataFrame(...) results.append(partial_result) return pd.concat(results)
Faites ceci :
while(...): partial_result = pd.DataFrame(...) yield partial_result