Gestion des tables dynamiques Snowflake avec Python¶
Vous pouvez utiliser Python pour gĂ©rer les tables dynamiques Snowflake, qui sont un nouveau type de table pour les pipelines de traitement continu. Les tables dynamiques matĂ©rialisent les rĂ©sultats dâune requĂȘte spĂ©cifique. Pour un aperçu de cette fonctionnalitĂ©, voir Tables dynamiques.
Les Snowflake Python APIs représentent des tables dynamiques avec deux types distincts :
DynamicTable
: expose les propriĂ©tĂ©s dâune table dynamique telles que son nom, sa latence cible, son entrepĂŽt et son instruction de requĂȘte.DynamicTableResource
: expose les méthodes que vous pouvez utiliser pour extraire un objetDynamicTable
correspondant, suspendre et reprendre la table dynamique, et supprimer la table dynamique.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
Ă partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramÚtres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant lâobjet Session
obtenu, le code crée un objet Root
pour utiliser les types et les mĂ©thodes de lâAPI. Pour plus dâinformations, voir Connexion Ă Snowflake avec Snowflake Python APIs.
CrĂ©ation dâune table dynamique¶
Pour crĂ©er une table dynamique, il faut dâabord crĂ©er un objet DynamicTable
, puis créer un objet DynamicTableCollection
Ă partir de lâobjet Root
de lâAPI. En utilisant DynamicTableCollection.create
, ajoutez la nouvelle table dynamique Ă Snowflake.
Le code dans lâexemple suivant crĂ©e un objet DynamicTable
qui représente une table dynamique nommée my_dynamic_table
dans la base de données my_db
et le schéma my_schema
, avec les options minimales requises spécifiées :
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Ce code crée une variable DynamicTableCollection
dynamic_tables
et utilise DynamicTableCollection.create
pour créer une nouvelle table dynamique dans Snowflake.
Le code de lâexemple suivant crĂ©e un objet DynamicTable
qui représente une table dynamique nommée my_dynamic_table2
dans la base de données my_db
et le schéma my_schema
avec toutes les options actuellement possibles spécifiées :
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
Clonage dâune table dynamique¶
Le code de lâexemple suivant crĂ©e une nouvelle table dynamique nommĂ©e my_dynamic_table2
avec les mĂȘmes dĂ©finitions de colonnes et toutes les donnĂ©es existantes de la table dynamique source my_dynamic_table
dans la base de données my_db
et le schéma my_schema
:
Note
Cette opĂ©ration de clonage utilise lâobjet
DynamicTableClone
, qui inclut lâoptionneltarget_lag
et les paramĂštreswarehouse
et ne prend actuellement pas en charge dâautres paramĂštres.
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
Pour plus dâinformations sur cette fonctionnalitĂ©, voir CREATE DYNAMIC TABLE ⊠CLONE.
Obtenir les dĂ©tails dâune table dynamique¶
Vous pouvez obtenir des informations sur une table dynamique en appelant la méthode DynamicTableResource.fetch
, qui renvoie un objet DynamicTable
.
Le code de lâexemple suivant obtient des informations sur une table dynamique nommĂ©e my_dynamic_table
dans la base de données my_db
et le schéma my_schema
:
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Affichage de tables dynamiques¶
Vous pouvez rĂ©pertorier des tables dynamiques Ă lâaide de la mĂ©thode DynamicTableCollection.iter
qui renvoie un itérateur PagedIter
dâobjets DynamicTable
.
Le code de lâexemple suivant rĂ©pertorie les tables dynamiques dont le nom commence par le texte my
dans la base de données my_db
et le schéma my_schema
, puis imprime le nom de chacun :
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
Ăchanges des noms de tables dynamiques¶
Vous pouvez Ă©changer le nom dâune table dynamique avec celui dâune autre table dynamique en une seule transaction Ă lâaide de la mĂ©thode DynamicTableResource.swap_with
. Pour plus dâinformations, voir la description du paramĂštre SWAP WITH dans ALTERDYNAMICTABLE.
Le code de lâexemple suivant Ă©change my_dynamic_table
avec other_dynamic_table
dans la mĂȘme base de donnĂ©es et le mĂȘme schĂ©ma :
my_table_res = root.databases['my_db'].schemas['my_schema'].tables['my_dynamic_table']
my_table_res.swap_with('other_dynamic_table')
ExĂ©cution dâopĂ©rations de table dynamiques¶
Vous pouvez effectuer des opĂ©rations de table dynamiques courantes, telles que lâactualisation, la suspension et la reprise dâune table dynamique, avec un objet DynamicTableResource
.
Pour plus dâinformations sur ces opĂ©rations de tables dynamiques, voir Commandes de tables, de vues et de sĂ©quences dans la rĂ©fĂ©rence de la commande SQL.
Pour illustrer certaines opĂ©rations que vous pouvez effectuer avec une ressource de table dynamique, le code de lâexemple suivant effectue les opĂ©rations suivantes :
Obtient lâobjet ressource de la table dynamique
my_dynamic_table
dans la base de donnéesmy_db
et le schémamy_schema
.Actualise la table dynamique.
Suspend la table dynamique.
Reprend la table dynamique.
Supprime la table dynamique.
Restaure la version la plus récente de la table dynamique supprimée.
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
my_dynamic_table_res.undrop()