Gestion des ressources de chargement et de déchargement de données avec Python¶

Vous pouvez utiliser Python pour gérer les ressources de chargement et de déchargement de données dans Snowflake, y compris les volumes externes, les canaux et les zones de préparation.

Conditions préalables¶

Les exemples de cette rubrique supposent que vous ayez ajouté le code nécessaire pour vous connecter à Snowflake et créer un objet Root à partir duquel utiliser les Snowflake Python APIs.

Par exemple, le code suivant utilise les paramÚtres de connexion définis dans un fichier de configuration pour créer une connexion à Snowflake.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

En utilisant l’objet Session obtenu, le code crĂ©e un objet Root pour utiliser les types et les mĂ©thodes de l’API. Pour plus d’informations, voir Connexion Ă  Snowflake avec Snowflake Python APIs.

Gestion des zones de préparation¶

Vous pouvez gĂ©rer les zones de prĂ©paration Snowflake, qui sont des emplacements de fichiers de donnĂ©es dans le stockage cloud. Pour un aperçu des zones de prĂ©paration, voir Vue d’ensemble du chargement de donnĂ©es.

Les Snowflake Python APIs reprĂ©sentent les zones de prĂ©paration avec deux types distincts :

  • Stage : expose les propriĂ©tĂ©s d’une zone de prĂ©paration telles que son nom, son type de chiffrement, ses identifiants de connexion et les paramĂštres de la table de rĂ©pertoires.

  • StageResource : expose les mĂ©thodes que vous pouvez utiliser pour extraire un Stage objet correspondant, tĂ©lĂ©charger et lister les fichiers sur la zone de prĂ©paration, et supprimer la zone de prĂ©paration.

CrĂ©ation d’une zone de prĂ©paration¶

Pour crĂ©er une zone de prĂ©paration, il faut d’abord crĂ©er un objet Stage, puis crĂ©er un objet StageCollection Ă  partir de l’objet Root de l’API. En utilisant StageCollection.create, ajoutez la nouvelle zone de prĂ©paration Ă  Snowflake.

Le code de l’exemple suivant crĂ©e un objet Stage qui reprĂ©sente une zone de prĂ©paration nommĂ©e my_stage avec un type de chiffrement de SNOWFLAKE_SSE (chiffrement cĂŽtĂ© serveur uniquement) :

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

Ce code crée une variable StageCollection stages et utilise StageCollection.create pour créer une nouvelle zone de préparation dans Snowflake.

Obtenir les détails de la zone de préparation¶

Vous pouvez obtenir des informations sur une zone de préparation en appelant la méthode StageResource.fetch, qui renvoie un objet Stage.

Le code de l’exemple suivant permet d’obtenir des informations sur une zone de prĂ©paration nommĂ©e my_stage :

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

Affichage des zones de préparation¶

Vous pouvez rĂ©pertorier des zones de prĂ©paration Ă  l’aide de la mĂ©thode StageCollection.iter qui renvoie un itĂ©rateur PagedIter d’objets Stage.

Le code de l’exemple suivant rĂ©pertorie les zones de prĂ©paration dont le nom comprend le texte my et affiche le nom de chacun :

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

Effectuer des opérations de zone de préparation¶

Vous pouvez effectuer des opĂ©rations de zones de prĂ©paration courantes, telles que le tĂ©lĂ©chargement d’un fichier vers une zone de prĂ©paration et l’affichage de fichiers sur une zone de prĂ©paration, avec un objet StageResource.

Pour illustrer certaines opĂ©rations que vous pouvez effectuer avec une ressource de zone de prĂ©paration, le code de l’exemple suivant effectue les opĂ©rations suivantes :

  1. TĂ©lĂ©charge un fichier nommĂ© my-file.yaml dans la zone de prĂ©paration my_stage avec les options d’auto-compression et d’écrasement spĂ©cifiĂ©es.

  2. Répertorie tous les fichiers sur la zone de préparation pour vérifier que le fichier a été téléchargé avec succÚs.

  3. Supprime la zone de préparation.

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

Gestion des canaux¶

Vous pouvez gĂ©rer les canaux Snowflake, qui sont des objets Snowflake nommĂ©s de premiĂšre classe qui contiennent une instruction COPY INTO utilisĂ©e par Snowpipe pour charger des donnĂ©es d’une file d’attente d’acquisition dans des tables. Pour un aperçu des canaux, voir Snowpipe.

Les Snowflake Python APIs reprĂ©sentent des canaux avec deux types distincts :

  • Pipe : expose les propriĂ©tĂ©s d’un canal telles que son nom et l’instruction COPY INTO Ă  utiliser par Snowpipe.

  • PipeResource : expose les mĂ©thodes que vous pouvez utiliser pour extraire un objet Pipe correspondant, actualiser le canal avec les fichiers de donnĂ©es mis en zone de prĂ©paration et supprimer le canal.

Créer un canal¶

Pour crĂ©er un canal, il faut d’abord crĂ©er un objet Pipe, puis crĂ©er un objet PipeCollection Ă  partir de l’objet Root de l’API. En utilisant PipeCollection.create, ajoutez le nouveau canal Ă  Snowflake.

Le code de l’exemple suivant crĂ©e un objet Pipe qui reprĂ©sente un canal nommĂ© my_pipe avec l’instruction COPY INTO spĂ©cifiĂ©e :

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

Ce code crée une variable PipeCollection pipes et utilise PipeCollection.create pour créer un nouveau canal dans Snowflake.

Obtenir les dĂ©tails d’un canal¶

Vous pouvez obtenir des informations sur un canal en appelant la méthode PipeResource.fetch, qui renvoie un objet Pipe.

Le code de l’exemple suivant permet d’obtenir des informations sur un canal nommĂ© my_pipe :

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

Affichage de canaux¶

Vous pouvez rĂ©pertorier des canaux Ă  l’aide de la mĂ©thode PipeCollection.iter qui renvoie un itĂ©rateur PagedIter d’objets Pipe.

Le code de l’exemple suivant rĂ©pertorie les canaux dont le nom commence par my et imprime le nom de chacun :

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

Effectuer des opérations de canal¶

Vous pouvez effectuer des opĂ©rations de canal courantes, telles que l’actualisation d’un canal et la suppression d’un canal, avec un objet PipeResource.

Note

Seule la fonctionnalité REFRESH de ALTER PIPE est actuellement prise en charge.

Pour illustrer les opĂ©rations que vous pouvez effectuer avec une ressource de canal, le code de l’exemple suivant effectue les opĂ©rations suivantes :

  1. Obtient l’objet ressource du canal my_pipe.

  2. Actualise le canal avec des fichiers de données mis en zone de préparation avec le préfixe (ou chemin) facultatif spécifié.

  3. Supprime le canal.

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

Gestion des volumes externes¶

Vous pouvez gĂ©rer des volumes externes, qui sont des objets Snowflake nommĂ©s, de niveau compte, que vous utilisez pour connecter Snowflake Ă  votre stockage cloud externe pour des tables Apache Icebergℱ. Pour plus d’informations, consultez la section Volume externe de Tables Apache Icebergℱ.

Les Snowflake Python APIs reprĂ©sentent des volumes externes avec deux types distincts :

  • ExternalVolume : Expose les propriĂ©tĂ©s d’un volume externe, telles que son nom et ses emplacements de stockage.

  • ExternalVolumeResource : Expose les mĂ©thodes que vous pouvez utiliser pour rĂ©cupĂ©rer un objet ExternalVolume correspondant, et supprimer ou restaurer le volume externe.

CrĂ©ation d’un volume externe¶

Pour crĂ©er un volume externe, crĂ©ez d’abord un objet ExternalVolume, puis crĂ©ez un objet ExternalVolumeCollection de l’objet d’API Root. En utilisant ExternalVolumeCollection.create, ajoutez le nouveau volume externe Ă  Snowflake.

Le code de l’exemple suivant crĂ©e un objet ExternalVolume qui reprĂ©sente un volume externe nommĂ© my_external_volume avec les emplacements de stockage S3 AWS spĂ©cifiĂ©s :

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

Obtenir les détails du volume externe¶

Vous pouvez obtenir des informations sur un volume externe en appelant la méthode ExternalVolumeResource.fetch, qui renvoie un objet ExternalVolume.

Le code de l’exemple suivant obtient des informations sur un volume externe nommĂ© my_external_volume :

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

Affichage des volumes externes¶

Vous pouvez rĂ©pertorier des volumes externes Ă  l’aide de la mĂ©thode ExternalVolumeCollection.iter qui renvoie un itĂ©rateur PagedIter d’objets ExternalVolume.

Le code de l’exemple suivant rĂ©pertorie les volumes externes dont le nom commence par my et imprime le nom de chacun :

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

ExĂ©cution d’opĂ©rations sur des volumes externes¶

Vous pouvez effectuer des opĂ©rations courantes sur les volumes externes, telles que la suppression et la restauration d’un volume externe, avec un objet ExternalVolumeResource.

Pour illustrer les opĂ©rations que vous pouvez effectuer avec une ressource de volume externe, le code de l’exemple suivant effectue les opĂ©rations suivantes :

  1. Obtient l’objet de ressource de volume externe my_external_volume.

  2. Supprime le volume externe.

  3. Restaure la version la plus récente du volume externe supprimé.

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy