Python์ ์ฌ์ฉํ์ฌ Snowflake ๋์ ํ ์ด๋ธ ๊ด๋ฆฌํ๊ธฐยถ
Python์ ์ฌ์ฉํ๋ฉด ์ง์์ ์ธ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ์ ์ํ ์๋ก์ด ํ ์ด๋ธ ์ ํ์ธ Snowflake ๋์ ํ ์ด๋ธ์ ๊ด๋ฆฌํ ์ ์์ต๋๋ค. ๋์ ํ ์ด๋ธ์ ์ง์ ๋ ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ์ฒดํํฉ๋๋ค. ์ด ๊ธฐ๋ฅ์ ๋ํ ๊ฐ์๋ ๋์ ํ ์ด๋ธ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake Python APIs ์ ๋ค์ ๋ ๊ฐ์ง ๋ณ๊ฐ ์ ํ์ ๋์ ํ ์ด๋ธ์ ๋ํ๋ ๋๋ค.
DynamicTable
: ๋์ ํ ์ด๋ธ์ ์์ฑ(์ด๋ฆ, ๋์ ์ง์ฐ, ์จ์ดํ์ฐ์ค, ์ฟผ๋ฆฌ ๋ฌธ ๋ฑ)์ ํ์ํฉ๋๋ค.DynamicTableResource
: ํด๋นDynamicTable
์ค๋ธ์ ํธ ๊ฐ์ ธ์ค๊ธฐ, ๋์ ํ ์ด๋ธ ์ผ์ ์ค๋จ ๋ฐ ์ฌ๊ฐ, ๋์ ํ ์ด๋ธ ์ญ์ ์ ์ฌ์ฉํ ์ ์๋ ๋ฉ์๋๋ฅผ ๋ ธ์ถํฉ๋๋ค.
์ ์ ์กฐ๊ฑดยถ
์ด ํญ๋ชฉ์ ์์ ์์๋ Snowflake์ ์ฐ๊ฒฐํ๊ณ Snowflake Python APIs ์ ์ฌ์ฉํ ์ ์๋ Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ๋ ์ฝ๋๋ฅผ ์ถ๊ฐํ๋ค๊ณ ๊ฐ์ ํฉ๋๋ค.
์๋ฅผ ๋ค์ด, ๋ค์ ์ฝ๋๋ ๊ตฌ์ฑ ํ์ผ์ ์ ์๋ ์ฐ๊ฒฐ ๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ๋ํ ์ฐ๊ฒฐ์ ์์ฑํฉ๋๋ค.
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
ํด๋น ์ฝ๋์์๋ ๊ฒฐ๊ณผ Session
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ API์ ์ ํ๊ณผ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด Root
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ Snowflake Python APIs ์ ์ฌ์ฉํ์ฌ Snowflake์ ์ฐ๊ฒฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
๋์ ํ ์ด๋ธ ๋ง๋ค๊ธฐยถ
๋์ ํ
์ด๋ธ์ ์์ฑํ๋ ค๋ฉด ๋จผ์ DynamicTable
์ค๋ธ์ ํธ๋ฅผ ์์ฑํ ๋ค์ API Root
์ค๋ธ์ ํธ์์ DynamicTableCollection
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค. DynamicTableCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ๋์ ํ
์ด๋ธ์ ์ถ๊ฐํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์ต์ํ์ ํ์ ์ต์
์ ์ง์ ํ์ฌ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ my_schema
์คํค๋ง์์ my_dynamic_table
์ด๋ผ๋ ๋์ ํ
์ด๋ธ์ ๋ํ๋ด๋ DynamicTable
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.dynamic_table import DynamicTable, DownstreamLag
my_dt = DynamicTable(
name='my_dynamic_table',
target_lag=DownstreamLag(),
warehouse='my_wh',
query='SELECT * FROM t',
)
dynamic_tables = root.databases['my_db'].schemas['my_schema'].dynamic_tables
dynamic_tables.create(my_dt)
์ด ์ฝ๋๋ DynamicTableCollection
๋ณ์ dynamic_tables
๋ฅผ ์์ฑํ๊ณ DynamicTableCollection.create
๋ฅผ ์ฌ์ฉํ์ฌ Snowflake์ ์ ๋์ ํ
์ด๋ธ์ ์์ฑํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ํ์ฌ ์ง์๋๋ ๋ชจ๋ ์ต์
์ ์ง์ ํ์ฌ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ my_schema
์คํค๋ง์์ my_dynamic_table2
์ด๋ผ๋ ๋์ ํ
์ด๋ธ์ ๋ํ๋ด๋ DynamicTable
์ค๋ธ์ ํธ๋ฅผ ์์ฑํฉ๋๋ค.
from snowflake.core.dynamic_table import DynamicTable, UserDefinedLag
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTable(
name='my_dynamic_table2',
kind='PERMANENT',
target_lag=UserDefinedLag(seconds=60),
warehouse='my_wh',
query='SELECT * FROM t',
refresh_mode='FULL',
initialize='ON_SCHEDULE',
cluster_by=['id > 1'],
comment='test table',
data_retention_time_in_days=7,
max_data_extension_time_in_days=7,
)
)
๋์ ํ ์ด๋ธ ๋ณต์ ํ๊ธฐยถ
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ค ๋์ ํ
์ด๋ธ my_dynamic_table
๋ฐ my_schema
์คํค๋ง์์ ๋์ผํ ์ด ์ ์์ ๋ชจ๋ ๊ธฐ์กด ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ์ฌ my_dynamic_table2
์ด๋ผ๋ ์ด๋ฆ์ ์ ๋์ ํ
์ด๋ธ์ ์์ฑํฉ๋๋ค.
์ฐธ๊ณ
์ด ๋ณต์ ์์ ์ ์ ํ์
target_lag
๋ฐwarehouse
๋งค๊ฐ ๋ณ์๋ฅผ ํฌํจํ๋DynamicTableClone
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ๋ฉฐ, ํ์ฌ ๋ค๋ฅธ ๋งค๊ฐ ๋ณ์๋ ์ง์ํ์ง ์์ต๋๋ค.
from snowflake.core.dynamic_table import DynamicTableClone
root.databases['my_db'].schemas['my_schema'].dynamic_tables.create(
DynamicTableClone(
name='my_dynamic_table2',
warehouse='my_wh2',
),
clone_table='my_dynamic_table',
)
์ด ๊ธฐ๋ฅ์ ๋ํ ์์ธํ ๋ด์ฉ์ CREATE DYNAMIC TABLE โฆ CLONE ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋์ ํ ์ด๋ธ ์ธ๋ถ ์ ๋ณด ์ป๊ธฐยถ
DynamicTable
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํ๋ DynamicTableResource.fetch
๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๋์ ํ
์ด๋ธ์ ๋ํ ์ ๋ณด๋ฅผ ์ป์ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ my_schema
์คํค๋ง์์ ์ด๋ฆ์ด my_dynamic_table
์ธ ๋์ ํ
์ด๋ธ์ ๋ํ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
dynamic_table = root.databases['my_db'].schemas['my_schema'].dynamic_tables['my_dynamic_table']
dt_details = dynamic_table.fetch()
print(dt_details.to_dict())
๋์ ํ ์ด๋ธ ๋์ดํ๊ธฐยถ
DynamicTable
์ค๋ธ์ ํธ์ PagedIter
๋ฐ๋ณต๊ธฐ๋ฅผ ๋ฐํํ๋ DynamicTableCollection.iter
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ํ
์ด๋ธ์ ๋์ดํ ์ ์์ต๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์ my_schema
์คํค๋ง์์ ์ด๋ฆ์ด my
ํ
์คํธ๋ก ์์ํ๋ ๋์ ํ
์ด๋ธ์ ๋์ดํ ๋ค์ ๊ฐ ํ
์ด๋ธ์ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
from snowflake.core.dynamic_table import DynamicTableCollection
dt_list = root.databases['my_db'].schemas['my_schema'].dynamic_tables.iter(like='my%')
for dt_obj in dt_list:
print(dt_obj.name)
๋์ ํ ์ด๋ธ ์ด๋ฆ ๋ฐ๊พธ๊ธฐยถ
DynamicTableResource.swap_with
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ๋จ์ผ ํธ๋์ญ์
์์ ๋์ ํ
์ด๋ธ์ ์ด๋ฆ์ ๋ค๋ฅธ ๋์ ํ
์ด๋ธ๊ณผ ๋ฐ๊ฟ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ SWAP WITH ๋งค๊ฐ ๋ณ์ ์ค๋ช
์ ALTER DYNAMICTABLE ์์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์์ ์ ์ฝ๋๋ ๋์ผํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์คํค๋ง์์ my_dynamic_table
์ other_dynamic_table
๋ก ๋ฐ๊ฟ๋๋ค.
my_table_res = root.databases['my_db'].schemas['my_schema'].tables['my_dynamic_table']
my_table_res.swap_with('other_dynamic_table')
๋์ ํ ์ด๋ธ ์์ ์ํยถ
๋์ ํ
์ด๋ธ ์๋ก ๊ณ ์นจ, ์ผ์ ์ค๋จ ๋ฐ ์ฌ๊ฐ์ ๊ฐ์ ์ผ๋ฐ์ ์ธ ๋์ ํ
์ด๋ธ ์์
์ DynamicTableResource
์ค๋ธ์ ํธ๋ฅผ ์ฌ์ฉํ์ฌ ์ํํ ์ ์์ต๋๋ค.
์ด๋ฌํ ๋์ ํ ์ด๋ธ ์์ ์ ๋ํ ์์ธํ ๋ด์ฉ์ SQL ๋ช ๋ น ์ฐธ์กฐ์์ ํ ์ด๋ธ, ๋ทฐ ๋ฐ ์ํ์ค ๋ช ๋ น ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋์ ํ ์ด๋ธ ๋ฆฌ์์ค๋ก ํ ์ ์๋ ์ผ๋ถ ์์ ์ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ๋ค์ ์์ ์ ์ฝ๋๋ ๋ค์์ ์ํํฉ๋๋ค.
my_db
๋ฐ์ดํฐ๋ฒ ์ด์ค์my_schema
์คํค๋ง์์my_dynamic_table
๋์ ํ ์ด๋ธ ๋ฆฌ์์ค ์ค๋ธ์ ํธ๋ฅผ ๊ฐ์ ธ์ต๋๋ค.๋์ ํ ์ด๋ธ์ ์๋ก ๊ณ ์นฉ๋๋ค.
๋์ ํ ์ด๋ธ์ ์ผ์ ์ค๋จํฉ๋๋ค.
๋์ ํ ์ด๋ธ์ ์ฌ๊ฐํฉ๋๋ค.
๋์ ํ ์ด๋ธ์ ์ญ์ ํฉ๋๋ค.
์ ๊ฑฐํ ๋์ ํ ์ด๋ธ์ ๊ฐ์ฅ ์ต๊ทผ ๋ฒ์ ์ ๋ณต์ํฉ๋๋ค.
my_dynamic_table_res = root.databases["my_db"].schemas["my_schema"].dynamic_tables["my_dynamic_table"]
my_dynamic_table_res.refresh()
my_dynamic_table_res.suspend()
my_dynamic_table_res.resume()
my_dynamic_table_res.drop()
my_dynamic_table_res.undrop()