Python 사용자 정의 집계 함수
์ฌ์ฉ์ ์ ์ ์ง๊ณ ํจ์(UDAFs)๋ ํ๋ ์ด์์ ํ์ ์ ๋ ฅ์ผ๋ก ๋ฐ์ ๋จ์ผ ํ์ ์ถ๋ ฅ์ ์์ฑํฉ๋๋ค. UDAF๋ ์ฌ๋ฌ ํ์ ๊ฐ์ ์ฐ์ฐํ์ฌ ํฉ๊ณ, ํ๊ท , ๊ณ์ฐ, ์ต์๊ฐ ๋๋ ์ต๋๊ฐ ์ฐพ๊ธฐ, ํ์ค ํธ์ฐจ, ์ถ์ ๊ณผ ๊ฐ์ ์ํ์ ๊ณ์ฐ๊ณผ ์ผ๋ถ ๋น์ํ์ ์ฐ์ฐ์ ์ํํฉ๋๋ค.
Python UDAFs๋ Snowflake ์์คํ ์ ์ SQL ์ง๊ณ ํจ์ ์ ์ ์ฌํ ์์ฒด ์ง๊ณ ํจ์๋ฅผ ์์ฑํ ์ ์๋ ๋ฐฉ๋ฒ์ ์ ๊ณตํฉ๋๋ค.
Python์์ DataFrames์ฉ ์ฌ์ฉ์ ์ ์ ํจ์(UDAFs) ๋ง๋ค๊ธฐ ์ ์ค๋ช ๋ ๋๋ก Snowpark APIs๋ฅผ ์ฌ์ฉํ์ฌ ์์ฒด UDAFs๋ฅผ ์์ฑํ ์๋ ์์ต๋๋ค.
제한 사항
aggregate_state
๋ ์ง๋ ฌํ๋ ๋ฒ์ ์์ ์ต๋ ํฌ๊ธฐ๊ฐ 8MB์ด๋ฏ๋ก ์ง๊ณ ์ํ์ ํฌ๊ธฐ๋ฅผ ์กฐ์ ํฉ๋๋ค.UDAF๋ฅผ ์๋์ฐ ํจ์ ๋ก ํธ์ถํ ์ ์์ต๋๋ค(์ฆ, OVER ์ ์ด ์๋ ๊ฒฝ์ฐ).
IMMUTABLE์ ์ง๊ณ ํจ์์์ ์ง์๋์ง ์์ต๋๋ค(AGGREGATE ๋งค๊ฐ ๋ณ์๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ). ๋ฐ๋ผ์ ๋ชจ๋ ์ง๊ณ ํจ์๋ ๊ธฐ๋ณธ์ ์ผ๋ก VOLATILE์ ๋๋ค.
사용자 정의 집계 함수는 WITHIN GROUP 절과 함께 사용할 수 없습니다. 이러한 쿼리는 실행되지 않습니다.
집계 함수 핸들러를 위한 인터페이스
집계 함수는 하위 노드에서 상태를 집계합니다. 그런 다음 이 집계 상태를 직렬화하여 상위 노드로 전송하고, 상위 노드에서는 이를 병합하여 최종 결과를 계산합니다.
์ง๊ณ ํจ์๋ฅผ ์ ์ํ๋ ค๋ฉด Snowflake๊ฐ ๋ฐํ์์ ํธ์ถํ๋ ๋ฉ์๋๋ฅผ ํฌํจํ๋ Python ํด๋์ค(ํจ์์ ํธ๋ค๋ฌ)๋ฅผ ์ ์ํด์ผ ํฉ๋๋ค. ์ด๋ฌํ ๋ฉ์๋๋ ์๋ ํ ์ด๋ธ์ ์ค๋ช ๋์ด ์์ต๋๋ค. ์ด ํญ๋ชฉ์ ๋ค๋ฅธ ๊ณณ์ ์๋ ์์ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
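| 메서드 | 요구 사항 | 설명 |
|---|---|---|
| __init__ | 필수 | 집계의 내부 상태를 초기화합니다. |
| aggregate_state | 필수 | 집계의 현재 상태를 반환합니다. 이 메서드에는 @property 데코레이터가 있어야 합니다. 집계 상태 오브젝트는 Python pickle 라이브러리로 직렬화할 수 있는 모든 Python 데이터 타입일 수 있습니다. 단순한 집계 상태에는 기본 Python 데이터 타입을, 더 복잡한 집계 상태에는 Python 데이터 클래스를 사용하십시오. |
| accumulate | 필수 | 새 입력 행을 기준으로 집계 상태를 누적합니다. |
| merge | 필수 | 두 개의 중간 집계 상태를 결합합니다. |
| finish | 필수 | 집계된 상태를 기반으로 최종 결과를 생성합니다. |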

예: 합계 계산
다음 예제의 코드는 숫자 값의 합계를 반환하는 python_sum 사용자 정의 집계 함수(UDAF)를 정의합니다.
UDAF๋ฅผ ๋ง๋ญ๋๋ค.
-- python_sum: returns the sum of the integer input values in each group.
-- The handler keeps a single Python int as its aggregate state.
CREATE OR REPLACE AGGREGATE FUNCTION PYTHON_SUM(a INT)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'PythonSum'
AS $$
class PythonSum:
    """Aggregate handler that sums integer inputs."""

    def __init__(self):
        # This aggregate state is a primitive Python data type.
        self._partial_sum = 0

    @property
    def aggregate_state(self):
        # The partial sum is what gets serialized and handed to merge().
        return self._partial_sum

    def accumulate(self, input_value):
        # Ignore NULL inputs (passed to Python as None), as the built-in SUM does;
        # adding None to an int would raise TypeError.
        if input_value is None:
            return
        self._partial_sum += input_value

    def merge(self, other_partial_sum):
        # Fold another node's partial sum into this one.
        self._partial_sum += other_partial_sum

    def finish(self):
        # Note: unlike SUM, this returns 0 (not NULL) when no non-NULL rows were seen.
        return self._partial_sum
$$;
테스트 데이터 테이블을 만듭니다.
-- Test data: six sales rows spread over two item types.
CREATE OR REPLACE TABLE sales (
    item  STRING,
    price INT
);

INSERT INTO sales (item, price)
VALUES
    ('car',        10000),
    ('motorcycle',  5000),
    ('car',         7500),
    ('motorcycle',  3500),
    ('motorcycle',  1500),
    ('car',        20000);

SELECT
    item,
    price
FROM sales;
python_sum
UDAF๋ฅผ ํธ์ถํฉ๋๋ค.SELECT python_sum(price) FROM sales;
๊ฒฐ๊ณผ๋ฅผ Snowflake ์์คํ ์ ์ SQL ํจ์์ธ SUM ์ ์ถ๋ ฅ๊ณผ ๋น๊ตํ์ฌ ๊ฒฐ๊ณผ๊ฐ ๋์ผํ์ง ํ์ธํฉ๋๋ค.
-- Built-in SUM over the same column that python_sum(price) aggregates.
-- (The sales table has no column named "col"; the summed column is price.)
SELECT SUM(price) FROM sales;
sales 테이블을 품목 유형별로 그룹화하여 합계를 계산합니다.
-- Per-item totals computed by the Python UDAF.
SELECT
    item,
    python_sum(price)
FROM sales
GROUP BY item;
예: 평균 계산
다음 예제의 코드는 숫자 값의 평균을 반환하는 python_avg 사용자 정의 집계 함수를 정의합니다.
함수를 만듭니다.
-- python_avg: returns the arithmetic mean of the integer input values in each group.
-- The aggregate state is a dataclass holding a running count and sum.
CREATE OR REPLACE AGGREGATE FUNCTION python_avg(a INT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'PythonAvg'
AS $$
from dataclasses import dataclass


@dataclass
class AvgAggState:
    # Number of non-NULL values accumulated so far.
    count: int
    # Running total of those values.
    sum: int


class PythonAvg:
    """Aggregate handler that computes the mean of its inputs."""

    def __init__(self):
        # This aggregate state is an object data type.
        self._agg_state = AvgAggState(0, 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input_value):
        # Ignore NULL inputs (passed to Python as None), as the built-in AVG does.
        if input_value is None:
            return
        self._agg_state.sum += input_value
        self._agg_state.count += 1

    def merge(self, other_agg_state):
        # Combine another node's partial count and sum with this one.
        self._agg_state.sum += other_agg_state.sum
        self._agg_state.count += other_agg_state.count

    def finish(self):
        # No values seen: return NULL (like AVG) instead of raising ZeroDivisionError.
        if self._agg_state.count == 0:
            return None
        return self._agg_state.sum / self._agg_state.count
$$;
테스트 데이터 테이블을 만듭니다.
-- Test data: six sales rows spread over two item types.
CREATE OR REPLACE TABLE sales (
    item  STRING,
    price INT
);

INSERT INTO sales (item, price)
VALUES
    ('car',        10000),
    ('motorcycle',  5000),
    ('car',         7500),
    ('motorcycle',  3500),
    ('motorcycle',  1500),
    ('car',        20000);
python_avg 사용자 정의 함수를 호출합니다.
SELECT python_avg(price) FROM sales;
๊ฒฐ๊ณผ๋ฅผ Snowflake ์์คํ ์ ์ SQL ํจ์์ธ AVG ์ ์ถ๋ ฅ๊ณผ ๋น๊ตํ์ฌ ๊ฒฐ๊ณผ๊ฐ ๋์ผํ์ง ํ์ธํฉ๋๋ค.
-- Built-in AVG, for comparison with python_avg(price).
SELECT AVG(price)
FROM sales;
sales 테이블을 품목 유형별로 그룹화하여 평균을 계산합니다.
-- Per-item averages computed by the Python UDAF.
SELECT
    item,
    python_avg(price)
FROM sales
GROUP BY item;
예: 고유 값만 반환
다음 예제의 코드는 배열을 입력으로 받아 고유 값만 포함된 배열을 반환합니다.
-- pythonGetUniqueValues: returns an array of the distinct elements found
-- across all input arrays in the group.
CREATE OR REPLACE AGGREGATE FUNCTION pythonGetUniqueValues(input ARRAY)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'PythonGetUniqueValues'
AS $$
class PythonGetUniqueValues:
    """Aggregate handler that collects the distinct elements of input arrays."""

    def __init__(self):
        # The aggregate state is a Python set of the elements seen so far.
        self._agg_state = set()

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        # A NULL array contributes nothing; set.update(None) would raise TypeError.
        if input is None:
            return
        # Elements must be hashable; nested arrays/objects (presumably passed as
        # list/dict) would make update() raise TypeError.
        self._agg_state.update(input)

    def merge(self, other_agg_state):
        # Union of the two partial sets.
        self._agg_state.update(other_agg_state)

    def finish(self):
        # Element order follows set iteration order and is not guaranteed.
        return list(self._agg_state)
$$;
-- Three sample arrays whose numeric and string elements partly overlap.
CREATE OR REPLACE TABLE array_table (x ARRAY) AS
    SELECT ARRAY_CONSTRUCT(0, 1, 2, 3, 4, 'foo', 'bar', 'snowflake')
    UNION ALL
    SELECT ARRAY_CONSTRUCT(1, 3, 5, 7, 9, 'foo', 'barbar', 'snowpark')
    UNION ALL
    SELECT ARRAY_CONSTRUCT(0, 2, 4, 6, 8, 'snow');
-- Show the input arrays, then the distinct elements across all of them.
SELECT x
FROM array_table;

SELECT pythonGetUniqueValues(x)
FROM array_table;
예: 문자열 개수 반환
다음 예제의 코드는 각 문자열(대소문자 구분 없음)이 나타나는 횟수를 오브젝트로 반환합니다.
-- pythonMapCount: returns an OBJECT mapping each lowercased input string to
-- the number of times it occurs in the group.
CREATE OR REPLACE AGGREGATE FUNCTION pythonMapCount(input STRING)
RETURNS OBJECT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'PythonMapCount'
AS $$
from collections import defaultdict


class PythonMapCount:
    """Aggregate handler that counts case-insensitive string occurrences."""

    def __init__(self):
        # Maps lowercased string -> occurrence count; missing keys start at 0.
        self._agg_state = defaultdict(int)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        # NULL strings are skipped; calling .lower() on None would raise AttributeError.
        if input is None:
            return
        # Increment count of lowercase input
        self._agg_state[input.lower()] += 1

    def merge(self, other_agg_state):
        # Add the other node's per-string counts into this state.
        for item, count in other_agg_state.items():
            self._agg_state[item] += count

    def finish(self):
        # Convert to a plain dict for the OBJECT return value.
        return dict(self._agg_state)
$$;
-- Sample strings with known repeat counts; 'SnOw' and 'snow' differ only in case.
CREATE OR REPLACE TABLE string_table (x STRING);

INSERT INTO string_table (x) SELECT 'foo'       FROM TABLE(GENERATOR(ROWCOUNT => 1000));
INSERT INTO string_table (x) SELECT 'bar'       FROM TABLE(GENERATOR(ROWCOUNT => 2000));
INSERT INTO string_table (x) SELECT 'snowflake' FROM TABLE(GENERATOR(ROWCOUNT => 50));
INSERT INTO string_table (x) SELECT 'snowpark'  FROM TABLE(GENERATOR(ROWCOUNT => 123));
INSERT INTO string_table (x) SELECT 'SnOw'      FROM TABLE(GENERATOR(ROWCOUNT => 1));
INSERT INTO string_table (x) SELECT 'snow'      FROM TABLE(GENERATOR(ROWCOUNT => 4));
-- Count occurrences of each (lowercased) string in string_table.
SELECT pythonMapCount(x)
FROM string_table;
예: 상위 k개의 가장 큰 값 반환
다음 예제의 코드는 가장 큰 값 k개의 목록을 반환합니다. 이 코드는 입력값을 음수로 바꿔 최소 힙에 누적한 다음, 가장 큰 값 상위 k개를 반환합니다.
-- pythonTopK: returns the k largest input values of the group, in descending order.
-- The handler stores negated values in a heapq min-heap, so the heap's smallest
-- entries correspond to the largest inputs.
CREATE OR REPLACE AGGREGATE FUNCTION pythonTopK(input INT, k INT)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'PythonTopK'
AS $$
import heapq
from dataclasses import dataclass
import itertools
from typing import List


@dataclass
class AggState:
    # Negated input values kept as a valid heapq min-heap of at most k entries.
    minheap: List[int]
    # Number of values to keep; 0 until a row (or a merged state) supplies it.
    k: int


class PythonTopK:
    """Aggregate handler that keeps the k largest input values."""

    def __init__(self):
        self._agg_state = AggState([], 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    @staticmethod
    def get_top_k_items(minheap, k):
        # Keep only the k smallest (negated) elements if there are more than k.
        # nsmallest returns them in ascending order, which is itself a valid heap.
        if len(minheap) > k:
            return heapq.nsmallest(k, minheap)
        return minheap

    def accumulate(self, input, k):
        self._agg_state.k = k
        # NULL inputs contribute nothing; negating None would raise TypeError.
        if input is None:
            return
        # Store the input as negative value, as heapq is a min heap.
        heapq.heappush(self._agg_state.minheap, -input)
        # Store only top k items on the min heap.
        self._agg_state.minheap = self.get_top_k_items(self._agg_state.minheap, k)

    def merge(self, other_agg_state):
        # A state that has seen no rows still has k == 0, so take k from the other side,
        # and remember it: otherwise a later merge with another empty state would
        # compute k == 0 and discard every collected value.
        k = self._agg_state.k if self._agg_state.k > 0 else other_agg_state.k
        self._agg_state.k = k
        # Combine both heaps, restore the heap invariant, then trim back to k entries.
        combined = self._agg_state.minheap + other_agg_state.minheap
        heapq.heapify(combined)
        self._agg_state.minheap = self.get_top_k_items(combined, k)

    def finish(self):
        # Sort so the output is largest-first regardless of the heap's internal layout
        # (an untrimmed heap is not fully sorted).
        return [-x for x in sorted(self._agg_state.minheap)]
$$;
-- Ten copies each of six distinct values (60 rows in total).
CREATE OR REPLACE TABLE numbers_table (num_column INT);

INSERT INTO numbers_table (num_column) SELECT 5  FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table (num_column) SELECT 1  FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table (num_column) SELECT 9  FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table (num_column) SELECT 7  FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table (num_column) SELECT 10 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table (num_column) SELECT 3  FROM TABLE(GENERATOR(ROWCOUNT => 10));
-- Return top 15 largest values from numbers_table.
SELECT pythonTopK(num_column, 15)
FROM numbers_table;