Python์ ์ฌ์ฉํ์ฌ Snowflake ์คํธ๋ฆผ ๊ด๋ฆฌํ๊ธฐยถ
Python์ ์ฌ์ฉํ์ฌ ์ฝ์ , ์ ๋ฐ์ดํธ, ์ญ์ ๋ฑ ํ ์ด๋ธ์ ๋ํ ๋ฐ์ดํฐ ์กฐ์ ์ธ์ด(DML) ๋ณ๊ฒฝ ์ฌํญ๊ณผ ๊ฐ ๋ณ๊ฒฝ ์ฌํญ์ ๋ํ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋กํ๋ ์ค๋ธ์ ํธ์ธ Snowflake ์คํธ๋ฆผ์ ๊ด๋ฆฌํ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ์คํธ๋ฆผ ์๊ฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฐธ๊ณ
ALTER STREAM ์ ํ์ฌ ์ง์๋์ง ์์ต๋๋ค.
Snowflake Python APIs ์ ๋ค์ ๋ ๊ฐ์ง ์ ํ์ ์คํธ๋ฆผ์ ๋ํ๋ ๋๋ค.
Stream
: ์คํธ๋ฆผ์ ์ด๋ฆ, ๋ชฉํ ์ง์ฐ, ์จ์ดํ์ฐ์ค, ์ฟผ๋ฆฌ ๋ฌธ๊ณผ ๊ฐ์ ์์ฑ์ ๋ ธ์ถํฉ๋๋ค.StreamResource
: ํด๋นStream
์ค๋ธ์ ํธ ๊ฐ์ ธ์ค๊ธฐ, ์คํธ๋ฆผ ์ผ์ ์ค๋จ ๋ฐ ์ฌ๊ฐ, ์คํธ๋ฆผ ์ญ์ ์ ์ฌ์ฉํ ์ ์๋ ๋ฉ์๋๋ฅผ ๋ ธ์ถํฉ๋๋ค.
์ ์ ์กฐ๊ฑดยถ
์ด ํญ๋ชฉ์ ์์ ์์๋ Snowflake์ ์ฐ๊ฒฐํ๊ณ Snowflake Python APIs ์ ์ฌ์ฉํ ์ ์๋ Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ๋ ์ฝ๋๋ฅผ ์ถ๊ฐํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
์๋ฅผ ๋ค์ด, ๋ค์ ์ฝ๋๋ ๊ตฌ์ฑ ํ์ผ์ ์ ์๋ ์ฐ๊ฒฐ ๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ๋ํ ์ฐ๊ฒฐ์ ์์ฑํฉ๋๋ค.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
ํด๋น ์ฝ๋์์๋ ๊ฒฐ๊ณผ Session
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ API์ ์ ํ๊ณผ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ Snowflake Python APIs ์ ์ฌ์ฉํ์ฌ Snowflake์ ์ฐ๊ฒฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
์คํธ๋ฆผ ๋ง๋ค๊ธฐยถ
์คํธ๋ฆผ์ ์์ฑํ๋ ค๋ฉด ๋จผ์ Stream
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ ๋ค์ API Root
์ค๋ธ์ ํธ์์ StreamCollection
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. StreamCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ์คํธ๋ฆผ์ ์ถ๊ฐํฉ๋๋ค.
๋ค์ ์ค๋ธ์ ํธ ์ ํ์์ ์คํธ๋ฆผ์ ์์ฑํ ์ ์์ต๋๋ค.
ํ์ค ํ ์ด๋ธ
๋ทฐ
๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ
์์ค ํ ์ด๋ธ์์ยถ
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ค ํ
์ด๋ธ my_table
๋ฐ my_schema
์คํค๋ง์ ์ง์ ๋ ์คํธ๋ฆผ ์์ฑ์ ๊ฐ์ง ์ด๋ฆ์ด my_stream_on_table
์ธ ์คํธ๋ฆผ์ ๋ํ๋ด๋ Stream
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
์ฐธ๊ณ
StreamSourceTable
์ ํ์ ํ์ค ํ
์ด๋ธ๋ง ์ง์ํฉ๋๋ค. ๋์ ํ
์ด๋ธ, ์ด๋ฒคํธ ํ
์ด๋ธ, ์ธ๋ถ ํ
์ด๋ธ, Iceberg ํ
์ด๋ธ ๋ฑ ๋ค๋ฅธ ์ ํ์ ํ
์ด๋ธ์ ํ์ฌ ์ง์๋์ง ์์ต๋๋ค.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable
stream_on_table = Stream(
"my_stream_on_table",
StreamSourceTable(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_table',
append_only = True,
show_initial_rows = False,
),
comment = 'create stream on table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)
์ด ์ฝ๋๋ StreamCollection
๋ณ์ streams
๋ฅผ ์์ฑํ๊ณ StreamCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ์คํธ๋ฆผ์ ์์ฑํฉ๋๋ค.
์์ค ๋ทฐ์์ยถ
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ค ๋ทฐ my_view
๋ฐ my_schema
์คํค๋ง์ ์ง์ ๋ ์คํธ๋ฆผ ์์ฑ์ ๊ฐ์ง ์ด๋ฆ์ด my_stream_on_view
์ธ ์คํธ๋ฆผ์ ๋ํ๋ด๋ Stream
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView
stream_on_view = Stream(
"my_stream_on_view",
StreamSourceView(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_view',
),
comment = 'create stream on view'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)
์์ค ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์์ยถ
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ค ๋๋ ํฐ๋ฆฌ ํ
์ด๋ธ my_directory_table
๋ฐ my_schema
์คํค๋ง์ ์ง์ ๋ ์คํธ๋ฆผ ์์ฑ์ ๊ฐ์ง ์ด๋ฆ์ด my_stream_on_directory_table
์ธ ์คํธ๋ฆผ์ ๋ํ๋ด๋ Stream
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage
stream_on_directory_table = Stream(
"my_stream_on_directory_table",
StreamSourceStage(
point_of_time = PointOfTimeOffset(reference="before", offset="1"),
name = 'my_directory_table',
),
comment = 'create stream on directory table'
)
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)
์คํธ๋ฆผ ๋ณต์ ํ๊ธฐยถ
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ my_schema
์คํค๋ง์์ ์์ค ์คํธ๋ฆผ my_other_stream
๊ณผ ์ ์๊ฐ ๋์ผํ ์ด๋ฆ์ด my_stream
์ธ ์ ์คํธ๋ฆผ์ ์์ฑํฉ๋๋ค.
from snowflake.core.stream import Stream
streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")
์คํธ๋ฆผ ์ธ๋ถ ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐยถ
Stream
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํ๋ StreamResource.fetch
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์คํธ๋ฆผ์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ my_schema
์คํค๋ง์์ ์ด๋ฆ์ด my_stream
์ธ ์คํธ๋ฆผ์ ๋ํ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
์คํธ๋ฆผ ๋์ดํ๊ธฐยถ
Stream
์ค๋ธ์ ํธ์ PagedIter
๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ StreamCollection.iter
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์คํธ๋ฆผ์ ๋์ดํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ my_schema
์คํค๋ง์์ ์ด๋ฆ์ด my
๋ก ์์ํ๋ ์คํธ๋ฆผ์ ๋์ดํ ๋ค์ ๊ฐ๊ฐ์ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
print(stream_obj.name)
๋ค์ ์์ ์ ์ฝ๋์๋ ์ด๋ฆ์ด my
๋ก ์์ํ๋ ์คํธ๋ฆผ์ด ๋ชฉ๋ก์ ํ์๋์ง๋ง, like
๋์ starts_with
๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ด ์์ ์์๋ ์ ํ์ ๋งค๊ฐ ๋ณ์ show_limit=10
๋ฅผ ์ค์ ํ์ฌ ๊ฒฐ๊ณผ ์๋ฅผ 10
๋ก ์ ํํฉ๋๋ค.
stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
print(stream_obj.name)
์คํธ๋ฆผ ์ญ์ ํ๊ธฐยถ
StreamResource
์ค๋ธ์ ํธ๊ฐ ํฌํจ๋ ์คํธ๋ฆผ์ ์ญ์ ํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_stream
์คํธ๋ฆผ ๋ฆฌ์์ค ์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์จ ๋ค์ ์คํธ๋ฆผ์ ์ญ์ ํฉ๋๋ค.
my_stream_res = root.streams["my_stream"]
my_stream_res.drop()