Gestion des ressources de chargement et de déchargement de données avec Python¶
Vous pouvez utiliser Python pour gérer les ressources de chargement et de déchargement de données dans Snowflake, y compris les volumes externes, les canaux et les zones de préparation.
Conditions préalables¶
Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root
Ă partir duquel utiliser les Snowflake Python APIs.
Par exemple, le code suivant utilise les paramÚtres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
En utilisant lâobjet Session
obtenu, le code crée un objet Root
pour utiliser les types et les mĂ©thodes de lâAPI. Pour plus dâinformations, voir Connexion Ă Snowflake avec Snowflake Python APIs.
Gestion des zones de préparation¶
Vous pouvez gĂ©rer les zones de prĂ©paration Snowflake, qui sont des emplacements de fichiers de donnĂ©es dans le stockage cloud. Pour un aperçu des zones de prĂ©paration, voir Vue dâensemble du chargement de donnĂ©es.
Les Snowflake Python APIs représentent les zones de préparation avec deux types distincts :
Stage
: expose les propriĂ©tĂ©s dâune zone de prĂ©paration telles que son nom, son type de chiffrement, ses identifiants de connexion et les paramĂštres de la table de rĂ©pertoires.StageResource
: expose les méthodes que vous pouvez utiliser pour extraire unStage
objet correspondant, télécharger et lister les fichiers sur la zone de préparation, et supprimer la zone de préparation.
CrĂ©ation dâune zone de prĂ©paration¶
Pour crĂ©er une zone de prĂ©paration, il faut dâabord crĂ©er un objet Stage
, puis créer un objet StageCollection
Ă partir de lâobjet Root
de lâAPI. En utilisant StageCollection.create
, ajoutez la nouvelle zone de préparation à Snowflake.
Le code de lâexemple suivant crĂ©e un objet Stage
qui représente une zone de préparation nommée my_stage
avec un type de chiffrement de SNOWFLAKE_SSE
(chiffrement cÎté serveur uniquement) :
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Ce code crée une variable StageCollection
stages
et utilise StageCollection.create
pour créer une nouvelle zone de préparation dans Snowflake.
Obtenir les détails de la zone de préparation¶
Vous pouvez obtenir des informations sur une zone de préparation en appelant la méthode StageResource.fetch
, qui renvoie un objet Stage
.
Le code de lâexemple suivant permet dâobtenir des informations sur une zone de prĂ©paration nommĂ©e my_stage
:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Affichage des zones de préparation¶
Vous pouvez rĂ©pertorier des zones de prĂ©paration Ă lâaide de la mĂ©thode StageCollection.iter
qui renvoie un itérateur PagedIter
dâobjets Stage
.
Le code de lâexemple suivant rĂ©pertorie les zones de prĂ©paration dont le nom comprend le texte my
et affiche le nom de chacun :
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
Effectuer des opérations de zone de préparation¶
Vous pouvez effectuer des opĂ©rations de zones de prĂ©paration courantes, telles que le tĂ©lĂ©chargement dâun fichier vers une zone de prĂ©paration et lâaffichage de fichiers sur une zone de prĂ©paration, avec un objet StageResource
.
Pour illustrer certaines opĂ©rations que vous pouvez effectuer avec une ressource de zone de prĂ©paration, le code de lâexemple suivant effectue les opĂ©rations suivantes :
Télécharge un fichier nommé
my-file.yaml
dans la zone de préparationmy_stage
avec les options dâauto-compression et dâĂ©crasement spĂ©cifiĂ©es.RĂ©pertorie tous les fichiers sur la zone de prĂ©paration pour vĂ©rifier que le fichier a Ă©tĂ© tĂ©lĂ©chargĂ© avec succĂšs.
Supprime la zone de préparation.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
Gestion des canaux¶
Vous pouvez gĂ©rer les canaux Snowflake, qui sont des objets Snowflake nommĂ©s de premiĂšre classe qui contiennent une instruction COPY INTO utilisĂ©e par Snowpipe pour charger des donnĂ©es dâune file dâattente dâacquisition dans des tables. Pour un aperçu des canaux, voir Snowpipe.
Les Snowflake Python APIs représentent des canaux avec deux types distincts :
Pipe
: expose les propriĂ©tĂ©s dâun canal telles que son nom et lâinstruction COPY INTO Ă utiliser par Snowpipe.PipeResource
: expose les méthodes que vous pouvez utiliser pour extraire un objetPipe
correspondant, actualiser le canal avec les fichiers de données mis en zone de préparation et supprimer le canal.
Créer un canal¶
Pour crĂ©er un canal, il faut dâabord crĂ©er un objet Pipe
, puis créer un objet PipeCollection
Ă partir de lâobjet Root
de lâAPI. En utilisant PipeCollection.create
, ajoutez le nouveau canal Ă Snowflake.
Le code de lâexemple suivant crĂ©e un objet Pipe
qui représente un canal nommé my_pipe
avec lâinstruction COPY INTO spĂ©cifiĂ©e :
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Ce code crée une variable PipeCollection
pipes
et utilise PipeCollection.create
pour créer un nouveau canal dans Snowflake.
Obtenir les dĂ©tails dâun canal¶
Vous pouvez obtenir des informations sur un canal en appelant la méthode PipeResource.fetch
, qui renvoie un objet Pipe
.
Le code de lâexemple suivant permet dâobtenir des informations sur un canal nommĂ© my_pipe
:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Affichage de canaux¶
Vous pouvez rĂ©pertorier des canaux Ă lâaide de la mĂ©thode PipeCollection.iter
qui renvoie un itérateur PagedIter
dâobjets Pipe
.
Le code de lâexemple suivant rĂ©pertorie les canaux dont le nom commence par my
et imprime le nom de chacun :
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
Effectuer des opérations de canal¶
Vous pouvez effectuer des opĂ©rations de canal courantes, telles que lâactualisation dâun canal et la suppression dâun canal, avec un objet PipeResource
.
Note
Seule la fonctionnalité REFRESH de ALTER PIPE est actuellement prise en charge.
Pour illustrer les opĂ©rations que vous pouvez effectuer avec une ressource de canal, le code de lâexemple suivant effectue les opĂ©rations suivantes :
Obtient lâobjet ressource du canal
my_pipe
.Actualise le canal avec des fichiers de données mis en zone de préparation avec le préfixe (ou chemin) facultatif spécifié.
Supprime le canal.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Gestion des volumes externes¶
Vous pouvez gĂ©rer des volumes externes, qui sont des objets Snowflake nommĂ©s, de niveau compte, que vous utilisez pour connecter Snowflake Ă votre stockage cloud externe pour des tables Apache Icebergâą. Pour plus dâinformations, consultez la section Volume externe de Tables Apache Icebergâą.
Les Snowflake Python APIs représentent des volumes externes avec deux types distincts :
ExternalVolume
: Expose les propriĂ©tĂ©s dâun volume externe, telles que son nom et ses emplacements de stockage.ExternalVolumeResource
: Expose les méthodes que vous pouvez utiliser pour récupérer un objetExternalVolume
correspondant, et supprimer ou restaurer le volume externe.
CrĂ©ation dâun volume externe¶
Pour crĂ©er un volume externe, crĂ©ez dâabord un objet ExternalVolume
, puis créez un objet ExternalVolumeCollection
de lâobjet dâAPI Root
. En utilisant ExternalVolumeCollection.create
, ajoutez le nouveau volume externe Ă Snowflake.
Le code de lâexemple suivant crĂ©e un objet ExternalVolume
qui représente un volume externe nommé my_external_volume
avec les emplacements de stockage S3 AWS spécifiés :
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
Obtenir les détails du volume externe¶
Vous pouvez obtenir des informations sur un volume externe en appelant la méthode ExternalVolumeResource.fetch
, qui renvoie un objet ExternalVolume
.
Le code de lâexemple suivant obtient des informations sur un volume externe nommĂ© my_external_volume
:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Affichage des volumes externes¶
Vous pouvez rĂ©pertorier des volumes externes Ă lâaide de la mĂ©thode ExternalVolumeCollection.iter
qui renvoie un itérateur PagedIter
dâobjets ExternalVolume
.
Le code de lâexemple suivant rĂ©pertorie les volumes externes dont le nom commence par my
et imprime le nom de chacun :
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
ExĂ©cution dâopĂ©rations sur des volumes externes¶
Vous pouvez effectuer des opĂ©rations courantes sur les volumes externes, telles que la suppression et la restauration dâun volume externe, avec un objet ExternalVolumeResource
.
Pour illustrer les opĂ©rations que vous pouvez effectuer avec une ressource de volume externe, le code de lâexemple suivant effectue les opĂ©rations suivantes :
Obtient lâobjet de ressource de volume externe
my_external_volume
.Supprime le volume externe.
Restaure la version la plus récente du volume externe supprimé.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()