Managing Snowflake streams with Python

You can use Python to manage Snowflake streams, which are objects that record data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change. For more information, see Introduction to Streams.

Note

ALTER STREAM is currently not supported.

The Snowflake Python APIs represent streams with two separate types:

  • Stream: Exposes a stream's properties, such as its name, source, and comment.

  • StreamResource: Exposes methods you can use to fetch the corresponding Stream object and to drop the stream.

Prerequisites

The examples in this topic assume that you have added code to connect to Snowflake and to create a Root object from which to use the Snowflake Python APIs.

For example, the following code uses connection parameters defined in a configuration file to create a connection to Snowflake:

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)

Using the resulting Session object, the code creates a Root object, which gives access to the API's types and methods. For more information, see Connect to Snowflake with the Snowflake Python APIs.
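
Every example in this topic reaches the stream collection through the same chain of lookups on the Root object. As a convenience, that chain can be wrapped in a small function. The following is a minimal sketch; schema_streams is a hypothetical helper name, not part of the Snowflake Python APIs, and it assumes root is an already connected Root object like the one created above.

```python
def schema_streams(root, database: str, schema: str):
    """Return the stream collection for one schema.

    Hypothetical helper: `root` is assumed to be a connected
    snowflake.core.Root object. The function only performs the same
    attribute and key lookups used by the examples in this topic.
    """
    return root.databases[database].schemas[schema].streams
```

With this helper, schema_streams(root, "my_db", "my_schema") would return the same collection as root.databases['my_db'].schemas['my_schema'].streams.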

Creating a stream

To create a stream, first create a Stream object, and then get a StreamCollection object from the API Root object. Use StreamCollection.create to add the new stream to Snowflake.

You can create a stream on the following object types:

  • Standard tables

  • Views

  • Directory tables

On a source table

The code in the following example creates a Stream object that represents a stream named my_stream_on_table on the source table my_table in the my_db database and the my_schema schema, with the specified stream properties:

Note

The StreamSourceTable type supports only standard tables. Other table types, such as dynamic tables, event tables, external tables, and Iceberg tables, are currently not supported.

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceTable

stream_on_table = Stream(
    "my_stream_on_table",
    StreamSourceTable(
        point_of_time=PointOfTimeOffset(reference="before", offset="1"),
        name="my_table",
        append_only=True,
        show_initial_rows=False,
    ),
    comment="create stream on table",
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_table)

This code creates a StreamCollection variable named streams and uses StreamCollection.create to create the new stream in Snowflake.

On a source view

The code in the following example creates a Stream object that represents a stream named my_stream_on_view on the source view my_view in the my_db database and the my_schema schema, with the specified stream properties:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceView

stream_on_view = Stream(
    "my_stream_on_view",
    StreamSourceView(
        point_of_time=PointOfTimeOffset(reference="before", offset="1"),
        name="my_view",
    ),
    comment="create stream on view",
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_view)

On a source directory table

The code in the following example creates a Stream object that represents a stream named my_stream_on_directory_table on the source directory table my_directory_table in the my_db database and the my_schema schema, with the specified stream properties:

from snowflake.core.stream import PointOfTimeOffset, Stream, StreamSourceStage

stream_on_directory_table = Stream(
    "my_stream_on_directory_table",
    StreamSourceStage(
        point_of_time=PointOfTimeOffset(reference="before", offset="1"),
        name="my_directory_table",
    ),
    comment="create stream on directory table",
)

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create(stream_on_directory_table)

Cloning a stream

The code in the following example creates a new stream named my_stream with the same definition as the source stream my_other_stream in the my_db database and the my_schema schema:

from snowflake.core.stream import Stream

streams = root.databases['my_db'].schemas['my_schema'].streams
streams.create("my_stream", clone_stream="my_other_stream")

Getting stream details

You can get information about a stream by calling the StreamResource.fetch method, which returns a Stream object.

The code in the following example gets information about a stream named my_stream in the my_db database and the my_schema schema:

stream = root.databases['my_db'].schemas['my_schema'].streams['my_stream']
stream_details = stream.fetch()
print(stream_details.to_dict())
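
The dictionary returned by to_dict can be large, so it may help to keep only the fields of interest. The following is a minimal sketch; summarize_stream is a hypothetical helper, not part of the Snowflake Python APIs, and the default field names "name" and "comment" are assumptions based on the Stream arguments used in the earlier examples.

```python
def summarize_stream(stream_resource, fields=("name", "comment")):
    """Fetch a stream and return only the requested top-level fields.

    Hypothetical helper: `stream_resource` is assumed to behave like a
    StreamResource, meaning fetch() returns an object with to_dict().
    """
    details = stream_resource.fetch().to_dict()
    # Fields that are absent from the dictionary map to None.
    return {field: details.get(field) for field in fields}
```

For example, summarize_stream(stream) could be called with the stream variable from the example above.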

Listing streams

You can list streams by using the StreamCollection.iter method, which returns a PagedIter iterator of Stream objects.

The code in the following example lists the streams whose names start with my in the my_db database and the my_schema schema, and then prints the name of each:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(like='my%')
for stream_obj in stream_list:
  print(stream_obj.name)

The code in the following example also lists the streams whose names start with my, but it uses the starts_with parameter instead of like. This example also sets the optional parameter show_limit=10 to limit the number of results to 10:

stream_list = root.databases['my_db'].schemas['my_schema'].streams.iter(starts_with="my", show_limit=10)
for stream_obj in stream_list:
  print(stream_obj.name)
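
The two filters are not necessarily interchangeable. The sketch below emulates both locally, assuming they follow the semantics of the SHOW STREAMS command, where LIKE is a case-insensitive pattern match with the % and _ wildcards and STARTS WITH is a case-sensitive prefix match. The functions like_matches and starts_with_matches are hypothetical helpers for illustration only; they are not part of the Snowflake Python APIs, and escape sequences in LIKE patterns are not handled.

```python
import re


def like_matches(pattern: str, name: str) -> bool:
    """Emulate a case-insensitive SQL LIKE with % and _ wildcards."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches any run of characters
        elif ch == "_":
            parts.append(".")  # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    regex = "".join(parts)
    return re.fullmatch(regex, name, flags=re.IGNORECASE | re.DOTALL) is not None


def starts_with_matches(prefix: str, name: str) -> bool:
    """Emulate a case-sensitive prefix match."""
    return name.startswith(prefix)
```

Under these assumptions, like_matches("my%", "MY_STREAM") is true, while starts_with_matches("my", "MY_STREAM") is false. If stream names are stored in uppercase, a lowercase prefix passed to starts_with may therefore return fewer streams than the equivalent like pattern.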

Dropping a stream

You can drop a stream with a StreamResource object.

The code in the following example gets the my_stream stream resource object and then drops the stream:

my_stream_res = root.databases["my_db"].schemas["my_schema"].streams["my_stream"]
my_stream_res.drop()
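
Listing and dropping can be combined to clean up several streams at once. The following is a minimal sketch that uses only the calls shown in this topic (iter with starts_with, indexing the collection by name, and drop). The function drop_streams_with_prefix and its dry_run parameter are hypothetical and not part of the Snowflake Python APIs; the sketch also assumes that each name returned by iter can be used as-is to index the collection.

```python
def drop_streams_with_prefix(streams, prefix: str, dry_run: bool = True):
    """Drop every stream in `streams` whose name starts with `prefix`.

    Hypothetical helper: `streams` is assumed to behave like a
    StreamCollection. Returns the list of matching names. With
    dry_run=True (the default), nothing is dropped.
    """
    # Collect the names first so that dropping does not interfere
    # with the iteration over the listing.
    names = [stream.name for stream in streams.iter(starts_with=prefix)]
    if not dry_run:
        for name in names:
            streams[name].drop()
    return names
```

Calling the function with the default dry_run=True first makes it possible to review the matching names before anything is removed.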