Python์„ ์‚ฌ์šฉํ•˜์—ฌ Snowflake ๋™์  ํ…Œ์ด๋ธ” ๊ด€๋ฆฌํ•˜๊ธฐยถ

Python์„ ์‚ฌ์šฉํ•˜๋ฉด ์ง€์†์ ์ธ ์ฒ˜๋ฆฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ์œ„ํ•œ ์ƒˆ๋กœ์šด ํ…Œ์ด๋ธ” ์œ ํ˜•์ธ Snowflake ๋™์  ํ…Œ์ด๋ธ”์„ ๊ด€๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋™์  ํ…Œ์ด๋ธ”์€ ์ง€์ •๋œ ์ฟผ๋ฆฌ์˜ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ์ฒดํ™”ํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ธฐ๋Šฅ์— ๋Œ€ํ•œ ๊ฐœ์š”๋Š” ๋™์  ํ…Œ์ด๋ธ” ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

Snowflake Python APIs ์€ ๋‹ค์Œ ๋‘ ๊ฐ€์ง€ ๋ณ„๊ฐœ ์œ ํ˜•์˜ ๋™์  ํ…Œ์ด๋ธ”์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค.

  • DynamicTable: ๋™์  ํ…Œ์ด๋ธ”์˜ ์†์„ฑ(์ด๋ฆ„, ๋Œ€์ƒ ์ง€์—ฐ, ์›จ์–ดํ•˜์šฐ์Šค, ์ฟผ๋ฆฌ ๋ฌธ ๋“ฑ)์„ ํ‘œ์‹œํ•ฉ๋‹ˆ๋‹ค.

  • DynamicTableResource: ํ•ด๋‹น DynamicTable ์˜ค๋ธŒ์ ํŠธ ๊ฐ€์ ธ์˜ค๊ธฐ, ๋™์  ํ…Œ์ด๋ธ” ์ผ์‹œ ์ค‘๋‹จ ๋ฐ ์žฌ๊ฐœ, ๋™์  ํ…Œ์ด๋ธ” ์‚ญ์ œ์— ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ๋…ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

์ „์ œ ์กฐ๊ฑดยถ

์ด ํ•ญ๋ชฉ์˜ ์˜ˆ์ œ์—์„œ๋Š” Snowflake์™€ ์—ฐ๊ฒฐํ•˜๊ณ  Snowflake Python APIs ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” Root ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ถ”๊ฐ€ํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด, ๋‹ค์Œ ์ฝ”๋“œ๋Š” ๊ตฌ์„ฑ ํŒŒ์ผ์— ์ •์˜๋œ ์—ฐ๊ฒฐ ๋งค๊ฐœ ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Snowflake์— ๋Œ€ํ•œ ์—ฐ๊ฒฐ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

ํ•ด๋‹น ์ฝ”๋“œ์—์„œ๋Š” ๊ฒฐ๊ณผ Session ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ API์˜ ์œ ํ˜•๊ณผ ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด Root ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ Snowflake Python APIs ์„ ์‚ฌ์šฉํ•˜์—ฌ Snowflake์— ์—ฐ๊ฒฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋™์  ํ…Œ์ด๋ธ” ๋งŒ๋“ค๊ธฐยถ

๋™์  ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•˜๋ ค๋ฉด ๋จผ์ € DynamicTable ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•œ ๋‹ค์Œ API Root ์˜ค๋ธŒ์ ํŠธ์—์„œ DynamicTableCollection ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค. DynamicTableCollection.create ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Snowflake์— ์ƒˆ ๋™์  ํ…Œ์ด๋ธ”์„ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” ์ตœ์†Œํ•œ์˜ ํ•„์ˆ˜ ์˜ต์…˜์„ ์ง€์ •ํ•˜์—ฌ my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ my_schema ์Šคํ‚ค๋งˆ์—์„œ my_dynamic_table ์ด๋ผ๋Š” ๋™์  ํ…Œ์ด๋ธ”์„ ๋‚˜ํƒ€๋‚ด๋Š” DynamicTable ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

from snowflake.core.dynamic_table import DynamicTable, DownstreamLag

my_dt = DynamicTable(
  name='my_dynamic_table',
  target_lag=DownstreamLag(),
  warehouse='my_wh',
  query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
Copy

์ด ์ฝ”๋“œ๋Š” DynamicTableCollection ๋ณ€์ˆ˜ dynamic_tables ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  DynamicTableCollection.create ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Snowflake์— ์ƒˆ ๋™์  ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” ํ˜„์žฌ ์ง€์›๋˜๋Š” ๋ชจ๋“  ์˜ต์…˜์„ ์ง€์ •ํ•˜์—ฌ my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ my_schema ์Šคํ‚ค๋งˆ์—์„œ my_dynamic_table2 ์ด๋ผ๋Š” ๋™์  ํ…Œ์ด๋ธ”์„ ๋‚˜ํƒ€๋‚ด๋Š” DynamicTable ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTable(
      name='my_dynamic_table2',
      kind='PERMANENT',
      target_lag=UserDefinedLag(seconds=60),
      warehouse='my_wh',
      query='SELECT * FROM t',
      refresh_mode='FULL',
      initialize='ON_SCHEDULE',
      cluster_by=['id > 1'],
      comment='test table',
      data_retention_time_in_days=7,
      max_data_extension_time_in_days=7,
  )
)
Copy

๋™์  ํ…Œ์ด๋ธ” ๋ณต์ œํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์˜ ์†Œ์Šค ๋™์  ํ…Œ์ด๋ธ” my_dynamic_table ๋ฐ my_schema ์Šคํ‚ค๋งˆ์—์„œ ๋™์ผํ•œ ์—ด ์ •์˜์™€ ๋ชจ๋“  ๊ธฐ์กด ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ my_dynamic_table2 ์ด๋ผ๋Š” ์ด๋ฆ„์˜ ์ƒˆ ๋™์  ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

์ฐธ๊ณ 

์ด ๋ณต์ œ ์ž‘์—…์€ ์„ ํƒ์  target_lag ๋ฐ warehouse ๋งค๊ฐœ ๋ณ€์ˆ˜๋ฅผ ํฌํ•จํ•˜๋Š” DynamicTableClone ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉฐ, ํ˜„์žฌ ๋‹ค๋ฅธ ๋งค๊ฐœ ๋ณ€์ˆ˜๋Š” ์ง€์›ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

from snowflake.core.dynamic_table import DynamicTableClone

root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
  DynamicTableClone(
      name='my_dynamic_table2',
      warehouse='my_wh2',
  ),
  clone_table='my_dynamic_table',
)
Copy

์ด ๊ธฐ๋Šฅ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ CREATE DYNAMIC TABLE โ€ฆ CLONE ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋™์  ํ…Œ์ด๋ธ” ์„ธ๋ถ€ ์ •๋ณด ์–ป๊ธฐยถ

DynamicTable ์˜ค๋ธŒ์ ํŠธ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” DynamicTableResource.fetch ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋™์  ํ…Œ์ด๋ธ”์— ๋Œ€ํ•œ ์ •๋ณด๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ my_schema ์Šคํ‚ค๋งˆ์—์„œ ์ด๋ฆ„์ด my_dynamic_table ์ธ ๋™์  ํ…Œ์ด๋ธ”์— ๋Œ€ํ•œ ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค.

dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
Copy

๋™์  ํ…Œ์ด๋ธ” ๋‚˜์—ดํ•˜๊ธฐยถ

DynamicTable ์˜ค๋ธŒ์ ํŠธ์˜ PagedIter ๋ฐ˜๋ณต๊ธฐ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” DynamicTableCollection.iter ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋™์  ํ…Œ์ด๋ธ”์„ ๋‚˜์—ดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ my_schema ์Šคํ‚ค๋งˆ์—์„œ ์ด๋ฆ„์ด my ํ…์ŠคํŠธ๋กœ ์‹œ์ž‘ํ•˜๋Š” ๋™์  ํ…Œ์ด๋ธ”์„ ๋‚˜์—ดํ•œ ๋‹ค์Œ ๊ฐ ํ…Œ์ด๋ธ”์˜ ์ด๋ฆ„์„ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.

from snowflake.core.dynamic_table import DynamicTableCollection

dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
  print(dt_obj.name)
Copy

๋™์  ํ…Œ์ด๋ธ” ์ด๋ฆ„ ๋ฐ”๊พธ๊ธฐยถ

DynamicTableResource.swap_with ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹จ์ผ ํŠธ๋žœ์žญ์…˜์—์„œ ๋™์  ํ…Œ์ด๋ธ”์˜ ์ด๋ฆ„์„ ๋‹ค๋ฅธ ๋™์  ํ…Œ์ด๋ธ”๊ณผ ๋ฐ”๊ฟ€ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ SWAP WITH ๋งค๊ฐœ ๋ณ€์ˆ˜ ์„ค๋ช…์„ ALTER DYNAMICTABLE ์—์„œ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” ๋™์ผํ•œ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ์Šคํ‚ค๋งˆ์—์„œ my_dynamic_table ์„ other_dynamic_table ๋กœ ๋ฐ”๊ฟ‰๋‹ˆ๋‹ค.

my_table_res = root.databases['my_db'].schemas['my_schema'].tables['my_dynamic_table']
my_table_res.swap_with('other_dynamic_table')
Copy

๋™์  ํ…Œ์ด๋ธ” ์ž‘์—… ์ˆ˜ํ–‰ยถ

๋™์  ํ…Œ์ด๋ธ” ์ƒˆ๋กœ ๊ณ ์นจ, ์ผ์‹œ ์ค‘๋‹จ ๋ฐ ์žฌ๊ฐœ์™€ ๊ฐ™์€ ์ผ๋ฐ˜์ ์ธ ๋™์  ํ…Œ์ด๋ธ” ์ž‘์—…์„ DynamicTableResource ์˜ค๋ธŒ์ ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด๋Ÿฌํ•œ ๋™์  ํ…Œ์ด๋ธ” ์ž‘์—…์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ SQL ๋ช…๋ น ์ฐธ์กฐ์—์„œ ํ…Œ์ด๋ธ”, ๋ทฐ ๋ฐ ์‹œํ€€์Šค ๋ช…๋ น ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋™์  ํ…Œ์ด๋ธ” ๋ฆฌ์†Œ์Šค๋กœ ํ•  ์ˆ˜ ์žˆ๋Š” ์ผ๋ถ€ ์ž‘์—…์„ ๋ณด์—ฌ์ฃผ๊ธฐ ์œ„ํ•ด ๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” ๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

  1. my_db ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ my_schema ์Šคํ‚ค๋งˆ์—์„œ my_dynamic_table ๋™์  ํ…Œ์ด๋ธ” ๋ฆฌ์†Œ์Šค ์˜ค๋ธŒ์ ํŠธ๋ฅผ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค.

  2. ๋™์  ํ…Œ์ด๋ธ”์„ ์ƒˆ๋กœ ๊ณ ์นฉ๋‹ˆ๋‹ค.

  3. ๋™์  ํ…Œ์ด๋ธ”์„ ์ผ์‹œ ์ค‘๋‹จํ•ฉ๋‹ˆ๋‹ค.

  4. ๋™์  ํ…Œ์ด๋ธ”์„ ์žฌ๊ฐœํ•ฉ๋‹ˆ๋‹ค.

  5. ๋™์  ํ…Œ์ด๋ธ”์„ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

  6. ์ œ๊ฑฐํ•œ ๋™์  ํ…Œ์ด๋ธ”์˜ ๊ฐ€์žฅ ์ตœ๊ทผ ๋ฒ„์ „์„ ๋ณต์›ํ•ฉ๋‹ˆ๋‹ค.

my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]

my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
my_dynamic_table_res.undrop()
Copy