Python์ผ๋ก UDTF ์์ฑํ๊ธฐยถ
Python์ผ๋ก ์ฌ์ฉ์ ์ ์ ํ ์ด๋ธ ํจ์ (UDTF) ์ฒ๋ฆฌ๊ธฐ๋ฅผ ๊ตฌํํ ์ ์์ต๋๋ค. ์ด ํธ๋ค๋ฌ ์ฝ๋๋ UDTF ํธ์ถ ์ ์คํ๋ฉ๋๋ค. ์ด ํญ๋ชฉ์์๋ Python์ผ๋ก ํธ๋ค๋ฌ๋ฅผ ๊ตฌํํ๊ณ UDTF๋ฅผ ๋ง๋๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํฉ๋๋ค.
UDTF๋ ํ ์ด๋ธ ํ์์ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ๋ ์ฌ์ฉ์ ์ ์ ํจ์(UDF)์ ๋๋ค. Python์ผ๋ก ๊ตฌํํ UDF ํธ๋ค๋ฌ์ ๋ํ ์์ธํ ๋ด์ฉ์ Python UDF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. UDF์ ๋ํ ๋ณด๋ค ์ผ๋ฐ์ ์ธ ๋ด์ฉ์ ์ฌ์ฉ์ ์ ์ ํจ์ ๊ฐ์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDTF์ฉ ํธ๋ค๋ฌ์์ ์ ๋ ฅ ํ์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค(์ด ํญ๋ชฉ์ ํ ์ฒ๋ฆฌํ๊ธฐ ์ฐธ์กฐ). ๋ํ ๊ฐ ์ ๋ ฅ ํํฐ์ ์ ๋ํด ์คํ๋๋ ๋ ผ๋ฆฌ๊ฐ ์์ ์๋ ์์ต๋๋ค(์ด ํญ๋ชฉ์ ํํฐ์ ์ฒ๋ฆฌํ๊ธฐ ์ฐธ์กฐ).
Python UDTF๋ฅผ ๋ง๋ค ๋ ๋ค์์ ์ํํฉ๋๋ค.
UDTF ํธ์ถ ์ Snowflake๊ฐ ํธ์ถํ ๋ฉ์๋๋ก ํด๋์ค๋ฅผ ๊ตฌํํฉ๋๋ค.
์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํธ๋ค๋ฌ ๊ตฌํํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
CREATE FUNCTION ๋ช ๋ น์ผ๋ก SQL๋ก ์์ฑ๋ UDTF๋ฅผ ๋ง๋ค์ด ํด๋์ค๋ฅผ ํธ๋ค๋ฌ๋ก ์ง์ ํฉ๋๋ค. UDTF๋ฅผ ๋ง๋ค ๋ ๋ค์์ ์ง์ ํฉ๋๋ค.
UDTF ์ ๋ ฅ ๋งค๊ฐ ๋ณ์์ ๋ฐ์ดํฐ ํ์ .
UDTF์์ ๋ฐํ๋ ์ด์ ๋ฐ์ดํฐ ํ์ .
UDTF ํธ์ถ ์ ํธ๋ค๋ฌ๋ก ์คํํ๋ ์ฝ๋.
ํธ๋ค๋ฌ๊ฐ ๊ตฌํ๋๋ ์ธ์ด.
๊ตฌ๋ฌธ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ CREATE FUNCTION ์ผ๋ก UDTF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDF ์คํํ๊ธฐ ์ ์ค๋ช ๋ ๋๋ก UDF ๋๋ UDTF๋ฅผ ํธ์ถํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
ํ ์ด๋ธ ํจ์(UDTF)์ ์ ๋ ฅ ์ธ์ 500๊ฐ, ์ถ๋ ฅ ์ด 500๊ฐ๋ก ์ ํ๋ฉ๋๋ค.
Snowflake๋ ํ์ฌ ๋ค์ Python ๋ฒ์ ์์ UDF ์์ฑ์ ์ง์ํฉ๋๋ค.
3.9
3.10
3.11
3.12
CREATE FUNCTION ๋ฌธ์์ runtime_version
์ ์ํ๋ ๋ฒ์ ์ผ๋ก ์ค์ ํฉ๋๋ค.
ํธ๋ค๋ฌ ๊ตฌํํ๊ธฐยถ
UDTF ์ธ์ ๊ฐ์ ํ ์ด๋ธ ํ์์ ๊ฒฐ๊ณผ๋ก ์ฒ๋ฆฌํ๊ณ ๋ถํ ๋ ์ ๋ ฅ์ ์ฒ๋ฆฌํ๋ ํธ๋ค๋ฌ ํด๋์ค๋ฅผ ๊ตฌํํฉ๋๋ค. ํธ๋ค๋ฌ ํด๋์ค์ ์๋ ์ด ํญ๋ชฉ์ ์ฒ๋ฆฌ๊ธฐ ํด๋์ค์ ์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
CREATE FUNCTION์ผ๋ก UDTF๋ฅผ ๋ง๋ค ๋ ์ด ํด๋์ค๋ฅผ UDTF์ ํธ๋ค๋ฌ๋ก ์ง์ ํฉ๋๋ค. ํจ์๋ฅผ ๋ง๋๋ SQL์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ CREATE FUNCTION ์ผ๋ก UDTF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
ํธ๋ค๋ฌ ํด๋์ค๋ UDTF ํธ์ถ ์ Snowflake๊ฐ ํธ์ถํ ๋ฉ์๋๋ฅผ ๊ตฌํํฉ๋๋ค. ์ด ํด๋์ค๋ UDTF์ ๋ ผ๋ฆฌ๋ฅผ ํฌํจํฉ๋๋ค.
๋ฉ์๋ |
์๊ตฌ ์ฌํญ |
์ค๋ช |
---|---|---|
|
์ ํ ์ฌํญ |
์ ๋ ฅ ํํฐ์ ์ ์ํ ์ ์ฅ ์ฒ๋ฆฌ๋ฅผ ์ํ ์ํ๋ฅผ ์ด๊ธฐํํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ์ฒ๋ฆฌ๊ธฐ ์ด๊ธฐํํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. |
|
ํ์ |
๊ฐ ์ ๋ ฅ ํ์ ์ฒ๋ฆฌํ์ฌ ํ ์ด๋ธ ํ์ ๊ฐ์ ํํ๋ก ๋ฐํํฉ๋๋ค. Snowflake๋ ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ UDTF์ ์ธ์์์ ์ ๋ ฅ๊ฐ์ ์ ๋ฌํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ process ๋ฉ์๋ ์ ์ํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. |
|
์ ํ ์ฌํญ |
์ ๋ ฅ ํํฐ์ ์ฒ๋ฆฌ๋ฅผ ์๋ฃํ์ฌ ํ ์ด๋ธ ํ์ ๊ฐ์ ํํ๋ก ๋ฐํํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํํฐ์ ์ฒ๋ฆฌ ๋ง๋ฌด๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. |
ํธ๋ค๋ฌ ํด๋์ค์ ์์์ ๋ฉ์๋์์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ์ฒ๋ฆฌ๊ฐ ์ค์ง๋ฉ๋๋ค. UDTF๋ฅผ ํธ์ถํ ์ฟผ๋ฆฌ๋ ์ค๋ฅ ๋ฉ์์ง์ ํจ๊ป ์คํจํฉ๋๋ค.
์ฐธ๊ณ
์ฝ๋๊ฐ ์ฌ๊ธฐ์ ์ค๋ช ๋ ์๊ตฌ ์ฌํญ์ ์ถฉ์กฑํ์ง ์์ผ๋ฉด UDTF ์์ฑ ๋๋ ์คํ์ด ์คํจํ ์ ์์ต๋๋ค. Snowflake๋ CREATE FUNCTION ๋ฌธ ์คํ ์ ์๋ฐ ์ฌํญ์ ๊ฐ์งํฉ๋๋ค.
์ฒ๋ฆฌ๊ธฐ ํด๋์ค์ ์ยถ
๋ค์ ์์ ์ ์ฝ๋๋ฅผ ์คํํ๋ฉด ํํฐ์
์ ํ์ ์ฒ๋ฆฌํ๋ ํธ๋ค๋ฌ ํด๋์ค๊ฐ ์๋ UDTF๊ฐ ์์ฑ๋ฉ๋๋ค. process
๋ฉ์๋๋ ๊ฐ ์
๋ ฅ ํ์ ์ฒ๋ฆฌํ์ฌ ์ฃผ์ ๋งค๋งค์ ์ด๋น์ฉ์ด ์๋ ํ์ ๋ฐํํฉ๋๋ค. ํํฐ์
์ ํ์ ์ฒ๋ฆฌํ ํ, ํํฐ์
์ ํฌํจ๋ ๋ชจ๋ ๋งค๋งค์ ํฉ๊ณ๋ฅผ end_partition
๋ฉ์๋์์ ๋ฐํํฉ๋๋ค.
CREATE OR REPLACE FUNCTION stock_sale_sum(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'StockSaleSum'
AS $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
๋ค์ ์์ ์ ์ฝ๋์์๋ ์ด์ UDF๋ฅผ ํธ์ถํ์ฌ stocks_table
ํ
์ด๋ธ์ symbol
, quantity
๋ฐ price
์ด์ ๊ฐ์ ์ ๋ฌํฉ๋๋ค. UDTF ํธ์ถ์ ๋ํ ์์ธํ ๋ด์ฉ์ UDF ์คํํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
SELECT stock_sale_sum.symbol, total
FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
์ฒ๋ฆฌ๊ธฐ ์ด๊ธฐํํ๊ธฐยถ
ํธ๋ค๋ฌ๊ฐ ํ ์ฒ๋ฆฌ๋ฅผ ์์ํ๊ธฐ ์ ์ Snowflake๊ฐ ํธ์ถํ ํธ๋ค๋ฌ ํด๋์ค์์ __init__
๋ฉ์๋๋ฅผ ์ ํ์ ์ผ๋ก ๊ตฌํํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ์ด ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ํธ๋ค๋ฌ์ ๋ํด ํํฐ์
๋ฒ์๊ฐ ์ง์ ๋ ์ํ๋ฅผ ์ค์ ํ ์ ์์ต๋๋ค. __init__
๋ฉ์๋๋ก ์ถ๋ ฅ ํ์ด ์์ฑ๋์ง ์์ ์ ์์ต๋๋ค.
์ด ๋ฉ์๋์ ์๋ช ์ ๋ค์๊ณผ ๊ฐ์ ํ์์ด์ด์ผ ํฉ๋๋ค.
def __init__(self):
์๋ฅผ ๋ค์ด ๋ค์๊ณผ ๊ฐ์ด ํ ์ ์์ต๋๋ค.
ํํฐ์ ์ ์ํ๋ฅผ ์ด๊ธฐํํ ๋ค์
process
๋ฐend_partition
๋ฉ์๋์์ ์ด ์ํ๋ฅผ ์ฌ์ฉํฉ๋๋ค.ํ๋น ํ ๋ฒ์ด ์๋๋ผ ํํฐ์ ๋น ํ ๋ฒ๋ง ์ํํด์ผ ํ๋ ์ฅ๊ธฐ ์คํ ์ด๊ธฐํ๋ฅผ ์คํํฉ๋๋ค.
์ฐธ๊ณ
ํธ๋ค๋ฌ ํด๋์ค ์ธ๋ถ์ ํด๋น ์ฝ๋๋ฅผ ํฌํจํจ์ผ๋ก์จ, ํด๋์ค ์ ์ธ ์ด์ ๊ณผ ๊ฐ์ด ํํฐ์ ์ฒ๋ฆฌ๊ฐ ์์๋๊ธฐ ์ ์ ๋ ผ๋ฆฌ๋ฅผ ํ ๋ฒ ์คํํ ์๋ ์์ต๋๋ค.
ํํฐ์ ์ฒ๋ฆฌ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํํฐ์ ์ฒ๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
__init__
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ __init__
๊ฐ ๋ค์๊ณผ ๊ฐ์ ํน์ง์ด ์๋ค๋ ์ ์ ์ ๋
ํ์ญ์์ค.
self
๋ง ์ธ์๋ก ์ทจํ ์ ์์ต๋๋ค.์ถ๋ ฅ ํ์ ์์ฑํ ์ ์์ต๋๋ค. ๊ทธ ๋์
process
๋ฉ์๋ ๊ตฌํ์ ์ฌ์ฉํ์ญ์์ค.๊ฐ ํํฐ์ ์ ๋ํด ํ ๋ฒ, ๊ทธ๋ฆฌ๊ณ
process
๋ฉ์๋๊ฐ ํธ์ถ๋๊ธฐ ์ ์ ํธ์ถ๋ฉ๋๋ค.
ํ ์ฒ๋ฆฌํ๊ธฐยถ
Snowflake๊ฐ ๊ฐ ์
๋ ฅ ํ์ ๋ํด ํธ์ถํ process
๋ฉ์๋๋ฅผ ๊ตฌํํฉ๋๋ค.
process
๋ฉ์๋ ์ ์ํ๊ธฐยถ
SQL ํ์์์ ๋ณํ๋ UDTF ์ธ์๋ฅผ ๊ฐ์ผ๋ก ๋ฐ๋ process
๋ฉ์๋๋ฅผ ์ ์ํ์ฌ Snowflake๊ฐ UDTF์ ํ
์ด๋ธ ํ์ ๋ฐํ ๊ฐ์ ๋ง๋๋ ๋ฐ ์ฌ์ฉํ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํฉ๋๋ค.
์ด ๋ฉ์๋์ ์๋ช ์ ๋ค์๊ณผ ๊ฐ์ ํ์์ด์ด์ผ ํฉ๋๋ค.
def process(self, *args):
process
๋ฉ์๋๋ ๋ค์ ์กฐ๊ฑด์ ์ถฉ์กฑํด์ผ ํฉ๋๋ค.
self
๋งค๊ฐ ๋ณ์๊ฐ ์์ด์ผ ํฉ๋๋ค.UDTF ๋งค๊ฐ ๋ณ์์ ํด๋นํ๋ ๋ฉ์๋ ๋งค๊ฐ ๋ณ์๋ฅผ ์ ์ธํด์ผ ํฉ๋๋ค.
๋ฉ์๋ ๋งค๊ฐ ๋ณ์ ์ด๋ฆ์ด UDTF ๋งค๊ฐ ๋ณ์ ์ด๋ฆ๊ณผ ์ผ์นํ ํ์๋ ์์ง๋ง, UDTF ๋งค๊ฐ ๋ณ์๊ฐ ์ ์ธ๋ ๊ฒ๊ณผ ๊ฐ์ ์์๋ก ๋ฉ์๋ ๋งค๊ฐ ๋ณ์๋ฅผ ์ ์ธํด์ผ ํฉ๋๋ค.
UDTF ์ธ์ ๊ฐ์ ๋ฉ์๋์ ์ ๋ฌํ ๋ Snowflake๋ ๊ฐ์ SQL ํ์์์ ๋ฉ์๋์์ ์ฌ์ฉํ๋ Python ํ์์ผ๋ก ๋ณํํฉ๋๋ค. Snowflake๊ฐ SQL ๋ฐ์ดํฐ ํ์ ๊ณผ Python ๋ฐ์ดํฐ ํ์ ๊ฐ์ ๋งคํํ๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ SQL-Python ๋ฐ์ดํฐ ํ์ ๋งคํ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
ํ๋ ์ด์์ ํํ์ ์์ฑ(๋๋ ํํ์ ํฌํจํ ์ดํฐ๋ฌ๋ธ์ ๋ฐํ)ํด์ผ ํ๋ฉฐ, ์ด๋ ํํ์ ์ํ์ค๋ UDTF ๋ฐํ ๊ฐ ์ด์ ์ํ์ค์ ์์ํฉ๋๋ค.
ํํ ์์๋ UDTF ๋ฐํ ๊ฐ ์ด์ด ์ ์ธ๋ ๊ฒ๊ณผ ๊ฐ์ ์์๋ก ๋ํ๋์ผ ํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ๊ฐ ๋ฐํํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
Snowflake๋ ๊ฐ์ Python ํ์์์ UDTF ์ ์ธ์์ ์๊ตฌ๋๋ SQL ํ์์ผ๋ก ๋ณํํฉ๋๋ค. Snowflake๊ฐ SQL ๋ฐ์ดํฐ ํ์ ๊ณผ Python ๋ฐ์ดํฐ ํ์ ๊ฐ์ ๋งคํํ๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ SQL-Python ๋ฐ์ดํฐ ํ์ ๋งคํ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
ํธ๋ค๋ฌ ํด๋์ค์ ๋ฉ์๋์์ ์์ธ๊ฐ ๋ฐ์ํ๋ฉด ์ฒ๋ฆฌ๊ฐ ์ค์ง๋ฉ๋๋ค. UDTF๋ฅผ ํธ์ถํ ์ฟผ๋ฆฌ๋ ์ค๋ฅ ๋ฉ์์ง์ ํจ๊ป ์คํจํฉ๋๋ค. process
๋ฉ์๋๊ฐ None
์ ๋ฐํํ๋ฉด ์ฒ๋ฆฌ๊ฐ ์ค์ง๋ฉ๋๋ค. (process
๋ฉ์๋๊ฐ None
์ ๋ฐํํ๋๋ผ๋ end_partition
๋ฉ์๋๋ ์ฌ์ ํ ํธ์ถ๋ฉ๋๋ค.)
process ๋ฉ์๋์ ์
๋ค์ ์์ ์ ์ฝ๋๋ฅผ ์คํํ๋ฉด ์ธ UDTF ์ธ์(symbol
, quantity
, price
)๋ฅผ ์ฒ๋ฆฌํ์ฌ ๋ ์ด(symbol
, total
)์ด ์๋ ๋จ์ผ ํ์ ๋ฐํํ๋ process
๋ฉ์๋๊ฐ ์๋ StockSale
ํธ๋ค๋ฌ ํด๋์ค๊ฐ ํ์๋ฉ๋๋ค. process
๋ฉ์๋ ๋งค๊ฐ ๋ณ์๋ stock_sale
๋งค๊ฐ ๋ณ์์ ๋์ผํ ์์๋ก ์ ์ธ๋ฉ๋๋ค. process
๋ฉ์๋์ yield
๋ฌธ์ ์๋ ์ธ์๋ stock_sale
RETURNS TABLE ์ ์ ์ ์ธ๋ ์ด๊ณผ ๊ฐ์ ์์์
๋๋ค.
CREATE OR REPLACE FUNCTION stock_sale(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'StockSale'
AS $$
class StockSale:
def process(self, symbol, quantity, price):
cost = quantity * price
yield (symbol, cost)
$$;
๋ค์ ์์ ์ ์ฝ๋์์๋ ์ด์ UDF๋ฅผ ํธ์ถํ์ฌ stocks_table
ํ
์ด๋ธ์ symbol
, quantity
๋ฐ price
์ด์ ๊ฐ์ ์ ๋ฌํฉ๋๋ค.
SELECT stock_sale.symbol, total
FROM stocks_table, TABLE(stock_sale(symbol, quantity, price) OVER (PARTITION BY symbol));
๊ฐ ๋ฐํํ๊ธฐยถ
์ถ๋ ฅ ํ์ ๋ฐํํ ๋ yield
๋๋ return
(๋จ, ๋ ๋ชจ๋๋ ์๋)์ ์ฌ์ฉํ์ฌ ํ
์ด๋ธ ํ์ ๊ฐ์ด ์๋ ํํ์ ๋ฐํํ ์ ์์ต๋๋ค. ๋ฉ์๋๊ฐ None
์ ๋ฐํํ๊ฑฐ๋ ์์ฑํ๋ฉด ํ์ฌ ํ์ ์ฒ๋ฆฌ๊ฐ ์ค์ง๋ฉ๋๋ค.
yield
๋ฅผ ์ฌ์ฉํ ๋ ๊ฐ ์ถ๋ ฅ ํ์ ๋ํด ๋ณ๋์yield
๋ฌธ์ ์คํํฉ๋๋ค. ์ด๋yield
์ ํจ๊ป ์๋ฐ๋๋ ์ง์ฐ ํ๊ฐ๋ฅผ ํตํด ๋ณด๋ค ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ณ ์๊ฐ ์ด๊ณผ ๋ฐฉ์ง์ ๋์์ด ๋ ์ ์์ผ๋ฏ๋ก ๋ชจ๋ฒ ์ฌ๋ก์ ๋๋ค.ํํ์ ๊ฐ ์์๋ UDTF๊ฐ ๋ฐํํ๋ ๊ฒฐ๊ณผ์์ ์ด ๊ฐ์ด ๋ฉ๋๋ค.
yield
์ธ์์ ์์๋ CREATE FUNCTION์ RETURNS TABLE ์ ์์ ๋ฐํ ๊ฐ์ ๋ํด ์ ์ธ๋ ์ด์ ์์์ ์ผ์นํด์ผ ํฉ๋๋ค.๋ค์ ์์ ์ ์ฝ๋๋ ๋ ํ์ ๋ํ๋ด๋ ๊ฐ์ ๋ฐํํฉ๋๋ค.
def process(self, symbol, quantity, price): cost = quantity * price yield (symbol, cost) yield (symbol, cost)
yield ์ธ์๋ ํํ์ด๋ฏ๋ก ๋ค์ ์์ ์ ๊ฐ์ด ํํ์ ๋จ์ผ ๊ฐ์ ์ ๋ฌํ ๋ ํํ ์ผํ๋ฅผ ํฌํจํด์ผ ํฉ๋๋ค.
yield (cost,)
return
์ฌ์ฉ ์, ํํ์ด ์๋ ์ดํฐ๋ฌ๋ธ์ ๋ฐํํฉ๋๋ค.ํํ์์ ๊ฐ๊ฐ์ ๊ฐ์ UDTF๊ฐ ๋ฐํํ๋ ๊ฒฐ๊ณผ์์ ์ด ๊ฐ์ด ๋ฉ๋๋ค. ํํ์์ ์ด ๊ฐ์ ์์๋ CREATE FUNCTION์ RETURNS TABLE ์ ์์ ๋ฐํ ๊ฐ์ ๋ํด ์ ์ธ๋ ์ด์ ์์์ ์ผ์นํด์ผ ํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ ๊ฐ๊ฐ symbol๊ณผ total์ด๋ผ๋ ๋ ์ด์ด ์๋ ๋ ํ์ ๋ฐํํฉ๋๋ค.
def process(self, symbol, quantity, price): cost = quantity * price return [(symbol, cost), (symbol, cost)]
ํ ๊ฑด๋๋ฐ๊ธฐยถ
์
๋ ฅ ํ์ ๊ฑด๋๋ฐ๊ณ ๊ทธ๋ค์ ํ์ ์ฒ๋ฆฌํ๋ ค๋ฉด(์: ์
๋ ฅ ํ์ ์ ํจ์ฑ์ ๊ฒ์ฌํ ๋) process
๋ฉ์๋๊ฐ ๋ค์ ์ค ํ๋๋ฅผ ๋ฐํํ๋๋ก ํ์ญ์์ค.
return
์ ์ฌ์ฉํ ๋ ํ์ ๊ฑด๋๋ฐ๋ ค๋ฉดNone
,None
์ ํฌํจํ ๋ชฉ๋ก ๋๋ ๋น ๋ชฉ๋ก์ ๋ฐํํ์ญ์์ค.yield
๋ฅผ ์ฌ์ฉํ ๋ ํ์ ๊ฑด๋๋ฐ๋ ค๋ฉดNone
์ ๋ฐํํ์ญ์์ค.yield
๋ฅผ ์ฌ๋ฌ ๋ฒ ํธ์ถํ๋ ๊ฒฝ์ฐ Snowflake์์๋None
์ ๋ฐํํ๋ ํธ์ถ ์ดํ์ ํธ์ถ์ ์ ๋ถ ๋ฌด์ํฉ๋๋ค.
๋ค์ ์์ ์ ์ฝ๋๋ number
๊ฐ ์์ ์ ์์ธ ํ๋ง ๋ฐํํฉ๋๋ค. number
๊ฐ ์์๊ฐ ์๋ ๊ฒฝ์ฐ ๋ฉ์๋๋ ํ์ฌ ํ์ ๊ฑด๋๋ฐ๊ณ ๋ค์ ํ์ ๊ณ์ ์ฒ๋ฆฌํ๊ธฐ ์ํด None
์ ๋ฐํํฉ๋๋ค.
def process(self, number):
if number < 1:
yield None
else:
yield (number)
์ํ ์ ์ฅ ๋ฐ ์ํ ๋น์ ์ฅ ์ฒ๋ฆฌยถ
ํํฐ์ ์ธ์ ๋ฐฉ์์ผ๋ก ํ์ ์ฒ๋ฆฌํ๊ฑฐ๋ ๋จ์ํ ํ ๋จ์๋ก ์ฒ๋ฆฌํ๋ ํธ๋ค๋ฌ๋ฅผ ๊ตฌํํ ์ ์์ต๋๋ค.
ํํฐ์ ์ธ์ ์ฒ๋ฆฌ ์์๋ ํธ๋ค๋ฌ์ ํํฐ์ ๋ฒ์๊ฐ ์ง์ ๋ ์ํ๋ฅผ ๊ด๋ฆฌํ๋ ์ฝ๋๊ฐ ํฌํจ๋ฉ๋๋ค. ํํฐ์ ์ฒ๋ฆฌ ์์ ์์ ์คํ๋๋
__init__
๋ฉ์๋์ ํํฐ์ ์ ๋ง์ง๋ง ํ์ ์ฒ๋ฆฌํ ํ Snowflake๊ฐ ํธ์ถํ๋end_partition
๋ฉ์๋๊ฐ ํฌํจ๋ฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํํฐ์ ์ฒ๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.ํํฐ์ ๋น์ธ์ ์ฒ๋ฆฌ ์์๋ ํธ๋ค๋ฌ๊ฐ ํํฐ์ ๊ฒฝ๊ณ๋ฅผ ๋ฌด์ํ๊ณ ์ํ๋ฅผ ์ ์ฅํ์ง ์๊ณ ์คํ๋ฉ๋๋ค.
ํธ๋ค๋ฌ๊ฐ ์ด์ ๊ฐ์ด ์คํ๋๋๋ก ํ๋ ค๋ฉด
__init__
๋๋end_partition
๋ฉ์๋๋ฅผ ํฌํจํ์ง ๋ง์ญ์์ค.
ํํฐ์ ์ฒ๋ฆฌํ๊ธฐยถ
ํํฐ์ ๋ง๋ค ์คํ๋๋ ์ฝ๋(์: ์ํ ๊ด๋ฆฌ๋ฅผ ์ํ ์ฝ๋)์ ํํฐ์ ์ ๊ฐ ํ์ ๋ํด ์คํ๋๋ ์ฝ๋๋ก ์ ๋ ฅ์์ ํํฐ์ ์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
UDTF ํธ์ถ ์ ํํฐ์ ์ง์ ์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ ์ด๋ธ ํจ์ ๋ฐ ํํฐ์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฟผ๋ฆฌ์ ํํฐ์ ์ด ํฌํจ๋๋ ๊ฒฝ์ฐ ์ด ๊ฐ๊ณผ ๊ฐ์ด ์ง์ ๋ ๊ฐ์ ์ฌ์ฉํ์ฌ ํ์ ์ง๊ณํฉ๋๋ค. ํธ๋ค๋ฌ๊ฐ ์์ ํ๋ ์ง๊ณ๋ ํ์ ํด๋น ๊ฐ์ผ๋ก ๋ถํ ๋๋ค๊ณ ํฉ๋๋ค. ๊ฐ ํํฐ์ ์ ๋ํ ์ฒ๋ฆฌ๊ฐ ํํฐ์ ๋ฒ์ ์ง์ ์ํ๋ฅผ ํฌํจํ๋๋ก ์ฝ๋๊ฐ ์ด๋ฌํ ํํฐ์ ๊ณผ ํด๋น ํ์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๋ค์ SQL ์์ ์ ์ฝ๋๋ ์ฃผ์ ๋งค๋งค ์ ๋ณด๋ฅผ ์ฟผ๋ฆฌํฉ๋๋ค. ์ด ์ฝ๋๋ ์
๋ ฅ์ด symbol
์ด์ ๊ฐ์ผ๋ก ๋ถํ ๋ stock_sale_sum
UDTF๋ฅผ ์คํํฉ๋๋ค.
SELECT stock_sale_sum.symbol, total
FROM stocks_table, TABLE(stock_sale_sum(symbol, quantity, price) OVER (PARTITION BY symbol));
์์ ํ์ด ๋ถํ ๋์ด ์๋๋ผ๋ ์ฝ๋์์ ํํฐ์
๋ถ๋ฆฌ๋ฅผ ๋ฌด์ํ๊ณ ๊ทธ๋ฅ ํ์ ์ฒ๋ฆฌํ ์ ์๋ค๋ ์ ์ ์ ๋
ํ์ญ์์ค. ์๋ฅผ ๋ค์ด, ํธ๋ค๋ฌ ํด๋์ค __init__
๋ฉ์๋ ๋ฐ end_partition
๋ฉ์๋์ ๊ฐ์ ํํฐ์
๋ฒ์ ์ง์ ์ํ๋ฅผ ์ฒ๋ฆฌํ๋๋ก ์ค๊ณ๋ ์ฝ๋๋ฅผ ์๋ตํ๊ณ process
๋ฉ์๋๋ฅผ ๊ทธ๋ฅ ๊ตฌํํ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ์ํ ์ ์ฅ ๋ฐ ์ํ ๋น์ ์ฅ ์ฒ๋ฆฌ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
๊ฐ ํํฐ์ ์ ํ๋์ ๋จ์๋ก ์ฒ๋ฆฌํ๋ ค๋ฉด ๋ค์์ ์ํํฉ๋๋ค.
ํํฐ์ ์ฒ๋ฆฌ๋ฅผ ์ด๊ธฐํํ ํธ๋ค๋ฌ ํด๋์ค
__init__
๋ฉ์๋๋ฅผ ๊ตฌํํฉ๋๋ค.์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ์ฒ๋ฆฌ๊ธฐ ์ด๊ธฐํํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
process
๋ฉ์๋๋ก ๊ฐ ํ์ ์ฒ๋ฆฌํ ๋ ํํฐ์ ์ธ์ ์ฝ๋๋ฅผ ํฌํจํฉ๋๋ค.ํ ์ฒ๋ฆฌ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํ ์ฒ๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
end_partition
๋ฉ์๋๋ฅผ ๊ตฌํํ์ฌ ํํฐ์ ์ฒ๋ฆฌ๋ฅผ ๋ง๋ฌด๋ฆฌํฉ๋๋ค.์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํํฐ์ ์ฒ๋ฆฌ ๋ง๋ฌด๋ฆฌํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์์ ํํฐ์ ๋ณ๋ก ์คํํ๋๋ก ์ค๊ณ๋ ์ฝ๋๋ฅผ ํฌํจํ์ ๋ ํธ๋ค๋ฌ์ ํธ์ถ ์ํ์ค์ ๋ํ ์ค๋ช ์ ๋๋ค.
ํํฐ์ ์ฒ๋ฆฌ๊ฐ ์์๋์ด ์ฒซ ๋ฒ์งธ ํ์ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋๊ธฐ ์ ์, Snowflake๋ ํธ๋ค๋ฌ ํด๋์ค์
__init__
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ํด๋์ค์ ์ธ์คํด์ค๋ฅผ ๋ง๋ญ๋๋ค.์ฌ๊ธฐ์๋ ํํฐ์ ๋ฒ์ ์ง์ ์ํ๋ฅผ ์ค์ ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ํํฐ์ ์ ํ์์ ๊ณ์ฐ๋ ๊ฐ์ ๋ณด์ ํ๋ ์ธ์คํด์ค ๋ณ์๋ฅผ ์ด๊ธฐํํ ์ ์์ต๋๋ค.
ํํฐ์ ์ ๊ฐ ํ์ ๋ํด Snowflake๋
process
๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค.์ด ๋ฉ์๋๋ ์คํ๋ ๋๋ง๋ค ์ํ ๊ฐ์ ๋ณ๊ฒฝํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด,
process
๋ฉ์๋๊ฐ ์ธ์คํด์ค ๋ณ์์ ๊ฐ์ ์ ๋ฐ์ดํธํ๋๋ก ํ ์ ์์ต๋๋ค.Snowflake๋ ์ฝ๋๊ฐ ํํฐ์ ์ ๋ง์ง๋ง ํ์ ์ฒ๋ฆฌํ ํ์
end_partition
๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค.์ด ๋ฉ์๋์์ ๋ฐํํ๋ ค๋ ํํฐ์ ์์ค ๊ฐ์ด ํฌํจ๋ ์ถ๋ ฅ ํ์ ๋ฐํํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ํํฐ์ ์ ํ์ ์ฒ๋ฆฌํ๋ฉด์ ์ ๋ฐ์ดํธํ ์ธ์คํด์ค ๋ณ์์ ๊ฐ์ ๋ฐํํ ์ ์์ต๋๋ค.
end_partition
๋ฉ์๋๋ ํํฐ์ ์ ๋ง์ง๋ง ํ์ ์ฒ๋ฆฌํ ํ ๋จ์ํ ํธ์ถํ๊ธฐ๋ง ํ๋ Snowflake์์๋ ์ด๋ค ์ธ์๋ ๋ฐ์ง ์์ต๋๋ค.
ํํฐ์ ์ฒ๋ฆฌ ๋ง๋ฌด๋ฆฌํ๊ธฐยถ
ํํฐ์
์ ๋ชจ๋ ํ์ ์ฒ๋ฆฌํ ํ Snowflake๊ฐ ํธ์ถํ ํธ๋ค๋ฌ ํด๋์ค์์ end_partition
๋ฉ์๋๋ฅผ ์ ํ์ ์ผ๋ก ๊ตฌํํ ์ ์์ต๋๋ค. ์ด ๋ฉ์๋์์๋ ํํฐ์
์ ๋ชจ๋ ํ์ด ์ฒ๋ฆฌ๋ ํ ํํฐ์
์ ์ฝ๋๋ฅผ ์คํํ ์ ์์ต๋๋ค. end_partition
๋ฉ์๋๋ ํํฐ์
๋ฒ์ ์ง์ ๊ณ์ฐ์ ๊ฒฐ๊ณผ ๋ฐํ๊ณผ ๊ฐ์ ์ถ๋ ฅ ํ์ ์์ฑํ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ํํฐ์
์ฒ๋ฆฌํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
์ด ๋ฉ์๋์ ์๋ช ์ ๋ค์๊ณผ ๊ฐ์ ํ์์ด์ด์ผ ํฉ๋๋ค.
def end_partition(self):
Snowflake๋ ๋ค์๊ณผ ๊ฐ์ end_partition
๋ฉ์๋ ๊ตฌํ์ ์์ํฉ๋๋ค.
๋ฉ์๋๊ฐ ์ ์ ์ด์ง ์์์ผ ํฉ๋๋ค.
๋ฉ์๋์
self
์ด์ธ์๋ ๋งค๊ฐ ๋ณ์๊ฐ ์์ ์๋ ์์ต๋๋ค.ํ ์ด๋ธ ํ์ ๊ฐ ๋ฐํ์ ๋์ฒด ์๋จ์ผ๋ก, ๋น ๋ชฉ๋ก ๋๋
None
์ ์์ฑํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
Snowflake๋ ์ฑ๊ณต์ ์ผ๋ก ์ฒ๋ฆฌํ๋๋ก ์๊ฐ ์ ํ์ด ์กฐ์ ๋ ๋ํ ํํฐ์
์ ์ง์ํ์ง๋ง, ํนํ ๋ํ ํํฐ์
์ผ๋ก ์ธํด ์ฒ๋ฆฌ ์๊ฐ์ด ์ด๊ณผ๋ ์ ์์ต๋๋ค(์: end_partition
์ด ์๋ฃํ๋ ๋ฐ ๋๋ฌด ์ค๋ ๊ฑธ๋ฆฌ๋ ๊ฒฝ์ฐ). ํน์ ์ฌ์ฉ ์๋๋ฆฌ์ค์ ๋ง๊ฒ ์๊ฐ ์ด๊ณผ ์๊ณ๊ฐ์ ์กฐ์ ํด์ผ ํ๋ ๊ฒฝ์ฐ Snowflake ์ง์ ์ ๋ฌธ์ํ์ญ์์ค.
ํํฐ์ ์ฒ๋ฆฌ์ ์ยถ
๋ค์ ์์ ์ ์ฝ๋๋ฅผ ์คํํ๋ฉด (process
๋ฉ์๋์์) ๋จผ์ ๋งค์
๋น ๋น์ฉ์ ๊ณ์ฐํด ๋งค์
๊ธ์ก์ ํจ๊ป ๋ํจ์ผ๋ก์จ ์ด๋ค ์ฃผ์์ ๋งค์
์ ์ํด ์ง๋ถํ ์ด๋น์ฉ์ด ๊ณ์ฐ๋ฉ๋๋ค. ์ด ์ฝ๋๋ end_partition
๋ฉ์๋์์ ์ด๊ณ๋ฅผ ๋ฐํํฉ๋๋ค.
UDTF ํธ์ถ๊ณผ ํจ๊ป ์ด ์ฒ๋ฆฌ๊ธฐ๋ฅผ ํฌํจํ๋ UDTF์ ์๋ ์ฒ๋ฆฌ๊ธฐ ํด๋์ค์ ์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
ํํฐ์ ์ ์ฒ๋ฆฌํ ๋๋ ๋ค์ ์ฌํญ์ ์ ๋ ํ์ญ์์ค.
์ฌ์ฉ์์ ์ฝ๋๊ฐ UDTF์ ๋ํ ํธ์ถ์ ๋ช ์์ ์ผ๋ก ์ง์ ๋์ง ์์ ํํฐ์ ์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. UDTF์ ๋ํ ํธ์ถ์ PARTITION BY ์ ์ด ํฌํจ๋์ง ์๋๋ผ๋ Snowflake๋ ๋ฐ์ดํฐ๋ฅผ ์์์ ์ผ๋ก ๋ถํ ํฉ๋๋ค.
process
๋ฉ์๋๋ ํํฐ์ ์ ORDER BY ์ ์ ์ง์ ๋ ์์๋๋ก ํ ๋ฐ์ดํฐ(์๋ ๊ฒฝ์ฐ)๋ฅผ ์์ ํฉ๋๋ค.
์ยถ
๊ฐ์ ธ์จ ํจํค์ง ์ฌ์ฉํ๊ธฐยถ
Snowflake์์ ์ ๊ณต๋๋ Anaconda์ ์ ๋ณ๋ ์๋ ํํฐ ํจํค์ง ๋ชฉ๋ก์ ํฌํจ๋ Python ํจํค์ง๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฌํ ํจํค์ง๋ฅผ UDTF์ ์ข ์ ํญ๋ชฉ์ผ๋ก ์ง์ ํ๋ ค๋ฉด CREATE FUNCTION์์ PACKAGES ์ ์ ์ฌ์ฉํ์ญ์์ค.
Snowflake์์ ๋ค์ SQL์ ์คํํ์ฌ ํฌํจ๋ ํจํค์ง์ ๋ชฉ๋ก์ ๊ฒ์ํ ์ ์์ต๋๋ค.
SELECT * FROM INFORMATION_SCHEMA.PACKAGES WHERE LANGUAGE = 'python';
์์ธํ ๋ด์ฉ์ ์๋ ํํฐ ํจํค์ง ์ฌ์ฉํ๊ธฐ ๋ฐ Python UDF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์์ ์ ์ฝ๋์์๋ NumPy(Numerical Python) ํจํค์ง์ ํจ์๋ฅผ ์ฌ์ฉํ์ฌ ๊ฐ๊ฐ ์ฃผ๋น ๊ฐ๊ฒฉ์ด ๋ค๋ฅธ ์ฃผ์ ๋งค์ ๋ฐฐ์ด์์ ์ฃผ๋น ํ๊ท ๊ฐ๊ฒฉ์ ๊ณ์ฐํฉ๋๋ค.
CREATE OR REPLACE FUNCTION stock_sale_average(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('numpy')
HANDLER = 'StockSaleAverage'
AS $$
import numpy as np
class StockSaleAverage:
def __init__(self):
self._price_array = []
self._quantity_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
self._price_array.append(float(price))
cost = quantity * price
yield (symbol, cost)
def end_partition(self):
np_array = np.array(self._price_array)
avg = np.average(np_array)
yield (self._symbol, avg)
$$;
๋ค์ ์์ ์ ์ฝ๋์์๋ ์ด์ UDF๋ฅผ ํธ์ถํ์ฌ stocks_table
ํ
์ด๋ธ์ symbol
, quantity
๋ฐ price
์ด์ ๊ฐ์ ์ ๋ฌํฉ๋๋ค. UDTF ํธ์ถ์ ๋ํ ์์ธํ ๋ด์ฉ์ UDF ์คํํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
SELECT stock_sale_average.symbol, total
FROM stocks_table,
TABLE(stock_sale_average(symbol, quantity, price)
OVER (PARTITION BY symbol));
์์ปค ํ๋ก์ธ์ค๋ก ๋์ ์์ ์คํํ๊ธฐยถ
Python ์์ ์ ํ๋ก์ธ์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ์์ ์ ์คํํ ์ ์์ต๋๋ค. ์จ์ดํ์ฐ์ค ๋ ธ๋์์ ์ฌ๋ฌ CPU ์ฝ์ด๋ฅผ ํ์ฉํ๋ ๋ณ๋ ฌ ์์ ์ ์คํํด์ผ ํ ๋ ์ด ๊ธฐ๋ฅ์ด ์ ์ฉํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
๊ธฐ๋ณธ ์ ๊ณต๋ Python ๋ค์ค ์ฒ๋ฆฌ ๋ชจ๋์ ์ฌ์ฉํ์ง ์๋ ๊ฒ์ด ์ข์ต๋๋ค.
Python Global Interpreter Lock ์ผ๋ก ์ธํด ๋ฉํฐํ์คํน ์ ๊ทผ ๋ฐฉ์์ด ๋ชจ๋ CPU ์ฝ์ด์์ ํ์ฅ๋์ง ๋ชปํ๋ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ ค๋ฉด ์ค๋ ๋๊ฐ ์๋ ๋ณ๋์ ์์ ์ ํ๋ก์ธ์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ์์ ์ ์คํํ ์ ์์ต๋๋ค.
๋ค์ ์์์์ฒ๋ผ joblib
๋ผ์ด๋ธ๋ฌ๋ฆฌ์ Parallel
ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ Snowflake ์จ์ดํ์ฐ์ค์์ ์ด ์์
์ ์ํํ ์ ์์ต๋๋ค.
CREATE OR REPLACE FUNCTION joblib_multiprocessing_udtf(i INT)
RETURNS TABLE (result INT)
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'JoblibMultiprocessing'
PACKAGES = ('joblib')
AS $$
import joblib
from math import sqrt
class JoblibMultiprocessing:
def process(self, i):
pass
def end_partition(self):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
for r in result:
yield (r, )
$$;
์ฐธ๊ณ
joblib.Parallel
์ ์ฌ์ฉ๋๋ ๊ธฐ๋ณธ ๋ฐฑ์๋๋ Snowflake ํ์ค๊ณผ Snowpark ์ต์ ํ ์จ์ดํ์ฐ์ค ๊ฐ์ ๋ค๋ฆ
๋๋ค.
ํ์ค ์จ์ดํ์ฐ์ค ๊ธฐ๋ณธ๊ฐ:
threading
Snowpark ์ต์ ํ ์จ์ดํ์ฐ์ค ๊ธฐ๋ณธ๊ฐ:
loky
(๋ค์ค ์ฒ๋ฆฌ)
๋ค์ ์์์์ฒ๋ผ joblib.parallel_backend
ํจ์๋ฅผ ํธ์ถํ์ฌ ๊ธฐ๋ณธ ๋ฐฑ์๋ ์ค์ ์ ์ฌ์ ์ํ ์ ์์ต๋๋ค.
import joblib
joblib.parallel_backend('loky')
CREATE FUNCTION
์ผ๋ก UDTF ๋ง๋ค๊ธฐยถ
CREATE FUNCTION ๋ช ๋ น์ ์ฌ์ฉํ์ฌ SQL๋ก UDTF๋ฅผ ๋ง๋ค์ด ์์ฑํ ์ฝ๋๋ฅผ ํธ๋ค๋ฌ๋ก ์ง์ ํฉ๋๋ค. ๋ช ๋ น ์ฐธ์กฐ๋ CREATE FUNCTION ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDTF๋ฅผ ๋ง๋ค ๋ ๋ค์ ๊ตฌ๋ฌธ์ ์ฌ์ฉํ์ญ์์ค.
CREATE OR REPLACE FUNCTION <name> ( [ <arguments> ] )
RETURNS TABLE ( <output_column_name> <output_column_type> [, <output_column_name> <output_column_type> ... ] )
LANGUAGE PYTHON
[ IMPORTS = ( '<imports>' ) ]
RUNTIME_VERSION = 3.9
[ PACKAGES = ( '<package_name>' [, '<package_name>' . . .] ) ]
[ TARGET_PATH = '<stage_path_and_file_name_to_write>' ]
HANDLER = '<handler_class>'
[ AS '<python_code>' ]
์์ฑํ ํธ๋ค๋ฌ ์ฝ๋๋ฅผ UDTF์ ์ฐ๊ฒฐํ๋ ค๋ฉด CREATE FUNCTION ์คํ ์ ๋ค์์ ์ํํ์ญ์์ค.
RETURNS TABLE์์ ์ด ์ด๋ฆ ๋ฐ ํ์ ์์ ์ถ๋ ฅ ์ด์ ์ง์ ํฉ๋๋ค.
LANGUAGE๋ฅผ PYTHON์ผ๋ก ์ค์ ํฉ๋๋ค.
ํธ๋ค๋ฌ ํด๋์ค๊ฐ ์คํ ์ด์ง์ ๊ฐ์ ์ธ๋ถ ์์น์ ์๋ ๊ฒฝ์ฐ IMPORTS ์ ๊ฐ์ ํธ๋ค๋ฌ ํด๋์ค์ ๊ฒฝ๋ก์ ์ด๋ฆ์ผ๋ก ์ค์ ํฉ๋๋ค.
์์ธํ ๋ด์ฉ์ Python UDF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
RUNTIME_VERSION์ ์ฝ๋์์ ํ์ํ Python ๋ฐํ์ ๋ฒ์ ์ผ๋ก ์ค์ ํฉ๋๋ค. ์ง์๋๋ Python ๋ฒ์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
3.9
3.10
3.11
3.12
PACKAGES ์ ๊ฐ์ ํธ๋ค๋ฌ ํด๋์ค์ ํ์ํ ํ๋ ์ด์์ ํจํค์ง(์๋ ๊ฒฝ์ฐ)์ ์ด๋ฆ์ผ๋ก ์ค์ ํฉ๋๋ค.
์์ธํ ๋ด์ฉ์ ์๋ ํํฐ ํจํค์ง ์ฌ์ฉํ๊ธฐ ๋ฐ Python UDF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
HANDLER ์ ๊ฐ์ ํธ๋ค๋ฌ ํด๋์ค์ ์ด๋ฆ์ผ๋ก ์ค์ ํฉ๋๋ค.
Python ํธ๋ค๋ฌ ์ฝ๋๋ฅผ UDTF์ ์ฐ๊ฒฐํ ๋ ์ฝ๋๋ฅผ ์ธ๋ผ์ธ์ผ๋ก ํฌํจํ๊ฑฐ๋ Snowflake ์คํ ์ด์ง์์ ํ ์์น์์ ์ฐธ์กฐํ ์ ์์ต๋๋ค. HANDLER ๊ฐ์ ๋/์๋ฌธ์๋ฅผ ๊ตฌ๋ถํ๋ฉฐ Python ํด๋์ค์ ์ด๋ฆ๊ณผ ์ผ์นํด์ผ ํฉ๋๋ค.
์์ธํ ๋ด์ฉ์ ์ธ๋ผ์ธ ์ฝ๋๊ฐ ์๋ UDFs์ ์คํ ์ด์ง์์ ์ ๋ก๋๋ ์ฝ๋๊ฐ ์๋ UDFs ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ค์
์ค์นผ๋ผ Python UDF์ ๊ฒฝ์ฐ HANDLER ์ ๊ฐ์๋ ๋ฉ์๋ ์ด๋ฆ์ด ํฌํจ๋ฉ๋๋ค.
Python UDTF์ ๊ฒฝ์ฐ HANDLER ์ ๊ฐ์๋ ํด๋์ค ์ด๋ฆ์ด ํฌํจ๋์ง๋ง ๋ฉ์๋ ์ด๋ฆ์ ํฌํจ๋์ง ์์ต๋๋ค.
์ด๋ฐ ์ฐจ์ด๊ฐ ๋๋ ์ด์ ๋ ์ค์นผ๋ผ Python UDF์ ๊ฒฝ์ฐ, ํธ๋ค๋ฌ ๋ฉ์๋ ์ด๋ฆ์ ์ฌ์ฉ์๊ฐ ์ ํํ๋ฏ๋ก Snowflake๊ฐ ๋ฏธ๋ฆฌ ์์ง ๋ชปํ์ง๋ง, Python UDTF์ ๊ฒฝ์ฐ ๋ฉ์๋(์:
end_partition
๋ฉ์๋)์ ์ด๋ฆ์ Snowflake์ ์ํด ์ง์ ๋ ์ด๋ฆ๊ณผ ์ผ์นํด์ผ ํ๊ธฐ ๋๋ฌธ์ ์๋ ค์ ธ ์๋ค๋ ์ ์ ๋๋ค.ํธ๋ค๋ฌ ์ฝ๋๊ฐ CREATE FUNCTION๊ณผ ํจ๊ป ์ธ๋ผ์ธ์ผ๋ก ์ง์ ๋ ๊ฒฝ์ฐ
AS '<python_code>'
์ ์ด ํ์ํฉ๋๋ค.