Python์ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ ๋ก๋ฉ ๋ฐ ์ธ๋ก๋ฉ ๋ฆฌ์์ค ๊ด๋ฆฌยถ
Python์ ์ฌ์ฉํ์ฌ Snowflake์์ ์ธ๋ถ ๋ณผ๋ฅจ, ํ์ดํ, ์คํ ์ด์ง๋ฅผ ํฌํจํ ๋ฆฌ์์ค์ ๋ฐ์ดํฐ ๋ก๋ฉ ๋ฐ ์ธ๋ก๋ฉ์ ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
์ ์ ์กฐ๊ฑดยถ
์ด ํญ๋ชฉ์ ์์ ์์๋ Snowflake์ ์ฐ๊ฒฐํ๊ณ Snowflake Python APIs ์ ์ฌ์ฉํ ์ ์๋ Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ๋ ์ฝ๋๋ฅผ ์ถ๊ฐํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
์๋ฅผ ๋ค์ด, ๋ค์ ์ฝ๋๋ ๊ตฌ์ฑ ํ์ผ์ ์ ์๋ ์ฐ๊ฒฐ ๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ๋ํ ์ฐ๊ฒฐ์ ์์ฑํฉ๋๋ค.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
ํด๋น ์ฝ๋์์๋ ๊ฒฐ๊ณผ Session
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ API์ ์ ํ๊ณผ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ Snowflake Python APIs ์ ์ฌ์ฉํ์ฌ Snowflake์ ์ฐ๊ฒฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
์คํ ์ด์ง ๊ด๋ฆฌํ๊ธฐยถ
ํด๋ผ์ฐ๋ ์ ์ฅ์์ ๋ฐ์ดํฐ ํ์ผ์ ์์น์ธ Snowflake ์คํ ์ด์ง๋ฅผ ๊ด๋ฆฌํ ์ ์์ต๋๋ค. ์คํ ์ด์ง์ ๋ํ ๊ฐ์๋ ๋ฐ์ดํฐ ๋ก๋ฉ ๊ฐ์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake Python APIs ์ ๋ค์ ๋ ๊ฐ์ง ๋ณ๊ฐ ์ ํ์ ์คํ ์ด์ง๋ฅผ ๋ํ๋ ๋๋ค.
Stage
: ์คํ ์ด์ง์ ์ด๋ฆ, ์ํธํ ์ ํ, ์๊ฒฉ ์ฆ๋ช , ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ ์ค์ ๋ฑ ์์ฑ์ ํ์ํฉ๋๋ค.StageResource
: ํด๋นStage
์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ค๊ณ , ์คํ ์ด์ง์ ํ์ผ์ ์ ๋ก๋ ๋ฐ ๋ชฉ๋ก์ผ๋ก ๋ง๋ค๊ณ , ์คํ ์ด์ง๋ฅผ ์ญ์ ํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๋ฉ์๋๋ฅผ ๋ ธ์ถํฉ๋๋ค.
์คํ ์ด์ง ๋ง๋ค๊ธฐยถ
์คํ
์ด์ง๋ฅผ ์์ฑํ๋ ค๋ฉด ๋จผ์ Stage
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ ๋ค์ API Root
์ค๋ธ์ ํธ์์ StageCollection
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. StageCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ์คํ
์ด์ง๋ฅผ ์ถ๊ฐํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ํธํ ์ ํ์ด SNOWFLAKE_SSE
(์๋ฒ ์ธก ์ํธํ๋ง ํด๋น)์ธ my_stage
๋ผ๋ ์คํ
์ด์ง๋ฅผ ๋ํ๋ด๋ Stage
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
์ด ์ฝ๋๋ StageCollection
๋ณ์ stages
๋ฅผ ์์ฑํ๊ณ StageCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ์คํ
์ด์ง๋ฅผ ์์ฑํฉ๋๋ค.
์คํ ์ด์ง ์ธ๋ถ ์ ๋ณด ์ป๊ธฐยถ
Stage
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํ๋ StageResource.fetch
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์คํ
์ด์ง์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_stage
์คํ
์ด์ง์ ๋ํ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
๋ชฉ๋ก ์คํ ์ด์งยถ
Stage
์ค๋ธ์ ํธ์ PagedIter
๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ StageCollection.iter
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์คํ
์ด์ง๋ฅผ ๋์ดํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ด๋ฆ์ my
ํ
์คํธ๊ฐ ํฌํจ๋ ์คํ
์ด์ง๋ฅผ ๋์ดํ๊ณ ๊ฐ ์คํ
์ด์ง์ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
์คํ ์ด์ง ์์ ์ํํ๊ธฐยถ
์คํ
์ด์ง์ ํ์ผ์ ์
๋ก๋ํ๊ณ ์คํ
์ด์ง์ ํ์ผ์ ๋ชฉ๋ก์ผ๋ก ๋์ดํ๋ ๋ฑ ์ผ๋ฐ์ ์ธ ์คํ
์ด์ง ์์
์ StageResource
์ค๋ธ์ ํธ๋ก ์ํํ ์ ์์ต๋๋ค.
์คํ ์ด์ง ๋ฆฌ์์ค๋ก ํ ์ ์๋ ์ผ๋ถ ์์ ์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ๋ค์ ์์ ์ ์ฝ๋๋ ๋ค์์ ์ํํฉ๋๋ค.
์ง์ ๋ ์๋ ์์ถ ๋ฐ ๋ฎ์ด์ฐ๊ธฐ ์ต์ ์ ์ฌ์ฉํ์ฌ
my-file.yaml
ํ์ผ์my_stage
์คํ ์ด์ง์ ์ ๋ก๋ํฉ๋๋ค.ํ์ผ์ด ์ฑ๊ณต์ ์ผ๋ก ์ ๋ก๋๋์๋์ง ํ์ธํ๊ธฐ ์ํด ์คํ ์ด์ง์ ์๋ ๋ชจ๋ ํ์ผ์ ๋์ดํฉ๋๋ค.
์คํ ์ด์ง๋ฅผ ์ญ์ ํฉ๋๋ค.
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
ํ์ดํ ๊ด๋ฆฌํ๊ธฐยถ
Snowpipe๊ฐ ์์ง ํ์์ ํ ์ด๋ธ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๋ ๋ฐ ์ฌ์ฉํ๋ COPY INTO ๋ฌธ์ด ํฌํจ๋ ๋ช ๋ช ๋ ์ผ๊ธ Snowflake ์ค๋ธ์ ํธ์ธ Snowflake ํ์ดํ๋ฅผ ๊ด๋ฆฌํ ์ ์์ต๋๋ค. ํ์ดํ ๊ฐ์๋ Snowpipe ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake Python APIs ์ ๋ค์ ๋ ๊ฐ์ง ๋ณ๊ฐ ์ ํ์ ํ์ดํ๋ฅผ ๋ํ๋ ๋๋ค.
Pipe
: ์ด๋ฆ๊ณผ ๊ฐ์ ํ์ดํ์ ์์ฑ๊ณผ Snowpipe์์ ์ฌ์ฉํ COPY INTO ๋ฌธ์ ๋ ธ์ถํฉ๋๋ค.PipeResource
: ํด๋นPipe
์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ค๊ณ , ์คํ ์ด์ง๋ ๋ฐ์ดํฐ ํ์ผ๋ก ํ์ดํ๋ฅผ ์๋ก ๊ณ ์น๊ณ , ํ์ดํ๋ฅผ ์ญ์ ํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๋ฉ์๋๋ฅผ ๋ ธ์ถํฉ๋๋ค.
ํ์ดํ ๋ง๋ค๊ธฐยถ
ํ์ดํ๋ฅผ ์์ฑํ๋ ค๋ฉด ๋จผ์ Pipe
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ ๋ค์ API Root
์ค๋ธ์ ํธ์์ PipeCollection
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. PipeCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ํ์ดํ๋ฅผ ์ถ๊ฐํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ง์ ๋ COPY INTO ๋ฌธ์ผ๋ก my_pipe
ํ์ดํ๋ฅผ ๋ํ๋ด๋ Pipe
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
์ด ์ฝ๋๋ PipeCollection
๋ณ์ pipes
๋ฅผ ์์ฑํ๊ณ PipeCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ํ์ดํ๋ฅผ ์์ฑํฉ๋๋ค.
ํ์ดํ ์ธ๋ถ ์ ๋ณด ์ป๊ธฐยถ
Pipe
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํ๋ PipeResource.fetch
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ํ์ดํ์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_pipe
ํ์ดํ์ ๋ํ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
ํ์ดํ ๋์ดํ๊ธฐยถ
Pipe
์ค๋ธ์ ํธ์ PagedIter
๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ PipeCollection.iter
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ํ์ดํ๋ฅผ ๋์ดํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ด๋ฆ์ด my
๋ก ์์ํ๋ ํ์ดํ๋ฅผ ๋์ดํ๊ณ ๊ฐ ๊ณ์ ์ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
ํ์ดํ ์์ ์ํํ๊ธฐยถ
ํ์ดํ ์๋ก ๊ณ ์นจ ๋ฐ ํ์ดํ ์ญ์ ์ ๊ฐ์ ์ผ๋ฐ์ ์ธ ํ์ดํ ์์
์ PipeResource
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ์ํํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
ํ์ฌ ALTER PIPE ์ REFRESH ๊ธฐ๋ฅ๋ง ์ง์๋ฉ๋๋ค.
ํ์ดํ ๋ฆฌ์์ค๋ก ํ ์ ์๋ ์์ ์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ๋ค์ ์์ ์ ์ฝ๋๋ ๋ค์์ ์ํํฉ๋๋ค.
my_pipe
ํ์ดํ ๋ฆฌ์์ค ์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ต๋๋ค.์ง์ ๋ ์ ํ์ ์ ๋์ฌ(๋๋ ๊ฒฝ๋ก)๋ฅผ ์ฌ์ฉํ์ฌ ์ค๋น๋ ๋ฐ์ดํฐ ํ์ผ๋ก ํ์ดํ๋ฅผ ์๋ก ๊ณ ์นฉ๋๋ค.
ํ์ดํ๋ฅผ ์ญ์ ํฉ๋๋ค.
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
์ธ๋ถ ๋ณผ๋ฅจ ๊ด๋ฆฌยถ
์ธ๋ถ ๋ณผ๋ฅจ์ ๊ด๋ฆฌํ ์ ์์ผ๋ฉฐ, ์ด๋ Snowflake๋ฅผ Apache Icebergโข ํ ์ด๋ธ์ ์ธ๋ถ ํด๋ผ์ฐ๋ ์ ์ฅ์์ ์ฐ๊ฒฐํ๋ ๋ฐ ์ฌ์ฉํ๋ ์ด๋ฆ์ด ์ง์ ๋ ๊ณ์ ์์ค์ Snowflake ์ค๋ธ์ ํธ์ ๋๋ค. ์์ธํ ๋ด์ฉ์ Apache Icebergโข ํ ์ด๋ธ ์ ์ธ๋ถ ๋ณผ๋ฅจ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake Python APIs ์ ๋ ๊ฐ์ง ์ ํ์ ์ธ๋ถ ๋ณผ๋ฅจ์ ๋ํ๋ ๋๋ค.
ExternalVolume
: ์ธ๋ถ ๋ณผ๋ฅจ์ ์ด๋ฆ, ์ ์ฅ ์์น ๋ฑ์ ์์ฑ์ ํ์ํฉ๋๋ค.ExternalVolumeResource
: ํด๋นExternalVolume
์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ค๊ณ ์ธ๋ถ ๋ณผ๋ฅจ์ ์ ๊ฑฐ ๋๋ ๋ณต์ํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๋ฉ์๋๋ฅผ ๋ ธ์ถํฉ๋๋ค.
์ธ๋ถ ๋ณผ๋ฅจ ๋ง๋ค๊ธฐยถ
์ธ๋ถ ๋ณผ๋ฅจ์ ์์ฑํ๋ ค๋ฉด ๋จผ์ ExternalVolume
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ ๋ค์ API Root
์ค๋ธ์ ํธ์์ ExternalVolumeCollection
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. ExternalVolumeCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ์ธ๋ถ ๋ณผ๋ฅจ์ ์ถ๊ฐํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ง์ ๋ AWS S3 ์ ์ฅ์๊ฐ ์๋ my_external_volume
์ด๋ผ๋ ์ด๋ฆ์ ์ธ๋ถ ๋ณผ๋ฅจ์ ๋ํ๋ด๋ ExternalVolume
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
์ธ๋ถ ๋ณผ๋ฅจ ์ธ๋ถ ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐยถ
ExternalVolume
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํ๋ ExternalVolumeResource.fetch
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์ธ๋ถ ๋ณผ๋ฅจ์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_external_volume
์ธ๋ถ ๋ณผ๋ฅจ์ ๋ํ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
์ธ๋ถ ๋ณผ๋ฅจ ๋์ดยถ
ExternalVolume
์ค๋ธ์ ํธ์ PagedIter
๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ ExternalVolumeCollection.iter
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ธ๋ถ ๋ณผ๋ฅจ์ ๋์ดํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ด๋ฆ์ด my
๋ก ์์ํ๋ ์ธ๋ถ ๋ณผ๋ฅจ์ ๋์ดํ๊ณ ๊ฐ ๊ณ์ ์ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
์ธ๋ถ ๋ณผ๋ฅจ ์์ ์ํยถ
์ธ๋ถ ๋ณผ๋ฅจ ์ ๊ฑฐ ๋ฐ ๋ณต์๊ณผ ๊ฐ์ ์ผ๋ฐ์ ์ธ ์ธ๋ถ ๋ณผ๋ฅจ ์์
์ ExternalVolumeResource
์ค๋ธ์ ํธ๋ก ์ํํ ์ ์์ต๋๋ค.
์ธ๋ถ ๋ณผ๋ฅจ ๋ฆฌ์์ค๋ก ํ ์ ์๋ ์์ ์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ๋ค์ ์์ ์ ์ฝ๋๋ ๋ค์์ ์ํํฉ๋๋ค.
my_external_volume
์ธ๋ถ ๋ณผ๋ฅจ ๋ฆฌ์์ค ์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ต๋๋ค.์ธ๋ถ ๋ณผ๋ฅจ์ ์ญ์ ํฉ๋๋ค.
์ ๊ฑฐํ ์ธ๋ถ ๋ณผ๋ฅจ์ ๊ฐ์ฅ ์ต๊ทผ ๋ฒ์ ์ ๋ณต์ํฉ๋๋ค.
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()