Python user-defined aggregate functions

User-defined aggregate functions (UDAFs) take one or more rows as input and produce a single row of output. UDAFs operate on values across multiple rows to perform mathematical calculations, such as sum, average, counting, finding minimum or maximum values, standard deviation, and estimation, as well as some non-mathematical operations.

Python UDAFs let you write your own aggregate functions that are similar to the Snowflake system-defined SQL aggregate functions.

You can also create your own UDAFs by using Snowpark APIs, as described in Creating User-Defined Aggregate Functions (UDAFs) for DataFrames in Python.

Limitations

  • The serialized version of aggregate_state has a maximum size of 8 MB, so try to keep the aggregate state small.

  • You can't call a UDAF as a window function (that is, with an OVER clause).

  • IMMUTABLE is not supported on aggregate functions (that is, when the AGGREGATE parameter is used). Therefore, all aggregate functions are VOLATILE by default.

  • User-defined aggregate functions can't be used with a WITHIN GROUP clause. A query that does so will fail to execute.
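Because the serialized state is capped at 8 MB, it can help to measure a candidate state locally before deploying a handler. The following is a minimal sketch that assumes the state is serialized with the Python pickle library (as the handler interface below describes) and interprets 8 MB as 8 * 1024 * 1024 bytes. The exact byte count Snowflake enforces, and any overhead it adds, are assumptions here, and the helper names are hypothetical.

```python
import pickle

# Assumption: the 8 MB limit is interpreted as 8 * 1024 * 1024 bytes.
MAX_STATE_BYTES = 8 * 1024 * 1024


def serialized_size(state) -> int:
    """Return the number of bytes the state occupies after pickling."""
    return len(pickle.dumps(state))


def fits_state_limit(state, limit: int = MAX_STATE_BYTES) -> bool:
    """Return True if the pickled state is no larger than the limit."""
    return serialized_size(state) <= limit


if __name__ == "__main__":
    small_state = {"count": 6, "sum": 47500}
    large_state = b"x" * (9 * 1024 * 1024)  # 9 MiB of raw bytes
    print(fits_state_limit(small_state))  # True
    print(fits_state_limit(large_state))  # False
```

States whose size grows with the number of input rows, such as a set of collected values, are the ones most likely to approach the limit.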

Interface for aggregate function handlers

An aggregate function aggregates state in child nodes. That aggregate state is then serialized and sent to a parent node, where the states are merged to compute the final result.

To define an aggregate function, you must define a Python class (the function's handler) that includes methods that Snowflake invokes at runtime. These methods are described in the table below. See the examples elsewhere in this topic.

Method          | Requirement | Description
--------------- | ----------- | -----------
__init__        | Required    | Initializes the internal state of an aggregate.
aggregate_state | Required    | Returns the current state of an aggregate. The method must have a @property decorator. The aggregate state object can be any Python data type that the Python pickle library can serialize. For simple aggregate states, use a primitive Python data type; for more complex aggregate states, use Python data classes.
accumulate      | Required    | Accumulates the state of the aggregate based on the new input row.
merge           | Required    | Combines two intermediate aggregate states.
finish          | Required    | Produces the final result based on the aggregated state.

[Diagram: input values are accumulated in child nodes, then sent to a parent node, where they are merged to produce the final result.]
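To make this flow concrete, the following sketch imitates it locally. Each hypothetical child partition gets its own handler instance, each child's state is pickled as if it were sent to a parent node, and the parent merges the states before calling finish. The simulate_aggregate function and the PythonCount handler are hypothetical illustrations of the interface in the table, not Snowflake's actual execution engine. How Snowflake partitions rows, and whether the parent starts from a fresh instance, are assumptions.

```python
import pickle
from typing import Iterable, List, Sequence


class PythonCount:
    """Hypothetical handler that counts input rows, following the handler interface."""

    def __init__(self):
        # A primitive Python data type is enough for this aggregate state.
        self._count = 0

    @property
    def aggregate_state(self):
        return self._count

    def accumulate(self, input_value):
        self._count += 1

    def merge(self, other_count):
        self._count += other_count

    def finish(self):
        return self._count


def simulate_aggregate(handler_cls, partitions: Sequence[Iterable[tuple]]):
    """Accumulate each partition in its own instance, then merge the states in a parent."""
    # Child nodes: one handler instance per partition; each row is a tuple of arguments.
    serialized_states: List[bytes] = []
    for rows in partitions:
        child = handler_cls()
        for row in rows:
            child.accumulate(*row)
        # Serialize the child's state, as if sending it to the parent node.
        serialized_states.append(pickle.dumps(child.aggregate_state))

    # Parent node: merge every deserialized child state, then produce the result.
    parent = handler_cls()
    for payload in serialized_states:
        parent.merge(pickle.loads(payload))
    return parent.finish()


if __name__ == "__main__":
    partitions = [[(10000,), (5000,)], [(7500,), (3500,), (1500,), (20000,)]]
    print(simulate_aggregate(PythonCount, partitions))  # 6
```

Passing each row as a tuple of arguments lets the same harness drive handlers whose accumulate method takes more than one argument.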

Example: Calculate a sum

The code in the following example defines a python_sum user-defined aggregate function (UDAF) that returns the sum of numeric values.

  1. Create the UDAF.

    CREATE OR REPLACE AGGREGATE FUNCTION PYTHON_SUM(a INT)
      RETURNS INT
      LANGUAGE PYTHON
      RUNTIME_VERSION = 3.9
      HANDLER = 'PythonSum'
    AS $$
    class PythonSum:
      def __init__(self):
        # This aggregate state is a primitive Python data type.
        self._partial_sum = 0
    
      @property
      def aggregate_state(self):
        return self._partial_sum
    
      def accumulate(self, input_value):
        self._partial_sum += input_value
    
      def merge(self, other_partial_sum):
        self._partial_sum += other_partial_sum
    
      def finish(self):
        return self._partial_sum
    $$;
    
  2. Create a table of test data.

    CREATE OR REPLACE TABLE sales(item STRING, price INT);
    
    INSERT INTO sales VALUES ('car', 10000), ('motorcycle', 5000), ('car', 7500), ('motorcycle', 3500), ('motorcycle', 1500), ('car', 20000);
    
    SELECT * FROM sales;
    
  3. Call the python_sum UDAF.

    SELECT python_sum(price) FROM sales;
    
  4. Compare the result with the output of SUM, the Snowflake system-defined SQL function, to confirm that the results match.

    SELECT sum(price) FROM sales;
    
  5. Calculate the sum for each item type in the sales table by grouping on item.

    SELECT item, python_sum(price) FROM sales GROUP BY item;
    
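The handler can also be exercised in plain Python before you create the UDAF. The sketch below copies the PythonSum class from step 1 and drives it by hand. Two instances stand in for child nodes (where the rows are split between them is an arbitrary assumption), one child's state is merged into the other, and finish returns the total. The sum_in_two_parts helper is hypothetical, and the expected values follow from the test data in step 2.

```python
class PythonSum:
    def __init__(self):
        # This aggregate state is a primitive Python data type.
        self._partial_sum = 0

    @property
    def aggregate_state(self):
        return self._partial_sum

    def accumulate(self, input_value):
        self._partial_sum += input_value

    def merge(self, other_partial_sum):
        self._partial_sum += other_partial_sum

    def finish(self):
        return self._partial_sum


def sum_in_two_parts(prices, split):
    """Accumulate prices[:split] and prices[split:] in separate instances, then merge."""
    left, right = PythonSum(), PythonSum()
    for price in prices[:split]:
        left.accumulate(price)
    for price in prices[split:]:
        right.accumulate(price)
    left.merge(right.aggregate_state)
    return left.finish()


if __name__ == "__main__":
    prices = [10000, 5000, 7500, 3500, 1500, 20000]  # price column from step 2
    print(sum_in_two_parts(prices, 2))                # 47500
    print(sum_in_two_parts([10000, 7500, 20000], 1))  # 37500 (car)
    print(sum_in_two_parts([5000, 3500, 1500], 1))    # 10000 (motorcycle)
```

The total does not depend on where the rows are split. That is the property merge has to preserve.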

Example: Calculate an average

The code in the following example defines a python_avg user-defined aggregate function that returns the average of numeric values.

  1. Create the function.

    CREATE OR REPLACE AGGREGATE FUNCTION python_avg(a INT)
      RETURNS FLOAT
      LANGUAGE PYTHON
      RUNTIME_VERSION = 3.9
      HANDLER = 'PythonAvg'
    AS $$
    from dataclasses import dataclass
    
    @dataclass
    class AvgAggState:
        count: int
        sum: int
    
    class PythonAvg:
        def __init__(self):
            # This aggregate state is an object data type.
            self._agg_state = AvgAggState(0, 0)
    
        @property
        def aggregate_state(self):
            return self._agg_state
    
        def accumulate(self, input_value):
            sum = self._agg_state.sum
            count = self._agg_state.count
    
            self._agg_state.sum = sum + input_value
            self._agg_state.count = count + 1
    
        def merge(self, other_agg_state):
            sum = self._agg_state.sum
            count = self._agg_state.count
    
            other_sum = other_agg_state.sum
            other_count = other_agg_state.count
    
            self._agg_state.sum = sum + other_sum
            self._agg_state.count = count + other_count
    
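        def finish(self):
            sum = self._agg_state.sum
            count = self._agg_state.count
            # Return NULL instead of raising ZeroDivisionError if no rows were accumulated.
            if count == 0:
                return None
            return sum / count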
    $$;
    
  2. Create a table of test data.

    CREATE OR REPLACE TABLE sales(item STRING, price INT);
    INSERT INTO sales VALUES ('car', 10000), ('motorcycle', 5000), ('car', 7500), ('motorcycle', 3500), ('motorcycle', 1500), ('car', 20000);
    
  3. Call the python_avg user-defined function.

    SELECT python_avg(price) FROM sales;
    
  4. Compare the result with the output of AVG, the Snowflake system-defined SQL function, to confirm that the results match.

    SELECT avg(price) FROM sales;
    
  5. Calculate the average for each item type in the sales table by grouping on item.

    SELECT item, python_avg(price) FROM sales GROUP BY item;
    
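The state carries both a sum and a count, rather than a running average, because partial averages cannot be merged correctly when child nodes see different numbers of rows. The following sketch mirrors the AvgAggState and PythonAvg classes above and compares merging full states with naively averaging two partial averages. The uneven split of the step 2 prices is an arbitrary assumption, and the two helper functions are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class AvgAggState:
    count: int
    sum: int


class PythonAvg:
    def __init__(self):
        self._agg_state = AvgAggState(0, 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input_value):
        self._agg_state.sum += input_value
        self._agg_state.count += 1

    def merge(self, other_agg_state):
        self._agg_state.sum += other_agg_state.sum
        self._agg_state.count += other_agg_state.count

    def finish(self):
        return self._agg_state.sum / self._agg_state.count


def average_with_merge(left_rows, right_rows):
    """Average two partitions by merging their (count, sum) states."""
    left, right = PythonAvg(), PythonAvg()
    for value in left_rows:
        left.accumulate(value)
    for value in right_rows:
        right.accumulate(value)
    left.merge(right.aggregate_state)
    return left.finish()


def average_of_averages(left_rows, right_rows):
    """Incorrect alternative: average the two partial averages."""
    return (sum(left_rows) / len(left_rows) + sum(right_rows) / len(right_rows)) / 2


if __name__ == "__main__":
    left_rows = [10000, 5000]               # 2 rows
    right_rows = [7500, 3500, 1500, 20000]  # 4 rows
    print(average_with_merge(left_rows, right_rows))   # 47500 / 6, about 7916.67
    print(average_of_averages(left_rows, right_rows))  # 7812.5
```

Only the merged state matches AVG(price) over all six rows. The average of averages is wrong here because the two partitions contain different numbers of rows.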

Example: Return only unique values

The code in the following example takes arrays as input and returns an array that contains only the unique values.

CREATE OR REPLACE AGGREGATE FUNCTION pythonGetUniqueValues(input ARRAY)
  RETURNS ARRAY
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'PythonGetUniqueValues'
AS $$
class PythonGetUniqueValues:
    def __init__(self):
        self._agg_state = set()

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        self._agg_state.update(input)

    def merge(self, other_agg_state):
        self._agg_state.update(other_agg_state)

    def finish(self):
        return list(self._agg_state)
$$;
CREATE OR REPLACE TABLE array_table(x array) AS
SELECT ARRAY_CONSTRUCT(0, 1, 2, 3, 4, 'foo', 'bar', 'snowflake') UNION ALL
SELECT ARRAY_CONSTRUCT(1, 3, 5, 7, 9, 'foo', 'barbar', 'snowpark') UNION ALL
SELECT ARRAY_CONSTRUCT(0, 2, 4, 6, 8, 'snow');

SELECT * FROM array_table;

SELECT pythonGetUniqueValues(x) FROM array_table;
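The handler can be checked locally with the same three arrays. This sketch copies PythonGetUniqueValues, splits the arrays across two hypothetical child instances, and merges them. It assumes the ARRAY elements arrive as Python ints and strs. Because the state is a Python set, the order of the returned list is not guaranteed, so compare results as sets.

```python
class PythonGetUniqueValues:
    def __init__(self):
        self._agg_state = set()

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        self._agg_state.update(input)

    def merge(self, other_agg_state):
        self._agg_state.update(other_agg_state)

    def finish(self):
        return list(self._agg_state)


def unique_values(left_arrays, right_arrays):
    """Accumulate two groups of arrays in separate instances, merge, and finish."""
    left, right = PythonGetUniqueValues(), PythonGetUniqueValues()
    for array in left_arrays:
        left.accumulate(array)
    for array in right_arrays:
        right.accumulate(array)
    left.merge(right.aggregate_state)
    return left.finish()


if __name__ == "__main__":
    arrays = [
        [0, 1, 2, 3, 4, "foo", "bar", "snowflake"],
        [1, 3, 5, 7, 9, "foo", "barbar", "snowpark"],
        [0, 2, 4, 6, 8, "snow"],
    ]
    result = unique_values(arrays[:2], arrays[2:])
    print(len(result))  # 16: the integers 0 through 9 plus six distinct strings
```

The set also imposes a constraint: every element must be hashable. If an input array contained nested arrays or objects (presumably passed to Python as lists or dicts), set.update would raise a TypeError.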

Example: Return a count of strings

The code in the following example returns an OBJECT that maps each input string, converted to lowercase, to the number of times it occurs.

CREATE OR REPLACE AGGREGATE FUNCTION pythonMapCount(input STRING)
  RETURNS OBJECT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'PythonMapCount'
AS $$
from collections import defaultdict

class PythonMapCount:
    def __init__(self):
        self._agg_state = defaultdict(int)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        # Increment count of lowercase input
        self._agg_state[input.lower()] += 1

    def merge(self, other_agg_state):
        for item, count in other_agg_state.items():
            self._agg_state[item] += count

    def finish(self):
        return dict(self._agg_state)
$$;
CREATE OR REPLACE TABLE string_table(x STRING);
INSERT INTO string_table SELECT 'foo' FROM TABLE(GENERATOR(ROWCOUNT => 1000));
INSERT INTO string_table SELECT 'bar' FROM TABLE(GENERATOR(ROWCOUNT => 2000));
INSERT INTO string_table SELECT 'snowflake' FROM TABLE(GENERATOR(ROWCOUNT => 50));
INSERT INTO string_table SELECT 'snowpark' FROM TABLE(GENERATOR(ROWCOUNT => 123));
INSERT INTO string_table SELECT 'SnOw' FROM TABLE(GENERATOR(ROWCOUNT => 1));
INSERT INTO string_table SELECT 'snow' FROM TABLE(GENERATOR(ROWCOUNT => 4));

SELECT pythonMapCount(x) FROM string_table;
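A local run against the rows inserted above shows the case folding at work: 'SnOw' and 'snow' are counted under one key. This sketch copies PythonMapCount and feeds it the test rows split across two hypothetical child instances. The split point and the build_rows and map_count helpers are assumptions for illustration.

```python
from collections import defaultdict


class PythonMapCount:
    def __init__(self):
        self._agg_state = defaultdict(int)

    @property
    def aggregate_state(self):
        return self._agg_state

    def accumulate(self, input):
        # Increment count of lowercase input
        self._agg_state[input.lower()] += 1

    def merge(self, other_agg_state):
        for item, count in other_agg_state.items():
            self._agg_state[item] += count

    def finish(self):
        return dict(self._agg_state)


def build_rows():
    """Recreate the string_table rows inserted above."""
    counts = [("foo", 1000), ("bar", 2000), ("snowflake", 50),
              ("snowpark", 123), ("SnOw", 1), ("snow", 4)]
    return [value for value, n in counts for _ in range(n)]


def map_count(rows, split):
    """Accumulate rows[:split] and rows[split:] in separate instances, then merge."""
    left, right = PythonMapCount(), PythonMapCount()
    for row in rows[:split]:
        left.accumulate(row)
    for row in rows[split:]:
        right.accumulate(row)
    left.merge(right.aggregate_state)
    return left.finish()


if __name__ == "__main__":
    print(map_count(build_rows(), 1500))
    # {'foo': 1000, 'bar': 2000, 'snowflake': 50, 'snowpark': 123, 'snow': 5}
```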

Example: Return the top k largest values

The code in the following example returns a list of the k largest values. The code accumulates negated input values on a min heap, then returns the top k largest values.
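Python's heapq module implements only a min heap, which is why the handler stores negated values. The short sketch below (the function name is hypothetical) shows the technique on its own. After negation, the k smallest heap entries correspond to the k largest original values, and the result agrees with heapq.nlargest.

```python
import heapq
from typing import List


def top_k_by_negation(values: List[int], k: int) -> List[int]:
    """Return the k largest values, largest first, using a min heap of negated values."""
    heap = [-v for v in values]
    heapq.heapify(heap)
    # Popping the smallest negated value yields the largest original value.
    return [-heapq.heappop(heap) for _ in range(min(k, len(heap)))]


if __name__ == "__main__":
    values = [5, 1, 9, 7, 10, 3]
    print(top_k_by_negation(values, 3))  # [10, 9, 7]
    print(heapq.nlargest(3, values))     # [10, 9, 7]
```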

CREATE OR REPLACE AGGREGATE FUNCTION pythonTopK(input INT, k INT)
  RETURNS ARRAY
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'PythonTopK'
AS $$
import heapq
from dataclasses import dataclass
from typing import List

@dataclass
class AggState:
    minheap: List[int]
    k: int

class PythonTopK:
    def __init__(self):
        self._agg_state = AggState([], 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    @staticmethod
    def get_top_k_items(minheap, k):
        # Return k smallest elements if there are more than k elements on the min heap.
        if len(minheap) > k:
            return [heapq.heappop(minheap) for _ in range(k)]
        return minheap

    def accumulate(self, input, k):
        self._agg_state.k = k

        # Store the input as negative value, as heapq is a min heap.
        heapq.heappush(self._agg_state.minheap, -input)

        # Store only top k items on the min heap.
        self._agg_state.minheap = self.get_top_k_items(self._agg_state.minheap, k)

    def merge(self, other_agg_state):
        k = self._agg_state.k if self._agg_state.k > 0 else other_agg_state.k
        # Record k so that a later merge with an empty state (k == 0) does not discard the heap.
        self._agg_state.k = k

        # Merge two min heaps by popping off elements from one and pushing them onto another.
        while other_agg_state.minheap:
            heapq.heappush(self._agg_state.minheap, heapq.heappop(other_agg_state.minheap))

        # Store only k elements on the min heap.
        self._agg_state.minheap = self.get_top_k_items(self._agg_state.minheap, k)

    def finish(self):
        return [-x for x in self._agg_state.minheap]
$$;
CREATE OR REPLACE TABLE numbers_table(num_column INT);
INSERT INTO numbers_table SELECT 5 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table SELECT 1 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table SELECT 9 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table SELECT 7 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table SELECT 10 FROM TABLE(GENERATOR(ROWCOUNT => 10));
INSERT INTO numbers_table SELECT 3 FROM TABLE(GENERATOR(ROWCOUNT => 10));

-- Return top 15 largest values from numbers_table.
SELECT pythonTopK(num_column, 15) FROM numbers_table;
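The merge step can be checked locally as well. This sketch copies PythonTopK and recreates the numbers_table rows. It then splits them across three hypothetical child instances, the last of which receives no rows, and merges all three into a fresh parent instance. Whether Snowflake ever merges a state from an instance that saw no rows is an assumption. In this copy, merge also records k in the state, so merging such an empty state (whose k is 0) cannot truncate the heap to zero elements. In general, finish returns values in heap order rather than sorted order, so the check sorts the result.

```python
import heapq
from dataclasses import dataclass
from typing import List


@dataclass
class AggState:
    minheap: List[int]
    k: int


class PythonTopK:
    def __init__(self):
        self._agg_state = AggState([], 0)

    @property
    def aggregate_state(self):
        return self._agg_state

    @staticmethod
    def get_top_k_items(minheap, k):
        # Return k smallest elements if there are more than k elements on the min heap.
        if len(minheap) > k:
            return [heapq.heappop(minheap) for _ in range(k)]
        return minheap

    def accumulate(self, input, k):
        self._agg_state.k = k
        # Store the input as negative value, as heapq is a min heap.
        heapq.heappush(self._agg_state.minheap, -input)
        self._agg_state.minheap = self.get_top_k_items(self._agg_state.minheap, k)

    def merge(self, other_agg_state):
        k = self._agg_state.k if self._agg_state.k > 0 else other_agg_state.k
        # Record k so that a later merge with an empty state does not discard the heap.
        self._agg_state.k = k
        while other_agg_state.minheap:
            heapq.heappush(self._agg_state.minheap, heapq.heappop(other_agg_state.minheap))
        self._agg_state.minheap = self.get_top_k_items(self._agg_state.minheap, k)

    def finish(self):
        return [-x for x in self._agg_state.minheap]


def numbers_table_rows():
    """Recreate the numbers_table rows: each value is inserted 10 times."""
    return [value for value in (5, 1, 9, 7, 10, 3) for _ in range(10)]


def top_k(partitions, k):
    """Accumulate each partition in its own instance, then merge all into a fresh parent."""
    children = []
    for rows in partitions:
        child = PythonTopK()
        for value in rows:
            child.accumulate(value, k)
        children.append(child)
    parent = PythonTopK()
    for child in children:
        parent.merge(child.aggregate_state)
    return parent.finish()


if __name__ == "__main__":
    rows = numbers_table_rows()
    result = top_k([rows[:25], rows[25:], []], 15)
    print(sorted(result, reverse=True))  # ten 10s followed by five 9s
```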