Python UDF ์ฒ˜๋ฆฌ๊ธฐ ์˜ˆยถ

์ด ํ•ญ๋ชฉ์—๋Š” Python์œผ๋กœ ์ž‘์„ฑ๋œ UDF ์ฒ˜๋ฆฌ๊ธฐ ์ฝ”๋“œ์˜ ๊ฐ„๋‹จํ•œ ์˜ˆ๊ฐ€ ๋‚˜์™€ ์žˆ์Šต๋‹ˆ๋‹ค.

Python์„ ์‚ฌ์šฉํ•˜์—ฌ UDF ์ฒ˜๋ฆฌ๊ธฐ๋ฅผ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Python UDF ๋งŒ๋“ค๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

runtime_version ์„ ์ฝ”๋“œ์—์„œ ํ•„์š”ํ•œ Python ๋Ÿฐํƒ€์ž„ ๋ฒ„์ „์œผ๋กœ ์„ค์ •ํ•ฉ๋‹ˆ๋‹ค. ์ง€์›๋˜๋Š” Python ๋ฒ„์ „์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • 3.9

  • 3.10

  • 3.11

  • 3.12

์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ์—์„œ ํŒจํ‚ค์ง€ ๊ฐ€์ ธ์˜ค๊ธฐยถ

Anaconda์—์„œ ์„ ๋ณ„๋œ ์„œ๋“œ ํŒŒํ‹ฐ ํŒจํ‚ค์ง€ ๋ชฉ๋ก์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์„œ๋“œ ํŒŒํ‹ฐ ํŒจํ‚ค์ง€ ์‚ฌ์šฉํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

์ฐธ๊ณ 

Snowflake ์กฐ์ง ๊ด€๋ฆฌ์ž๊ฐ€ Snowflake ์™ธ๋ถ€ ์ œ๊ณต ์•ฝ๊ด€ ์„ ์Šน์ธํ•ด์•ผ Anaconda์—์„œ ์ œ๊ณตํ•˜๋Š” ํŒจํ‚ค์ง€๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ Anaconda์˜ ์„œ๋“œ ํŒŒํ‹ฐ ํŒจํ‚ค์ง€ ์‚ฌ์šฉํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋‹ค์Œ ์ฝ”๋“œ์—์„œ๋Š” ํŒจํ‚ค์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ  ํŒจํ‚ค์ง€์˜ ๋ฒ„์ „์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION py_udf()
  RETURNS VARIANT
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  PACKAGES = ('numpy','pandas','xgboost==1.5.0')
  HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
  return [np.__version__, pd.__version__, xgb.__version__]
$$;
Copy

UDF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

SELECT py_udf();
Copy
+-------------+
| PY_UDF()    |
|-------------|
| [           |
|   "1.19.2", |
|   "1.4.0",  |
|   "1.5.0"   |
| ]           |
+-------------+

PACKAGES ํ‚ค์›Œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ํŒจํ‚ค์ง€ ๋ฒ„์ „์„ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋ฒ„์ „ ์—†์Œ(์˜ˆ: numpy)

  • ์ •ํ™•ํ•œ ๋ฒ„์ „์— ๊ณ ์ •๋จ(์˜ˆ: numpy==1.25.2)

  • ์™€์ผ๋“œ์นด๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฒ„์ „ ์ ‘๋‘์‚ฌ๋กœ ์ œํ•œ๋จ(์˜ˆ: numpy==1.*)

  • ๋ฒ„์ „ ๋ฒ”์œ„๋กœ ์ œํ•œ๋จ(์˜ˆ: numpy>=1.25)

  • ๋ชจ๋“  ๋ฒ„์ „ ์ง€์ •์ž๋ฅผ ๋งŒ์กฑํ•˜๋Š” ํŒจํ‚ค์ง€๊ฐ€ ์„ ํƒ๋˜๋„๋ก ์—ฌ๋Ÿฌ ๋ฒ„์ „ ์ง€์ •์ž๋กœ ์ œํ•œ๋จ(์˜ˆ: numpy>=1.25,<2)

์ฐธ๊ณ 

ํŒจํ‚ค์ง€ ์ •์ฑ…์—์„œ๋Š” ์—ฌ๋Ÿฌ ๋ฒ”์œ„ ์—ฐ์‚ฐ์ž(์˜ˆ: numpy>=1.25,<2)๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์ง€๋งŒ Python UDF, UDTF ๋ฐ ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ์ƒ์„ฑํ•  ๋•Œ๋Š” ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ์€ ์™€์ผ๋“œ์นด๋“œ * ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํŒจํ‚ค์ง€๋ฅผ ๋ฒ„์ „ ์ ‘๋‘์‚ฌ๋กœ ์ œํ•œํ•˜๋Š” ๋ฐฉ๋ฒ•์˜ ์˜ˆ์ž…๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION my_udf()
  RETURNS STRING
  LANGUAGE PYTHON
  PACKAGES = ('numpy==1.*')
  RUNTIME_VERSION = 3.10
  HANDLER = 'echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

์ด ์˜ˆ์—์„œ๋Š” ํŒจํ‚ค์ง€๋ฅผ ์ง€์ •๋œ ๋ฒ„์ „๋ณด๋‹ค ํฌ๊ฑฐ๋‚˜ ๊ฐ™๋„๋ก ์ œํ•œํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION my_udf()
  RETURNS STRING
  LANGUAGE PYTHON
  PACKAGES = ('numpy>=1.2')
  RUNTIME_VERSION = 3.10
  HANDLER = 'echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

์ด ์˜ˆ์—์„œ๋Š” ์—ฌ๋Ÿฌ ํŒจํ‚ค์ง€ ๋ฒ„์ „ ์ง€์ •์ž๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION my_udf()
  RETURNS STRING
  LANGUAGE PYTHON
  PACKAGES = ('numpy>=1.2,<2')
  RUNTIME_VERSION = 3.10
  HANDLER = 'echo'
AS $$
def echo():
  return 'hi'
$$;
Copy

ํŒŒ์ผ ์ฝ๊ธฐยถ

Python UDF ์ฒ˜๋ฆฌ๊ธฐ ์ฝ”๋“œ๋กœ ํŒŒ์ผ์˜ ๋‚ด์šฉ์„ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๋น„์ •ํ˜• ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•ด ํŒŒ์ผ์„ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํŒŒ์ผ์˜ ๋‚ด์šฉ์€ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋ฐฉ๋ฒ•์œผ๋กœ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

IMPORTS๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ •์ ์œผ๋กœ ์ง€์ •๋œ ํŒŒ์ผ ์ฝ๊ธฐยถ

CREATE FUNCTION ๋ช…๋ น์˜ IMPORTS ์ ˆ์— ํŒŒ์ผ ์ด๋ฆ„๊ณผ ์Šคํ…Œ์ด์ง€ ์ด๋ฆ„์„ ์ง€์ •ํ•˜์—ฌ ํŒŒ์ผ์„ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

IMPORTS ์ ˆ์— ํŒŒ์ผ์„ ์ง€์ •ํ•˜๋ฉด Snowflake๋Š” ํ•ด๋‹น ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง€์—์„œ UDF์˜ ํ™ˆ ๋””๋ ‰ํ„ฐ๋ฆฌ (๊ฐ€์ ธ์˜ค๊ธฐ ๋””๋ ‰ํ„ฐ๋ฆฌ ๋ผ๊ณ ๋„ ํ•จ)๋กœ ๋ณต์‚ฌํ•ฉ๋‹ˆ๋‹ค. ์ด ๋””๋ ‰ํ„ฐ๋ฆฌ๋Š” UDF๊ฐ€ ํŒŒ์ผ์„ ์ฝ๋Š” ๋””๋ ‰ํ„ฐ๋ฆฌ์ž…๋‹ˆ๋‹ค.

Snowflake๋Š” ๊ฐ€์ ธ์˜จ ํŒŒ์ผ์„ ๋‹จ์ผ ๋””๋ ‰ํ„ฐ๋ฆฌ์— ๋ณต์‚ฌํ•ฉ๋‹ˆ๋‹ค. ํ•ด๋‹น ๋””๋ ‰ํ„ฐ๋ฆฌ์˜ ๋ชจ๋“  ํŒŒ์ผ์€ ๊ณ ์œ ํ•œ ์ด๋ฆ„์„ ๊ฐ€์ ธ์•ผ ํ•˜๋ฏ€๋กœ IMPORTS ์ ˆ์˜ ๊ฐ ํŒŒ์ผ์€ ๊ณ ์œ ํ•œ ์ด๋ฆ„์„ ๊ฐ€์ ธ์•ผ ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Š” ํŒŒ์ผ์ด ๋‹ค๋ฅธ ์Šคํ…Œ์ด์ง€ ๋˜๋Š” ์Šคํ…Œ์ด์ง€ ๋‚ด์˜ ๋‹ค๋ฅธ ํ•˜์œ„ ๋””๋ ‰ํ„ฐ๋ฆฌ์—์„œ ์‹œ์ž‘ํ•˜๋Š” ๊ฒฝ์šฐ์—๋„ ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.

์ฐธ๊ณ 

ํ•˜์œ„ ํด๋”๊ฐ€ ์•„๋‹Œ ์Šคํ…Œ์ด์ง€์˜ ์ตœ์ƒ์œ„ ๋””๋ ‰ํ„ฐ๋ฆฌ์—์„œ๋งŒ ํŒŒ์ผ์„ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” my_stage ๋ผ๋Š” ์Šคํ…Œ์ด์ง€์—์„œ file.txt ๋ผ๋Š” ํŒŒ์ผ์„ ์ฝ๋Š” ์ธ๋ผ์ธ Python ์ฒ˜๋ฆฌ๊ธฐ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. ์ฒ˜๋ฆฌ๊ธฐ๋Š” snowflake_import_directory ์‹œ์Šคํ…œ ์˜ต์…˜๊ณผ ํ•จ๊ป˜ Python sys._xoptions ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ UDF์˜ ํ™ˆ ๋””๋ ‰ํ„ฐ๋ฆฌ ์œ„์น˜๋ฅผ ๊ฒ€์ƒ‰ํ•ฉ๋‹ˆ๋‹ค.

Snowflake๋Š” UDF ์ƒ์„ฑ ์ค‘์— ํ•œ ๋ฒˆ๋งŒ ํŒŒ์ผ์„ ์ฝ์œผ๋ฉฐ ํŒŒ์ผ ์ฝ๊ธฐ๊ฐ€ ๋Œ€์ƒ ์ฒ˜๋ฆฌ๊ธฐ ์™ธ๋ถ€์—์„œ ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ UDF ์‹คํ–‰ ์ค‘์— ํŒŒ์ผ์„ ๋‹ค์‹œ ์ฝ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ๋กœ UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION my_udf()
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  IMPORTS = ('@my_stage/file.txt')
  HANDLER = 'compute'
AS $$
import sys
import os

with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
  s = f.read()

def compute():
  return s
$$;
Copy

SnowflakeFile ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋™์ ์œผ๋กœ ์ง€์ •๋œ ํŒŒ์ผ ์ฝ๊ธฐยถ

Snowpark snowflake.snowpark.files ๋ชจ๋“ˆ์—์„œ SnowflakeFile ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์Šคํ…Œ์ด์ง€์—์„œ ํŒŒ์ผ์„ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. SnowflakeFile ํด๋ž˜์Šค๋Š” ๋ชจ๋“  ํฌ๊ธฐ์˜ ํŒŒ์ผ์„ ์ŠคํŠธ๋ฆฌ๋ฐํ•  ์ˆ˜ ์žˆ๋Š” ๋™์  ํŒŒ์ผ ์•ก์„ธ์Šค๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ๋™์  ํŒŒ์ผ ์•ก์„ธ์Šค๋Š” ์—ฌ๋Ÿฌ ํŒŒ์ผ์—์„œ ๋ฐ˜๋ณตํ•˜๋ ค๋Š” ๊ฒฝ์šฐ์—๋„ ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ์—ฌ๋Ÿฌ ํŒŒ์ผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

SnowflakeFile ํด๋ž˜์Šค์—๋Š” ํŒŒ์ผ์„ ์—ด๊ธฐ ์œ„ํ•œ ๋ฉ”์„œ๋“œ๊ฐ€ ํ•œ ๊ฐœ ์žˆ๋Š”๋ฐ, ๋ฐ”๋กœ open ์ž…๋‹ˆ๋‹ค. open ๋ฉ”์„œ๋“œ๋Š” Python์˜ IOBase ํŒŒ์ผ ์˜ค๋ธŒ์ ํŠธ๋ฅผ ํ™•์žฅํ•˜๋Š” SnowflakeFile ์˜ค๋ธŒ์ ํŠธ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

SnowflakeFile ์˜ค๋ธŒ์ ํŠธ๋Š” ๋‹ค์Œ IOBase, BufferedIOBase ๋ฐ RawIOBase ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์›ํ•ฉ๋‹ˆ๋‹ค.

  • IOBase.fileno

  • IOBase.isatty

  • IOBase.readable

  • IOBase.readinto

  • IOBase.readline

  • IOBase.readlines

  • IOBase.seek

  • IOBase.seekable

  • IOBase.tell

  • BufferedIOBase.readinto1

  • RawIOBase.read

  • RawIOBase.readall

์ž์„ธํ•œ ๋‚ด์šฉ์€ IOBase์˜ Python 3.9 ์„ค๋ช…์„œ ๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค. fileno ๋ฉ”์„œ๋“œ์™€ ๊ฐ™์ด Snowflake ์„œ๋ฒ„์—์„œ ์ง€์›๋˜์ง€ ์•Š๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ์˜ค๋ฅ˜๊ฐ€ ๋ฐ˜ํ™˜๋ฉ๋‹ˆ๋‹ค.

์ฐธ๊ณ 

๊ธฐ๋ณธ์ ์œผ๋กœ SnowflakeFile ์„ ์‚ฌ์šฉํ•œ ํŒŒ์ผ ์•ก์„ธ์Šค์—๋Š” ์ฝ”๋“œ๊ฐ€ ํŒŒ์ผ ์ฃผ์ž… ๊ณต๊ฒฉ์— ๋Œ€ํ•œ ๋ณต์›๋ ฅ์„ ๊ฐ–๋„๋ก ํ•˜๊ธฐ ์œ„ํ•ด ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ ์ œ๊ณต ํ•จ์ˆ˜ BUILD_SCOPED_FILE_URL ์„ ์‚ฌ์šฉํ•˜์—ฌ SQL์—์„œ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ํŒŒ์ผ ์•ก์„ธ์Šค์— ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” URL์˜ ์œ ํ˜• ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค. ํŒŒ์ผ์— ๋Œ€ํ•œ ์•ก์„ธ์Šค ๊ถŒํ•œ์ด ์žˆ๋Š” ์‚ฌ์šฉ์ž๋งŒ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ „์ œ ์กฐ๊ฑดยถ

๋‹ค์Œ์„ ์ˆ˜ํ–‰ํ•˜์—ฌ ์ฝ”๋“œ์— ์Šคํ…Œ์ด์ง€์˜ ํŒŒ์ผ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์•ผ Python ์ฒ˜๋ฆฌ๊ธฐ ์ฝ”๋“œ๊ฐ€ ํ•ด๋‹น ํŒŒ์ผ์„ ์ฝ์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  1. ์ฒ˜๋ฆฌ๊ธฐ๊ฐ€ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์Šคํ…Œ์ด์ง€๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

    ์™ธ๋ถ€ ์Šคํ…Œ์ด์ง€ ๋˜๋Š” ๋‚ด๋ถ€ ์Šคํ…Œ์ด์ง€๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‚ด๋ถ€ ์Šคํ…Œ์ด์ง€๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ํ˜ธ์ถœ์ž์˜ ๊ถŒํ•œ ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ๋งŒ๋“ค ๊ณ„ํš์ผ ๋•Œ๋Š” ์‚ฌ์šฉ์ž ์Šคํ…Œ์ด์ง€๊ฐ€ ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋ ‡์ง€ ์•Š์œผ๋ฉด ๋ช…๋ช…๋œ ์Šคํ…Œ์ด์ง€๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. Snowflake๋Š” ํ˜„์žฌ UDF ์ข…์†์„ฑ์— ๋Œ€ํ•œ ํ…Œ์ด๋ธ” ์Šคํ…Œ์ด์ง€ ์‚ฌ์šฉ์„ ์ง€์›ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

    ์Šคํ…Œ์ด์ง€ ์ƒ์„ฑ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ CREATE STAGE ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค. ๋‚ด๋ถ€ ์Šคํ…Œ์ด์ง€ ์œ ํ˜• ์„ ํƒ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ๋กœ์ปฌ ํŒŒ์ผ์„ ์œ„ํ•œ ๋‚ด๋ถ€ ์Šคํ…Œ์ด์ง€ ์„ ํƒํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

    ์‚ฌ์šฉ ์‚ฌ๋ก€์— ๋”ฐ๋ผ ์Šคํ…Œ์ด์ง€์— ๋Œ€ํ•œ ์ ์ ˆํ•œ ๊ถŒํ•œ์„ ๋‹ค์Œ ์—ญํ• ์— ํ• ๋‹นํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

    ์‚ฌ์šฉ ์‚ฌ๋ก€

    ์—ญํ• 

    UDF ๋˜๋Š” ์†Œ์œ ์ž์˜ ๊ถŒํ•œ ์ €์žฅ ํ”„๋กœ์‹œ์ €

    ์‹คํ–‰ ์ค‘์ธ UDF ๋˜๋Š” ์ €์žฅ ํ”„๋กœ์‹œ์ €๋ฅผ ์†Œ์œ ํ•˜๋Š” ์—ญํ• .

    ํ˜ธ์ถœ์ž ๊ถŒํ•œ ์ €์žฅ ํ”„๋กœ์‹œ์ €

    ์‚ฌ์šฉ์ž ์—ญํ• .

    ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜์— ๋Œ€ํ•œ ๊ถŒํ•œ ๋ถ€์—ฌ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

  2. ์ฝ”๋“œ์—์„œ ์ฝ์„ ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง€๋กœ ๋ณต์‚ฌํ•ฉ๋‹ˆ๋‹ค.

    PUT ๋ช…๋ น์„ ์‚ฌ์šฉํ•˜์—ฌ ๋กœ์ปฌ ๋“œ๋ผ์ด๋ธŒ์—์„œ ๋‚ด๋ถ€ ์Šคํ…Œ์ด์ง€๋กœ ํŒŒ์ผ์„ ๋ณต์‚ฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. PUT์œผ๋กœ ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง•ํ•˜๋Š” ์ž์„ธํ•œ ๋ฐฉ๋ฒ•์€ ๋กœ์ปฌ ํŒŒ์ผ ์‹œ์Šคํ…œ์—์„œ ๋ฐ์ดํ„ฐ ํŒŒ์ผ ์Šคํ…Œ์ด์ง•ํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

    ํด๋ผ์šฐ๋“œ ์ €์žฅ์†Œ ์„œ๋น„์Šค์—์„œ ์ œ๊ณตํ•˜๋Š” ๋„๊ตฌ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋กœ์ปฌ ๋“œ๋ผ์ด๋ธŒ์—์„œ ์™ธ๋ถ€ ์Šคํ…Œ์ด์ง€ ์œ„์น˜๋กœ ํŒŒ์ผ์„ ๋ณต์‚ฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋„์›€๋ง์€ ํด๋ผ์šฐ๋“œ ์ €์žฅ์†Œ ์„œ๋น„์Šค ์„ค๋ช…์„œ๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

์ธ๋ผ์ธ Python ์ฒ˜๋ฆฌ๊ธฐ๋กœ ์ด๋ฏธ์ง€์˜ ์ง€๊ฐ ํ•ด์‹œ ๊ณ„์‚ฐํ•˜๊ธฐยถ

์ด ์˜ˆ์—์„œ๋Š” SnowflakeFile ์„ ์‚ฌ์šฉํ•˜์—ฌ ์Šคํ…Œ์ด์ง•๋œ ์ด๋ฏธ์ง€ ํŒŒ์ผ ์Œ์„ ์ฝ๊ณ  ๊ฐ ํŒŒ์ผ์˜ ์ง€๊ฐ ํ•ด์‹œ (pHash)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด๋ฏธ์ง€๊ฐ€ ์„œ๋กœ ์–ผ๋งˆ๋‚˜ ์œ ์‚ฌํ•œ์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.

mode ์ธ์ž์— rb ๋ฅผ ์ „๋‹ฌํ•˜์—ฌ ์ž…๋ ฅ ๋ชจ๋“œ๋ฅผ ๋ฐ”์ด๋„ˆ๋ฆฌ๋กœ ์ง€์ •ํ•˜์—ฌ ์ด๋ฏธ์ง€์˜ phash ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜๋Š” UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION calc_phash(file_path STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
  HANDLER = 'run'
AS $$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile

def run(file_path):
  with SnowflakeFile.open(file_path, 'rb') as f:
  return imagehash.average_hash(Image.open(f))
$$;
Copy

๋‘ ์ด๋ฏธ์ง€์˜ phash ๊ฐ’ ์‚ฌ์ด์˜ ๊ฑฐ๋ฆฌ๋ฅผ ๊ณ„์‚ฐํ•˜๋Š” ๋‘ ๋ฒˆ์งธ UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION calc_phash_distance(h1 STRING, h2 STRING)
  RETURNS INT
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('imagehash')
  HANDLER = 'run'
AS $$
import imagehash

def run(h1, h2):
  return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
Copy

์ด๋ฏธ์ง€ ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง•ํ•˜๊ณ  ๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ”์„ ์ƒˆ๋กœ ๊ณ ์นฉ๋‹ˆ๋‹ค.

PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;

ALTER STAGE images REFRESH;
Copy

UDFs๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

SELECT
  calc_phash_distance(
    calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
    calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
  ) ;
Copy

UDTF๋กœ CSV ํŒŒ์ผ ์ฒ˜๋ฆฌํ•˜๊ธฐยถ

์ด ์˜ˆ์—์„œ๋Š” SnowflakeFile ์„ ์‚ฌ์šฉํ•˜์—ฌ CSV ํŒŒ์ผ์˜ ๋‚ด์šฉ์„ ์ถ”์ถœํ•˜๊ณ  ํ…Œ์ด๋ธ”์˜ ํ–‰์„ ๋ฐ˜ํ™˜ํ•˜๋Š” UDTF๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ๋กœ UDTF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE FUNCTION parse_csv(file_path STRING)
  RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING)
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'csvparser'
AS $$
from snowflake.snowpark.files import SnowflakeFile

class csvparser:
  def process(self, stagefile):
    with SnowflakeFile.open(stagefile) as f:
      for line in f.readlines():
        lineStr = line.strip()
        row = lineStr.split(",")
        try:
          # Read the columns from the line.
          yield (row[1], row[0], row[2], )
        except:
          pass
$$;
Copy

CSV ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง•ํ•˜๊ณ  ๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ”์„ ์ƒˆ๋กœ ๊ณ ์นฉ๋‹ˆ๋‹ค.

PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;

ALTER STAGE data_stage REFRESH;
Copy

UDTF๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ํŒŒ์ผ URL์„ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
Copy

์—ฌ๋Ÿฌ ํŒŒ์ผ ์ฒ˜๋ฆฌํ•˜๊ธฐยถ

๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ”์˜ RELATIVE_PATH ์—ด์„ ์ฒ˜๋ฆฌ๊ธฐ์— ์ „๋‹ฌํ•˜์—ฌ ์—ฌ๋Ÿฌ ํŒŒ์ผ์„ ์ฝ๊ณ  ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. RELATIVE_PATH ์—ด์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ” ์ฟผ๋ฆฌ์˜ ์ถœ๋ ฅ ์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

์ฐธ๊ณ 

ํŒŒ์ผ ํฌ๊ธฐ ๋ฐ ์ปดํ“จํŒ… ์š”๊ตฌ ์‚ฌํ•ญ์— ๋”ฐ๋ผ ์—ฌ๋Ÿฌ ํŒŒ์ผ์„ ์ฝ๊ณ  ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฌธ์„ ์‹คํ–‰ํ•˜๊ธฐ ์ „์— ALTER WAREHOUSE ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์›จ์–ดํ•˜์šฐ์Šค๋ฅผ ํ™•์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์—ฌ๋Ÿฌ ํŒŒ์ผ์„ ์ฒ˜๋ฆฌํ•˜๋Š” UDF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” CREATE TABLE ๋ฌธ ๋‚ด์—์„œ UDF๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์Šคํ…Œ์ด์ง€์˜ ๊ฐ ํŒŒ์ผ์„ ์ฒ˜๋ฆฌํ•œ ๋‹ค์Œ ๊ฒฐ๊ณผ๋ฅผ ์ƒˆ ํ…Œ์ด๋ธ”์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

๋ฐ๋ชจ ๋ชฉ์ ์œผ๋กœ, ์˜ˆ์ œ์—์„œ๋Š” ๋‹ค์Œ์„ ๊ฐ€์ •ํ•ฉ๋‹ˆ๋‹ค.

  • my_stage ๋ผ๋Š” ์Šคํ…Œ์ด์ง€์— ์—ฌ๋Ÿฌ ํ…์ŠคํŠธ ํŒŒ์ผ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋น„์ •ํ˜• ํ…์ŠคํŠธ์— ๋Œ€ํ•œ ๊ฐ์ • ๋ถ„์„์„ ์ˆ˜ํ–‰ํ•˜๋Š” get_sentiment ๋ผ๋Š” ๊ธฐ์กด UDF๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. UDF๋Š” ํ…์ŠคํŠธ ํŒŒ์ผ์˜ ๊ฒฝ๋กœ๋ฅผ ์ž…๋ ฅ๊ฐ’์œผ๋กœ ๋ฐ›์•„ ๊ฐ์ •์„ ๋‚˜ํƒ€๋‚ด๋Š” ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE TABLE sentiment_results AS
SELECT
  relative_path
  , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment
FROM directory(@my_stage);
Copy
์—ฌ๋Ÿฌ ํŒŒ์ผ์„ ์ฒ˜๋ฆฌํ•˜๋Š” UDTF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” parse_excel_udtf ๋ผ๋Š” UDTF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ์ด ์˜ˆ์—์„œ๋Š” my_excel_stage ๋ผ๋Š” ์Šคํ…Œ์ด์ง€์˜ ๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ”์—์„œ relative_path ๋ฅผ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

SELECT t.*
FROM directory(@my_stage) d,
TABLE(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
Copy

์Šคํ…Œ์ด์ง€ URIs ๋ฐ URLs๊ฐ€ ์žˆ๋Š” ํŒŒ์ผ ์ฝ๊ธฐยถ

SnowflakeFile ์„ ์‚ฌ์šฉํ•œ ํŒŒ์ผ ์•ก์„ธ์Šค์—๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ์ฝ”๋“œ๋ฅผ ํŒŒ์ผ ์ฃผ์ž… ๊ณต๊ฒฉ์— ๋Œ€ํ•ด ๋ณต์›๋ ฅ ์žˆ๊ฒŒ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ๋Œ€์‹  ์Šคํ…Œ์ด์ง€ URI ๋˜๋Š” ์Šคํ…Œ์ด์ง€ URL์„ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒ์ผ ์œ„์น˜๋ฅผ ์ฐธ์กฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋ ค๋ฉด require_scoped_url = False ํ‚ค์›Œ๋“œ ์ธ์ž๋กœ SnowflakeFile.open ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ด ์˜ต์…˜์€ ํ˜ธ์ถœ์ž๊ฐ€ UDF ์†Œ์œ ์ž๋งŒ ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ๋Š” URI๋ฅผ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋ ค๋Š” ๊ฒฝ์šฐ์— ์œ ์šฉํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด UDF๋ฅผ ์†Œ์œ ํ•˜๊ณ  ๊ตฌ์„ฑ ํŒŒ์ผ ๋˜๋Š” ๋จธ์‹  ๋Ÿฌ๋‹ ๋ชจ๋ธ์„ ์ฝ์œผ๋ ค๋Š” ๊ฒฝ์šฐ ํŒŒ์ผ ์•ก์„ธ์Šค๋ฅผ ์œ„ํ•ด ์Šคํ…Œ์ด์ง€ URI๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์‚ฌ์šฉ์ž ์ž…๋ ฅ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์ƒ์„ฑ๋œ ํŒŒ์ผ๊ณผ ๊ฐ™์ด ์ด๋ฆ„์„ ์˜ˆ์ธกํ•  ์ˆ˜ ์—†๋Š” ํŒŒ์ผ๋กœ ์ž‘์—…ํ•  ๋•Œ๋Š” ์ด ์˜ต์…˜์„ ๊ถŒ์žฅํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

์ด ์˜ˆ์ œ์—์„œ๋Š” ํŒŒ์ผ์—์„œ ๋จธ์‹  ๋Ÿฌ๋‹ ๋ชจ๋ธ์„ ์ฝ๊ณ  ํ•จ์ˆ˜์—์„œ ์ด ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•˜์—ฌ ๊ฐ์ • ๋ถ„์„์„ ์œ„ํ•œ ์ž์—ฐ์–ด ์ฒ˜๋ฆฌ๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค. ์ด ์˜ˆ์—์„œ๋Š” require_scoped_url = False ๋กœ open ์„ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ๋‘ ํŒŒ์ผ ์œ„์น˜ ํ˜•์‹(์Šคํ…Œ์ด์ง€ URI ๋ฐ ์Šคํ…Œ์ด์ง€ URL)์—์„œ ๋ชจ๋‘ UDF ์†Œ์œ ์ž๋Š” ๋ชจ๋ธ ํŒŒ์ผ์— ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ๋กœ UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION extract_sentiment(input_data STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python','scikit-learn')
  HANDLER = 'run'
AS $$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = '@models/NLP_model.pickle'
  # Specify 'mode = rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

๋ชจ๋ธ ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง•ํ•˜๊ณ  ๋””๋ ‰ํ„ฐ๋ฆฌ ํ…Œ์ด๋ธ”์„ ์ƒˆ๋กœ ๊ณ ์นฉ๋‹ˆ๋‹ค.

PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;

ALTER STAGE models REFRESH;
Copy

๋˜๋Š” ๋ชจ๋ธ์˜ ์Šคํ…Œ์ด์ง€ URL๋กœ UDF๋ฅผ ์ง€์ •ํ•˜์—ฌ ๊ฐ์ •์„ ์ถ”์ถœํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด ์Šคํ…Œ์ด์ง€ URL์„ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒ์ผ์„ ์ง€์ •ํ•˜๋Š” ์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ๊ฐ€ ์žˆ๋Š” UDF๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION extract_sentiment(input_data STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.9'
  PACKAGES = ('snowflake-snowpark-python','scikit-learn')
  HANDLER = 'run'
AS $$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle

def run(input_data):
  model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
  # Specify 'rb' to open the file in binary mode.
  with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
    model = pickle.load(f)
    return model.predict([input_data])[0]
$$;
Copy

์ž…๋ ฅ ๋ฐ์ดํ„ฐ๋กœ UDF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
Copy

ํŒŒ์ผ ์“ฐ๊ธฐยถ

UDF ์ฒ˜๋ฆฌ๊ธฐ๋Š” UDF๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ์ฟผ๋ฆฌ์— ๋Œ€ํ•ด ์ƒ์„ฑ๋œ /tmp ๋””๋ ‰ํ„ฐ๋ฆฌ์— ํŒŒ์ผ์„ ์“ธ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

/tmp ๋””๋ ‰ํ„ฐ๋ฆฌ๋Š” ๋‹จ์ผ ํ˜ธ์ถœ ์ฟผ๋ฆฌ์šฉ์œผ๋กœ ๋”ฐ๋กœ ๋งˆ๋ จ๋œ ๊ฒƒ์ด์ง€๋งŒ, ์—ฌ๋Ÿฌ Python ์ž‘์—…์ž ํ”„๋กœ์„ธ์Šค๊ฐ€ ๋™์‹œ์— ์‹คํ–‰ ์ค‘์ผ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ์ถฉ๋Œ์„ ๋ฐฉ์ง€ํ•˜๋ ค๋ฉด /tmp ๋””๋ ‰ํ„ฐ๋ฆฌ์— ๋Œ€ํ•œ ์•ก์„ธ์Šค๊ฐ€ ๋‹ค๋ฅธ Python ์ž‘์—…์ž ํ”„๋กœ์„ธ์Šค์™€ ๋™๊ธฐํ™”๋˜๊ฑฐ๋‚˜ /tmp์— ๊ธฐ๋ก๋œ ํŒŒ์ผ์˜ ์ด๋ฆ„์ด ๊ณ ์œ ํ•˜๋„๋ก ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์˜ˆ์ œ ์ฝ”๋“œ๋Š” ์ด ํ•ญ๋ชฉ์˜ ์Šคํ…Œ์ด์ง•๋œ ํŒŒ์ผ์˜ ์••์ถ• ํ’€๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ๋Š” ์ž…๋ ฅ text ๋ฅผ /tmp ๋””๋ ‰ํ„ฐ๋ฆฌ์— ์”๋‹ˆ๋‹ค. ๋˜ํ•œ ํŒŒ์ผ ์œ„์น˜์˜ ๊ณ ์œ ์„ฑ์„ ๋ณด์žฅํ•˜๋„๋ก ํ•จ์ˆ˜์˜ ํ”„๋กœ์„ธ์Šค ID๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.

def func(text):
  # Append the function's process ID to ensure the file name's uniqueness.
  file_path = '/tmp/content' + str(os.getpid())
  with open(file_path, "w") as file:
    file.write(text)
Copy

ํŒŒ์ผ ์ž‘์„ฑ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ Snowpark Python UDF ๋ฐ UDTF์—์„œ ํŒŒ์ผ ์ž‘์„ฑํ•˜๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

์Šคํ…Œ์ด์ง•๋œ ํŒŒ์ผ์˜ ์••์ถ• ํ’€๊ธฐยถ

.zip ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง€์— ์ €์žฅํ•œ ๋‹ค์Œ, Python zipfile ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•˜์—ฌ UDF ํ˜•์‹์œผ๋กœ ์••์ถ•์„ ํ’€ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด .zip ํŒŒ์ผ์„ ์Šคํ…Œ์ด์ง€์— ์—…๋กœ๋“œํ•œ ๋‹ค์Œ, UDF๋ฅผ ๋งŒ๋“ค ๋•Œ IMPORTS ์ ˆ์˜ ์Šคํ…Œ์ด์ง•๋œ ์œ„์น˜์—์„œ .zip ํŒŒ์ผ์„ ์ฐธ์กฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋Ÿฐํƒ€์ž„์— Snowflake๋Š” ์Šคํ…Œ์ด์ง•๋œ ํŒŒ์ผ์„ ์ฝ”๋“œ์—์„œ ์•ก์„ธ์Šคํ•  ์ˆ˜ ์žˆ๋Š” ๊ฐ€์ ธ์˜ค๊ธฐ ๋””๋ ‰ํ„ฐ๋ฆฌ๋กœ ๋ณต์‚ฌํ•ฉ๋‹ˆ๋‹ค.

ํŒŒ์ผ ์ฝ๊ธฐ ๋ฐ ์“ฐ๊ธฐ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ํŒŒ์ผ ์ฝ๊ธฐ ๋ฐ ํŒŒ์ผ ์“ฐ๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

๋‹ค์Œ ์˜ˆ์ œ์—์„œ UDF ์ฝ”๋“œ๋Š” NLP ๋ชจ๋ธ์„ ์‚ฌ์šฉํ•˜์—ฌ ํ…์ŠคํŠธ์—์„œ ์—”ํ„ฐํ‹ฐ๋ฅผ ๊ฒ€์ƒ‰ํ•ฉ๋‹ˆ๋‹ค. ์ฝ”๋“œ๋Š” ์ด๋Ÿฌํ•œ ์—”ํ„ฐํ‹ฐ๋กœ ๊ตฌ์„ฑ๋œ ๋ฐฐ์—ด์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ํ…์ŠคํŠธ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด NLP ๋ชจ๋ธ์„ ์„ค์ •ํ•˜๊ธฐ ์œ„ํ•ด, ์ฝ”๋“œ๋Š” ๋จผ์ € zipfile ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•˜์—ฌ .zip ํŒŒ์ผ์—์„œ ๋ชจ๋ธ(en_core_web_sm-2.3.1)์„ ์œ„ํ•œ ํŒŒ์ผ์„ ์ถ”์ถœํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ ์ด ์ฝ”๋“œ๋Š” spaCy ๋ชจ๋“ˆ์„ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒ์ผ์—์„œ ๋ชจ๋ธ์„ ๋กœ๋”ฉํ•ฉ๋‹ˆ๋‹ค.

์ด ์ฝ”๋“œ๋Š” ์ด ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ์ฟผ๋ฆฌ๋ฅผ ์œ„ํ•ด ์ƒ์„ฑ๋œ /tmp ๋””๋ ‰ํ„ฐ๋ฆฌ์— ์ถ”์ถœ๋œ ํŒŒ์ผ ๋‚ด์šฉ์„ ์”๋‹ˆ๋‹ค. ์ด ์ฝ”๋“œ๋Š” ํŒŒ์ผ ์ž ๊ธˆ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ถ”์ถœ์ด Python ์ž‘์—…์ž ํ”„๋กœ์„ธ์Šค ๊ฐ„์— ๋™๊ธฐํ™”๋˜๋„๋ก ํ•ฉ๋‹ˆ๋‹ค. ์ด์™€ ๊ฐ™์ด, ๋‚ด์šฉ์€ ํ•œ ๋ฒˆ๋งŒ ์••์ถ•์ด ํ’€๋ฆฝ๋‹ˆ๋‹ค. ํŒŒ์ผ ์“ฐ๊ธฐ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ํŒŒ์ผ ์“ฐ๊ธฐ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

zipfile ๋ชจ๋“ˆ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ zipfile ์ฐธ์กฐ ๋ฅผ ํ™•์ธํ•ด ๋ณด์‹ญ์‹œ์˜ค. spaCy ๋ชจ๋“ˆ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ spaCy API ์„ค๋ช…์„œ ๋ฅผ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

์ธ๋ผ์ธ ์ฒ˜๋ฆฌ๊ธฐ๋กœ UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION py_spacy(str STRING)
  RETURNS ARRAY
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'func'
  PACKAGES = ('spacy')
  IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS $$
import fcntl
import os
import spacy
import sys
import threading
import zipfile

 # File lock class for synchronizing write access to /tmp.
 class FileLock:
   def __enter__(self):
       self._lock = threading.Lock()
       self._lock.acquire()
       self._fd = open('/tmp/lockfile.LOCK', 'w+')
       fcntl.lockf(self._fd, fcntl.LOCK_EX)

    def __exit__(self, type, value, traceback):
       self._fd.close()
       self._lock.release()

 # Get the location of the import directory. Snowflake sets the import
 # directory location so code can retrieve the location via sys._xoptions.
 IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
 import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

 # Get the path to the ZIP file and set the location to extract to.
 zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
 extracted = '/tmp/en_core_web_sm'

 # Extract the contents of the ZIP. This is done under the file lock
 # to ensure that only one worker process unzips the contents.
 with FileLock():
    if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
       with zipfile.ZipFile(zip_file_path, 'r') as myzip:
          myzip.extractall(extracted)

 # Load the model from the extracted file.
 nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")

 def func(text):
    doc = nlp(text)
    result = []

    for ent in doc.ents:
       result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
    return result
 $$;
Copy

NULL ๊ฐ’ ์ฒ˜๋ฆฌํ•˜๊ธฐยถ

๋‹ค์Œ ์ฝ”๋“œ์—์„œ๋Š” NULL ๊ฐ’์ด ์ฒ˜๋ฆฌ๋˜๋Š” ๋ฐฉ์‹์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ NULL ๊ฐ’ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

UDF๋ฅผ ๋งŒ๋“ญ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION py_udf_null(a VARIANT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'udf'
AS $$

def udf(a):
   if not a:
       return 'JSON null'
   elif getattr(a, "is_sql_null", False):
       return 'SQL null'
   else:
       return 'not null'
$$;
Copy

UDF๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

SELECT py_udf_null(null);
Copy
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null          |
+-------------------+
SELECT py_udf_null(parse_json('null'));
Copy
+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null                       |
+---------------------------------+
SELECT py_udf_null(10);
Copy
+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null        |
+-----------------+