Managing data loading and unloading resources with Python
You can use Python to manage data loading and unloading resources in Snowflake, including external volumes, pipes, and stages.
Prerequisites
The examples in this topic assume that you've added code to connect with Snowflake and to create a Root object from which to use the Snowflake Python APIs.
For example, the following code uses connection parameters defined in a configuration file to create a connection to Snowflake:
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Using the resulting Session object, the code creates a Root object to use the API's types and methods. For more information, see Connect to Snowflake with the Snowflake Python APIs.
Managing stages
You can manage Snowflake stages, which are locations of data files in cloud storage. For an overview of stages, see Overview of data loading.
The Snowflake Python APIs represent stages with two separate types:
Stage: Exposes a stage's properties such as its name, encryption type, credentials, and directory table settings.
StageResource: Exposes methods you can use to fetch a corresponding Stage object, upload and list files on the stage, and drop the stage.
Creating a stage
To create a stage, first create a Stage object, and then create a StageCollection object from the API Root object. Using StageCollection.create, add the new stage to Snowflake.
Code in the following example creates a Stage object that represents a stage named my_stage with an encryption type of SNOWFLAKE_SSE (server-side encryption only):
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
    name="my_stage",
    encryption=StageEncryption(type="SNOWFLAKE_SSE"),
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
The code creates a StageCollection variable stages and uses StageCollection.create to create a new stage in Snowflake.
Getting stage details
You can get information about a stage by calling the StageResource.fetch method, which returns a Stage object.
Code in the following example gets information about a stage named my_stage:
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
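The dictionary returned by to_dict can be easier to read when serialized as indented JSON. The following is a minimal sketch, not part of the Snowflake Python APIs: the helper name describe_as_json is hypothetical, and it assumes only that the object passed in has a fetch method whose result offers to_dict, as in the example above. The default=str argument is a precaution in case the dictionary holds values, such as timestamps, that the json module cannot serialize directly.

```python
import json


def describe_as_json(resource) -> str:
    """Fetch a resource and return its properties as indented JSON.

    Hypothetical helper: `resource` is any object with a fetch() method whose
    result offers to_dict(), such as the StageResource used above.
    """
    properties = resource.fetch().to_dict()
    # default=str converts values that json cannot serialize (for example, datetimes).
    return json.dumps(properties, indent=2, sort_keys=True, default=str)
```

Assuming the root object from the prerequisites, a call might look like print(describe_as_json(root.databases["my_db"].schemas["my_schema"].stages["my_stage"])).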
Listing stages
You can list stages using the StageCollection.iter method, which returns a PagedIter iterator of Stage objects.
Code in the following example lists stages whose name starts with my and prints the name of each:
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
    print(stage_obj.name)
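The like argument takes a SQL LIKE pattern, in which % matches any sequence of zero or more characters and _ matches exactly one character. As a rough local illustration of which names a pattern such as my% selects, the sketch below translates a pattern into a regular expression. The like_matches helper is hypothetical and not part of the Snowflake Python APIs. It ignores escape characters, and it assumes case-insensitive matching, which is how the LIKE filter on SHOW commands behaves.

```python
import re


def like_matches(pattern: str, name: str) -> bool:
    """Return True if `name` matches the SQL LIKE `pattern`, ignoring case.

    Hypothetical local helper for illustration only; it does not handle
    escape characters.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches zero or more characters
        elif ch == "_":
            parts.append(".")   # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    regex = "".join(parts)
    return re.fullmatch(regex, name, flags=re.IGNORECASE | re.DOTALL) is not None


names = ["my_stage", "MY_OTHER_STAGE", "tmp_my_stage"]
print([n for n in names if like_matches("my%", n)])   # names that start with "my"
print([n for n in names if like_matches("%my%", n)])  # names that contain "my"
```

Under these assumptions, my% selects only names that begin with my, while %my% would select names that contain it anywhere.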
Performing stage operations
You can perform common stage operations, such as uploading a file to a stage and listing files on a stage, with a StageResource object.
To demonstrate some operations you can do with a stage resource, code in the following example does the following:
Uploads a file named my-file.yaml to the my_stage stage with the specified auto-compress and overwrite options.
Lists all files on the stage to verify that the file was uploaded successfully.
Drops the stage.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
# Upload the file to the root of the stage without compressing it, replacing any existing copy.
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
# List the files on the stage to verify the upload.
stage_files = my_stage_res.list_files()
for stage_file in stage_files:
    print(stage_file)
my_stage_res.drop()
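When several files need to be uploaded, the same put call can be repeated in a loop. The following is a minimal sketch under stated assumptions: upload_directory is a hypothetical helper, not part of the Snowflake Python APIs, and it relies only on the put method with the arguments used in the example above.

```python
from pathlib import Path


def upload_directory(stage_res, local_dir: str, pattern: str = "*.yaml") -> list[str]:
    """Upload every file in `local_dir` that matches `pattern` to the stage root.

    Hypothetical helper: `stage_res` is any object with the put() method used
    above. Returns the local paths that were uploaded, in sorted order.
    """
    uploaded = []
    for path in sorted(Path(local_dir).glob(pattern)):
        if path.is_file():
            # Same arguments as the single-file example above.
            stage_res.put(str(path), "/", auto_compress=False, overwrite=True)
            uploaded.append(str(path))
    return uploaded
```

Returning the list of uploaded paths makes it straightforward to compare against the output of list_files afterward.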
Managing pipes
You can manage Snowflake pipes, which are named, first-class Snowflake objects that contain a COPY INTO statement used by Snowpipe to load data from an ingestion queue into tables. For an overview of pipes, see Snowpipe.
The Snowflake Python APIs represent pipes with two separate types:
Pipe: Exposes a pipe's properties such as its name and the COPY INTO statement to be used by Snowpipe.
PipeResource: Exposes methods you can use to fetch a corresponding Pipe object, refresh the pipe with staged data files, and drop the pipe.
Creating a pipe
To create a pipe, first create a Pipe object, and then create a PipeCollection object from the API Root object. Using PipeCollection.create, add the new pipe to Snowflake.
Code in the following example creates a Pipe object that represents a pipe named my_pipe with the specified COPY INTO statement:
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
    name="my_pipe",
    comment="creating my pipe",
    copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
The code creates a PipeCollection variable pipes and uses PipeCollection.create to create a new pipe in Snowflake.
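The copy_statement value is an ordinary string, so it can be assembled from its parts before the Pipe object is created. The sketch below is one possible way to do that and is not part of the Snowflake Python APIs: build_copy_statement is a hypothetical helper, and it accepts only simple unquoted identifiers so that arbitrary text cannot be spliced into the SQL.

```python
import re

# Unquoted identifiers: a letter or underscore, then letters, digits, underscores, or dollar signs.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def build_copy_statement(table: str, stage: str, file_type: str = "JSON") -> str:
    """Build a simple COPY INTO statement of the form used in the example above.

    Hypothetical helper: rejects anything that is not a simple identifier.
    """
    for value in (table, stage, file_type):
        if not _IDENTIFIER.fullmatch(value):
            raise ValueError(f"not a simple identifier: {value!r}")
    return f"COPY INTO {table} FROM @{stage} FILE_FORMAT = (TYPE = '{file_type}')"


print(build_copy_statement("my_table", "mystage"))
```

The returned string can be passed as the copy_statement argument of Pipe.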
Getting pipe details
You can get information about a pipe by calling the PipeResource.fetch method, which returns a Pipe object.
Code in the following example gets information about a pipe named my_pipe:
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Listing pipes
You can list pipes using the PipeCollection.iter method, which returns a PagedIter iterator of Pipe objects.
Code in the following example lists pipes whose name starts with my and prints the name of each:
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
    print(pipe_obj.name)
Performing pipe operations
You can perform common pipe operations, such as refreshing a pipe and dropping a pipe, with a PipeResource object.
Note
Only the REFRESH functionality of ALTER PIPE is currently supported.
To demonstrate operations you can do with a pipe resource, code in the following example does the following:
Gets the my_pipe pipe resource object.
Refreshes the pipe with staged data files with the specified, optional prefix (or path).
Drops the pipe.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
Managing external volumes
You can manage external volumes, which are named, account-level Snowflake objects that you use to connect Snowflake to your external cloud storage for Apache Iceberg™ tables. For more information, see the External volume section of Apache Iceberg™ tables.
The Snowflake Python APIs represent external volumes with two separate types:
ExternalVolume: Exposes an external volume's properties, such as its name and storage locations.
ExternalVolumeResource: Exposes methods you can use to fetch a corresponding ExternalVolume object and drop or restore the external volume.
Creating an external volume
To create an external volume, first create an ExternalVolume object, and then create an ExternalVolumeCollection object from the API Root object. Using ExternalVolumeCollection.create, add the new external volume to Snowflake.
Code in the following example creates an ExternalVolume object that represents an external volume named my_external_volume with the specified AWS S3 storage locations:
from snowflake.core.external_volume import (
    Encryption,
    ExternalVolume,
    StorageLocationS3,
)
my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ],
)
root.external_volumes.create(my_external_volume)
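The two storage locations in the example above differ only in their names, so the shared settings can be written once and reused. The following is a minimal sketch that builds plain keyword-argument dictionaries. The helper s3_location_kwargs is hypothetical and not part of the Snowflake Python APIs; its dictionary keys mirror the StorageLocationS3 keyword arguments shown above.

```python
def s3_location_kwargs(names, base_url, role_arn, encryption=None):
    """Return one keyword-argument dict per storage location name.

    Hypothetical helper: every location shares the same base URL, role ARN,
    and (optional) encryption settings; only the name differs.
    """
    locations = []
    for name in names:
        kwargs = {
            "name": name,
            "storage_base_url": base_url,
            "storage_aws_role_arn": role_arn,
        }
        if encryption is not None:
            kwargs["encryption"] = encryption
        locations.append(kwargs)
    return locations


for kwargs in s3_location_kwargs(
    ["my-s3-us-west-1", "my-s3-us-west-2"],
    "s3://MY_EXAMPLE_BUCKET/",
    "arn:aws:iam::123456789012:role/myrole",
):
    print(kwargs["name"], kwargs["storage_base_url"])
```

Each dictionary could then be unpacked into the constructor, for example StorageLocationS3(**kwargs), with an Encryption object passed as the encryption argument if one is needed.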
Getting external volume details
You can get information about an external volume by calling the ExternalVolumeResource.fetch method, which returns an ExternalVolume object.
Code in the following example gets information about an external volume named my_external_volume:
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Listing external volumes
You can list external volumes using the ExternalVolumeCollection.iter method, which returns a PagedIter iterator of ExternalVolume objects.
Code in the following example lists external volumes whose name starts with my and prints the name of each:
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
    print(external_volume_obj.name)
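Stages, pipes, and external volumes are all listed the same way in this topic: call iter on the collection with a like pattern and read name from each result. A small generic helper can capture that pattern. The sketch below is hypothetical and not part of the Snowflake Python APIs; it assumes only the iter(like=...) call and the name attribute used in the listing examples.

```python
def resource_names(collection, like: str = "%") -> list[str]:
    """Return the sorted names of the resources in `collection` that match `like`.

    Hypothetical helper: `collection` is any object whose iter(like=...) method
    yields objects with a `name` attribute, as in the listing examples above.
    """
    return sorted(obj.name for obj in collection.iter(like=like))
```

Assuming the root object from the prerequisites, resource_names(root.external_volumes, "my%") would return the matching external volume names as a list, which is convenient when the names are needed more than once.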
Performing external volume operations
You can perform common external volume operations, such as dropping and restoring an external volume, with an ExternalVolumeResource object.
To demonstrate operations you can do with an external volume resource, code in the following example does the following:
Gets the my_external_volume external volume resource object.
Drops the external volume.
Restores the most recent version of the dropped external volume.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()