Python UDF ์ฒ๋ฆฌ๊ธฐ ์ยถ
์ด ํญ๋ชฉ์๋ Python์ผ๋ก ์์ฑ๋ UDF ์ฒ๋ฆฌ๊ธฐ ์ฝ๋์ ๊ฐ๋จํ ์๊ฐ ๋์ ์์ต๋๋ค.
Python์ ์ฌ์ฉํ์ฌ UDF ์ฒ๋ฆฌ๊ธฐ๋ฅผ ๋ง๋๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ Python UDF ๋ง๋ค๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
runtime_version
์ ์ฝ๋์์ ํ์ํ Python ๋ฐํ์ ๋ฒ์ ์ผ๋ก ์ค์ ํฉ๋๋ค. ์ง์๋๋ Python ๋ฒ์ ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
3.9
3.10
3.11
3.12
์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ์์ ํจํค์ง ๊ฐ์ ธ์ค๊ธฐยถ
Anaconda์์ ์ ๋ณ๋ ์๋ ํํฐ ํจํค์ง ๋ชฉ๋ก์ ์ ๊ณตํฉ๋๋ค. ์์ธํ ๋ด์ฉ์ ์๋ ํํฐ ํจํค์ง ์ฌ์ฉํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฐธ๊ณ
Snowflake ์กฐ์ง ๊ด๋ฆฌ์๊ฐ Snowflake ์ธ๋ถ ์ ๊ณต ์ฝ๊ด ์ ์น์ธํด์ผ Anaconda์์ ์ ๊ณตํ๋ ํจํค์ง๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ Anaconda์ ์๋ ํํฐ ํจํค์ง ์ฌ์ฉํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์ฝ๋์์๋ ํจํค์ง๋ฅผ ๊ฐ์ ธ์ค๊ณ ํจํค์ง์ ๋ฒ์ ์ ๋ฐํํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION py_udf()
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('numpy','pandas','xgboost==1.5.0')
HANDLER = 'udf'
AS $$
import numpy as np
import pandas as pd
import xgboost as xgb
def udf():
return [np.__version__, pd.__version__, xgb.__version__]
$$;
UDF๋ฅผ ํธ์ถํฉ๋๋ค.
SELECT py_udf();
+-------------+
| PY_UDF() |
|-------------|
| [ |
| "1.19.2", |
| "1.4.0", |
| "1.5.0" |
| ] |
+-------------+
PACKAGES ํค์๋๋ฅผ ์ฌ์ฉํ์ฌ ๋ค์๊ณผ ๊ฐ์ด ํจํค์ง ๋ฒ์ ์ ์ง์ ํ ์ ์์ต๋๋ค.
๋ฒ์ ์์(์:
numpy
)์ ํํ ๋ฒ์ ์ ๊ณ ์ ๋จ(์:
numpy==1.25.2
)์์ผ๋์นด๋๋ฅผ ์ฌ์ฉํ์ฌ ๋ฒ์ ์ ๋์ฌ๋ก ์ ํ๋จ(์:
numpy==1.*
)๋ฒ์ ๋ฒ์๋ก ์ ํ๋จ(์:
numpy>=1.25
)๋ชจ๋ ๋ฒ์ ์ง์ ์๋ฅผ ๋ง์กฑํ๋ ํจํค์ง๊ฐ ์ ํ๋๋๋ก ์ฌ๋ฌ ๋ฒ์ ์ง์ ์๋ก ์ ํ๋จ(์:
numpy>=1.25,<2
)
์ฐธ๊ณ
ํจํค์ง ์ ์ฑ
์์๋ ์ฌ๋ฌ ๋ฒ์ ์ฐ์ฐ์(์: numpy>=1.25,<2
)๋ฅผ ์ฌ์ฉํ ์ ์์ง๋ง Python UDF, UDTF ๋ฐ ์ ์ฅ ํ๋ก์์ ๋ฅผ ์์ฑํ ๋๋ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ค์์ ์์ผ๋์นด๋ *
๋ฅผ ์ฌ์ฉํ์ฌ ํจํค์ง๋ฅผ ๋ฒ์ ์ ๋์ฌ๋ก ์ ํํ๋ ๋ฐฉ๋ฒ์ ์์
๋๋ค.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('numpy==1.*')
RUNTIME_VERSION = 3.10
HANDLER = 'echo'
AS $$
def echo():
return 'hi'
$$;
์ด ์์์๋ ํจํค์ง๋ฅผ ์ง์ ๋ ๋ฒ์ ๋ณด๋ค ํฌ๊ฑฐ๋ ๊ฐ๋๋ก ์ ํํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('numpy>=1.2')
RUNTIME_VERSION = 3.10
HANDLER = 'echo'
AS $$
def echo():
return 'hi'
$$;
์ด ์์์๋ ์ฌ๋ฌ ํจํค์ง ๋ฒ์ ์ง์ ์๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
PACKAGES = ('numpy>=1.2,<2')
RUNTIME_VERSION = 3.10
HANDLER = 'echo'
AS $$
def echo():
return 'hi'
$$;
ํ์ผ ์ฝ๊ธฐยถ
Python UDF ์ฒ๋ฆฌ๊ธฐ ์ฝ๋๋ก ํ์ผ์ ๋ด์ฉ์ ์ฝ์ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ๋น์ ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
ํ์ผ์ ๋ด์ฉ์ ๋ค์๊ณผ ๊ฐ์ ๋ฐฉ๋ฒ์ผ๋ก ์ฝ์ ์ ์์ต๋๋ค.
IMPORTS ์ ๋ก ํ์ผ ๊ฒฝ๋ก์ ์ด๋ฆ์ ์ ์ ์ผ๋ก ์ง์ ํ ๋ค์, UDF์ ํ ๋๋ ํฐ๋ฆฌ์์ ์ฝ์ต๋๋ค. ์ด๋ ํ์ผ ์ด๋ฆ์ด ์ ์ ์ด๊ณ ํจ์ ๋ด์์ ์ผ๊ด๋๋ฉฐ ํ์ผ ์ด๋ฆ์ ๋ฏธ๋ฆฌ ์๊ณ ์์ ๋ ์ ์ฉํ ์ ์์ต๋๋ค.
ํ์ผ์ ๋์ ์ผ๋ก ์ง์ ํ๊ณ SnowflakeFile๋ก ํ์ผ์ ๋ด์ฉ์ ์ฝ์ต๋๋ค. ๊ณ์ฐ ์ค์ ํ์ผ์ ์ก์ธ์คํด์ผ ํ๋ ๊ฒฝ์ฐ ์ด๋ ๊ฒ ํ ์ ์์ต๋๋ค.
IMPORTS๋ฅผ ์ฌ์ฉํ์ฌ ์ ์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐยถ
CREATE FUNCTION ๋ช ๋ น์ IMPORTS ์ ์ ํ์ผ ์ด๋ฆ๊ณผ ์คํ ์ด์ง ์ด๋ฆ์ ์ง์ ํ์ฌ ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
IMPORTS ์ ์ ํ์ผ์ ์ง์ ํ๋ฉด Snowflake๋ ํด๋น ํ์ผ์ ์คํ ์ด์ง์์ UDF์ ํ ๋๋ ํฐ๋ฆฌ (๊ฐ์ ธ์ค๊ธฐ ๋๋ ํฐ๋ฆฌ ๋ผ๊ณ ๋ ํจ)๋ก ๋ณต์ฌํฉ๋๋ค. ์ด ๋๋ ํฐ๋ฆฌ๋ UDF๊ฐ ํ์ผ์ ์ฝ๋ ๋๋ ํฐ๋ฆฌ์ ๋๋ค.
Snowflake๋ ๊ฐ์ ธ์จ ํ์ผ์ ๋จ์ผ ๋๋ ํฐ๋ฆฌ์ ๋ณต์ฌํฉ๋๋ค. ํด๋น ๋๋ ํฐ๋ฆฌ์ ๋ชจ๋ ํ์ผ์ ๊ณ ์ ํ ์ด๋ฆ์ ๊ฐ์ ธ์ผ ํ๋ฏ๋ก IMPORTS ์ ์ ๊ฐ ํ์ผ์ ๊ณ ์ ํ ์ด๋ฆ์ ๊ฐ์ ธ์ผ ํฉ๋๋ค. ์ด๋ ํ์ผ์ด ๋ค๋ฅธ ์คํ ์ด์ง ๋๋ ์คํ ์ด์ง ๋ด์ ๋ค๋ฅธ ํ์ ๋๋ ํฐ๋ฆฌ์์ ์์ํ๋ ๊ฒฝ์ฐ์๋ ์ ์ฉ๋ฉ๋๋ค.
์ฐธ๊ณ
ํ์ ํด๋๊ฐ ์๋ ์คํ ์ด์ง์ ์ต์์ ๋๋ ํฐ๋ฆฌ์์๋ง ํ์ผ์ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค.
๋ค์ ์์์๋ my_stage
๋ผ๋ ์คํ
์ด์ง์์ file.txt
๋ผ๋ ํ์ผ์ ์ฝ๋ ์ธ๋ผ์ธ Python ์ฒ๋ฆฌ๊ธฐ๋ฅผ ์ฌ์ฉํฉ๋๋ค. ์ฒ๋ฆฌ๊ธฐ๋ snowflake_import_directory
์์คํ
์ต์
๊ณผ ํจ๊ป Python sys._xoptions
๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ UDF์ ํ ๋๋ ํฐ๋ฆฌ ์์น๋ฅผ ๊ฒ์ํฉ๋๋ค.
Snowflake๋ UDF ์์ฑ ์ค์ ํ ๋ฒ๋ง ํ์ผ์ ์ฝ์ผ๋ฉฐ ํ์ผ ์ฝ๊ธฐ๊ฐ ๋์ ์ฒ๋ฆฌ๊ธฐ ์ธ๋ถ์์ ๋ฐ์ํ๋ ๊ฒฝ์ฐ UDF ์คํ ์ค์ ํ์ผ์ ๋ค์ ์ฝ์ง ์์ต๋๋ค.
์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ๋ก UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION my_udf()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
IMPORTS = ('@my_stage/file.txt')
HANDLER = 'compute'
AS $$
import sys
import os
with open(os.path.join(sys._xoptions["snowflake_import_directory"], 'file.txt'), "r") as f:
s = f.read()
def compute():
return s
$$;
SnowflakeFile
์ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐยถ
Snowpark snowflake.snowpark.files
๋ชจ๋์์ SnowflakeFile
ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ ์คํ
์ด์ง์์ ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค. SnowflakeFile
ํด๋์ค๋ ๋ชจ๋ ํฌ๊ธฐ์ ํ์ผ์ ์คํธ๋ฆฌ๋ฐํ ์ ์๋ ๋์ ํ์ผ ์ก์ธ์ค๋ฅผ ์ ๊ณตํฉ๋๋ค. ๋์ ํ์ผ ์ก์ธ์ค๋ ์ฌ๋ฌ ํ์ผ์์ ๋ฐ๋ณตํ๋ ค๋ ๊ฒฝ์ฐ์๋ ์ ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด ์ฌ๋ฌ ํ์ผ ์ฒ๋ฆฌํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค.
SnowflakeFile
ํด๋์ค์๋ ํ์ผ์ ์ด๊ธฐ ์ํ ๋ฉ์๋๊ฐ ํ ๊ฐ ์๋๋ฐ, ๋ฐ๋ก open
์
๋๋ค. open
๋ฉ์๋๋ Python์ IOBase
ํ์ผ ์ค๋ธ์ ํธ๋ฅผ ํ์ฅํ๋ SnowflakeFile
์ค๋ธ์ ํธ๋ฅผ ๋ฐํํฉ๋๋ค.
SnowflakeFile
์ค๋ธ์ ํธ๋ ๋ค์ IOBase
, BufferedIOBase
๋ฐ RawIOBase
๋ฉ์๋๋ฅผ ์ง์ํฉ๋๋ค.
IOBase.fileno
IOBase.isatty
IOBase.readable
IOBase.readinto
IOBase.readline
IOBase.readlines
IOBase.seek
IOBase.seekable
IOBase.tell
BufferedIOBase.readinto1
RawIOBase.read
RawIOBase.readall
์์ธํ ๋ด์ฉ์ IOBase์ Python 3.9 ์ค๋ช
์ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค. fileno
๋ฉ์๋์ ๊ฐ์ด Snowflake ์๋ฒ์์ ์ง์๋์ง ์๋ ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ์ค๋ฅ๊ฐ ๋ฐํ๋ฉ๋๋ค.
์ฐธ๊ณ
๊ธฐ๋ณธ์ ์ผ๋ก SnowflakeFile
์ ์ฌ์ฉํ ํ์ผ ์ก์ธ์ค์๋ ์ฝ๋๊ฐ ํ์ผ ์ฃผ์
๊ณต๊ฒฉ์ ๋ํ ๋ณต์๋ ฅ์ ๊ฐ๋๋ก ํ๊ธฐ ์ํด ๋ฒ์๊ฐ ์ง์ ๋ URL์ด ํ์ํฉ๋๋ค. ๊ธฐ๋ณธ ์ ๊ณต ํจ์ BUILD_SCOPED_FILE_URL ์ ์ฌ์ฉํ์ฌ SQL์์ ๋ฒ์๊ฐ ์ง์ ๋ URL์ ์์ฑํ ์ ์์ต๋๋ค. ๋ฒ์๊ฐ ์ง์ ๋ URL์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ์ผ ์ก์ธ์ค์ ์ฌ์ฉํ ์ ์๋ URL์ ์ ํ ์น์
์ ์ฐธ์กฐํ์ญ์์ค. ํ์ผ์ ๋ํ ์ก์ธ์ค ๊ถํ์ด ์๋ ์ฌ์ฉ์๋ง ๋ฒ์๊ฐ ์ง์ ๋ URL์ ๋ง๋ค ์ ์์ต๋๋ค.
์ ์ ์กฐ๊ฑดยถ
๋ค์์ ์ํํ์ฌ ์ฝ๋์ ์คํ ์ด์ง์ ํ์ผ์ ์ฌ์ฉํ ์ ์๋๋ก ํด์ผ Python ์ฒ๋ฆฌ๊ธฐ ์ฝ๋๊ฐ ํด๋น ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
์ฒ๋ฆฌ๊ธฐ๊ฐ ์ฌ์ฉํ ์ ์๋ ์คํ ์ด์ง๋ฅผ ๋ง๋ญ๋๋ค.
์ธ๋ถ ์คํ ์ด์ง ๋๋ ๋ด๋ถ ์คํ ์ด์ง๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ๋ด๋ถ ์คํ ์ด์ง๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ํธ์ถ์์ ๊ถํ ์ ์ฅ ํ๋ก์์ ๋ฅผ ๋ง๋ค ๊ณํ์ผ ๋๋ ์ฌ์ฉ์ ์คํ ์ด์ง๊ฐ ๋ ์ ์์ต๋๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด ๋ช ๋ช ๋ ์คํ ์ด์ง๋ฅผ ์ฌ์ฉํด์ผ ํฉ๋๋ค. Snowflake๋ ํ์ฌ UDF ์ข ์์ฑ์ ๋ํ ํ ์ด๋ธ ์คํ ์ด์ง ์ฌ์ฉ์ ์ง์ํ์ง ์์ต๋๋ค.
์คํ ์ด์ง ์์ฑ์ ๋ํ ์์ธํ ๋ด์ฉ์ CREATE STAGE ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. ๋ด๋ถ ์คํ ์ด์ง ์ ํ ์ ํ์ ๋ํ ์์ธํ ๋ด์ฉ์ ๋ก์ปฌ ํ์ผ์ ์ํ ๋ด๋ถ ์คํ ์ด์ง ์ ํํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฌ์ฉ ์ฌ๋ก์ ๋ฐ๋ผ ์คํ ์ด์ง์ ๋ํ ์ ์ ํ ๊ถํ์ ๋ค์ ์ญํ ์ ํ ๋นํด์ผ ํฉ๋๋ค.
์ฌ์ฉ ์ฌ๋ก
์ญํ
UDF ๋๋ ์์ ์์ ๊ถํ ์ ์ฅ ํ๋ก์์
์คํ ์ค์ธ UDF ๋๋ ์ ์ฅ ํ๋ก์์ ๋ฅผ ์์ ํ๋ ์ญํ .
ํธ์ถ์ ๊ถํ ์ ์ฅ ํ๋ก์์
์ฌ์ฉ์ ์ญํ .
์์ธํ ๋ด์ฉ์ ์ฌ์ฉ์ ์ ์ ํจ์์ ๋ํ ๊ถํ ๋ถ์ฌ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฝ๋์์ ์ฝ์ ํ์ผ์ ์คํ ์ด์ง๋ก ๋ณต์ฌํฉ๋๋ค.
PUT ๋ช ๋ น์ ์ฌ์ฉํ์ฌ ๋ก์ปฌ ๋๋ผ์ด๋ธ์์ ๋ด๋ถ ์คํ ์ด์ง๋ก ํ์ผ์ ๋ณต์ฌํ ์ ์์ต๋๋ค. PUT์ผ๋ก ํ์ผ์ ์คํ ์ด์งํ๋ ์์ธํ ๋ฐฉ๋ฒ์ ๋ก์ปฌ ํ์ผ ์์คํ ์์ ๋ฐ์ดํฐ ํ์ผ ์คํ ์ด์งํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
ํด๋ผ์ฐ๋ ์ ์ฅ์ ์๋น์ค์์ ์ ๊ณตํ๋ ๋๊ตฌ๋ฅผ ์ฌ์ฉํ์ฌ ๋ก์ปฌ ๋๋ผ์ด๋ธ์์ ์ธ๋ถ ์คํ ์ด์ง ์์น๋ก ํ์ผ์ ๋ณต์ฌํ ์ ์์ต๋๋ค. ๋์๋ง์ ํด๋ผ์ฐ๋ ์ ์ฅ์ ์๋น์ค ์ค๋ช ์๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
์ธ๋ผ์ธ Python ์ฒ๋ฆฌ๊ธฐ๋ก ์ด๋ฏธ์ง์ ์ง๊ฐ ํด์ ๊ณ์ฐํ๊ธฐยถ
์ด ์์์๋ SnowflakeFile
์ ์ฌ์ฉํ์ฌ ์คํ
์ด์ง๋ ์ด๋ฏธ์ง ํ์ผ ์์ ์ฝ๊ณ ๊ฐ ํ์ผ์ ์ง๊ฐ ํด์ (pHash)๋ฅผ ์ฌ์ฉํ์ฌ ์ด๋ฏธ์ง๊ฐ ์๋ก ์ผ๋ง๋ ์ ์ฌํ์ง ํ์ธํฉ๋๋ค.
mode
์ธ์์ rb
๋ฅผ ์ ๋ฌํ์ฌ ์
๋ ฅ ๋ชจ๋๋ฅผ ๋ฐ์ด๋๋ฆฌ๋ก ์ง์ ํ์ฌ ์ด๋ฏธ์ง์ phash ๊ฐ์ ๋ฐํํ๋ UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION calc_phash(file_path STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python','imagehash','pillow')
HANDLER = 'run'
AS $$
from PIL import Image
import imagehash
from snowflake.snowpark.files import SnowflakeFile
def run(file_path):
with SnowflakeFile.open(file_path, 'rb') as f:
return imagehash.average_hash(Image.open(f))
$$;
๋ ์ด๋ฏธ์ง์ phash ๊ฐ ์ฌ์ด์ ๊ฑฐ๋ฆฌ๋ฅผ ๊ณ์ฐํ๋ ๋ ๋ฒ์งธ UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION calc_phash_distance(h1 STRING, h2 STRING)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('imagehash')
HANDLER = 'run'
AS $$
import imagehash
def run(h1, h2):
return imagehash.hex_to_hash(h1) - imagehash.hex_to_hash(h2)
$$;
์ด๋ฏธ์ง ํ์ผ์ ์คํ ์ด์งํ๊ณ ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์ ์๋ก ๊ณ ์นฉ๋๋ค.
PUT file:///tmp/image1.jpg @images AUTO_COMPRESS=FALSE;
PUT file:///tmp/image2.jpg @images AUTO_COMPRESS=FALSE;
ALTER STAGE images REFRESH;
UDFs๋ฅผ ํธ์ถํฉ๋๋ค.
SELECT
calc_phash_distance(
calc_phash(build_scoped_file_url(@images, 'image1.jpg')),
calc_phash(build_scoped_file_url(@images, 'image2.jpg'))
) ;
UDTF๋ก CSV ํ์ผ ์ฒ๋ฆฌํ๊ธฐยถ
์ด ์์์๋ SnowflakeFile
์ ์ฌ์ฉํ์ฌ CSV ํ์ผ์ ๋ด์ฉ์ ์ถ์ถํ๊ณ ํ
์ด๋ธ์ ํ์ ๋ฐํํ๋ UDTF๋ฅผ ์์ฑํฉ๋๋ค.
์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ๋ก UDTF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE FUNCTION parse_csv(file_path STRING)
RETURNS TABLE (col1 STRING, col2 STRING, col3 STRING)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'csvparser'
AS $$
from snowflake.snowpark.files import SnowflakeFile
class csvparser:
def process(self, stagefile):
with SnowflakeFile.open(stagefile) as f:
for line in f.readlines():
lineStr = line.strip()
row = lineStr.split(",")
try:
# Read the columns from the line.
yield (row[1], row[0], row[2], )
except:
pass
$$;
CSV ํ์ผ์ ์คํ ์ด์งํ๊ณ ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์ ์๋ก ๊ณ ์นฉ๋๋ค.
PUT file:///tmp/sample.csv @data_stage AUTO_COMPRESS=FALSE;
ALTER STAGE data_stage REFRESH;
UDTF๋ฅผ ํธ์ถํ์ฌ ํ์ผ URL์ ์ ๋ฌํฉ๋๋ค.
SELECT * FROM TABLE(PARSE_CSV(build_scoped_file_url(@data_stage, 'sample.csv')));
์ฌ๋ฌ ํ์ผ ์ฒ๋ฆฌํ๊ธฐยถ
๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์ RELATIVE_PATH ์ด์ ์ฒ๋ฆฌ๊ธฐ์ ์ ๋ฌํ์ฌ ์ฌ๋ฌ ํ์ผ์ ์ฝ๊ณ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค. RELATIVE_PATH ์ด์ ๋ํ ์์ธํ ๋ด์ฉ์ ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ ์ฟผ๋ฆฌ์ ์ถ๋ ฅ ์ ์ฐธ์กฐํ์ญ์์ค.
์ฐธ๊ณ
ํ์ผ ํฌ๊ธฐ ๋ฐ ์ปดํจํ ์๊ตฌ ์ฌํญ์ ๋ฐ๋ผ ์ฌ๋ฌ ํ์ผ์ ์ฝ๊ณ ์ฒ๋ฆฌํ๋ ๋ฌธ์ ์คํํ๊ธฐ ์ ์ ALTER WAREHOUSE ๋ฅผ ์ฌ์ฉํ์ฌ ์จ์ดํ์ฐ์ค๋ฅผ ํ์ฅํ ์ ์์ต๋๋ค.
- ์ฌ๋ฌ ํ์ผ์ ์ฒ๋ฆฌํ๋ UDF๋ฅผ ํธ์ถํฉ๋๋ค.
๋ค์ ์์์๋ CREATE TABLE ๋ฌธ ๋ด์์ UDF๋ฅผ ํธ์ถํ์ฌ ์คํ ์ด์ง์ ๊ฐ ํ์ผ์ ์ฒ๋ฆฌํ ๋ค์ ๊ฒฐ๊ณผ๋ฅผ ์ ํ ์ด๋ธ์ ์ ์ฅํฉ๋๋ค.
๋ฐ๋ชจ ๋ชฉ์ ์ผ๋ก, ์์ ์์๋ ๋ค์์ ๊ฐ์ ํฉ๋๋ค.
my_stage
๋ผ๋ ์คํ ์ด์ง์ ์ฌ๋ฌ ํ ์คํธ ํ์ผ์ด ์์ต๋๋ค.๋น์ ํ ํ ์คํธ์ ๋ํ ๊ฐ์ ๋ถ์์ ์ํํ๋
get_sentiment
๋ผ๋ ๊ธฐ์กด UDF๊ฐ ์์ต๋๋ค. UDF๋ ํ ์คํธ ํ์ผ์ ๊ฒฝ๋ก๋ฅผ ์ ๋ ฅ๊ฐ์ผ๋ก ๋ฐ์ ๊ฐ์ ์ ๋ํ๋ด๋ ๊ฐ์ ๋ฐํํฉ๋๋ค.
CREATE OR REPLACE TABLE sentiment_results AS SELECT relative_path , get_sentiment(build_scoped_file_url(@my_stage, relative_path)) AS sentiment FROM directory(@my_stage);
- ์ฌ๋ฌ ํ์ผ์ ์ฒ๋ฆฌํ๋ UDTF๋ฅผ ํธ์ถํฉ๋๋ค.
๋ค์ ์์์๋
parse_excel_udtf
๋ผ๋ UDTF๋ฅผ ํธ์ถํฉ๋๋ค. ์ด ์์์๋my_excel_stage
๋ผ๋ ์คํ ์ด์ง์ ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์์relative_path
๋ฅผ ์ ๋ฌํฉ๋๋ค.SELECT t.* FROM directory(@my_stage) d, TABLE(parse_excel_udtf(build_scoped_file_url(@my_excel_stage, relative_path)) t;
์คํ ์ด์ง URIs ๋ฐ URLs๊ฐ ์๋ ํ์ผ ์ฝ๊ธฐยถ
SnowflakeFile
์ ์ฌ์ฉํ ํ์ผ ์ก์ธ์ค์๋ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ฒ์๊ฐ ์ง์ ๋ URL์ด ํ์ํฉ๋๋ค. ์ด๋ฅผ ํตํด ์ฝ๋๋ฅผ ํ์ผ ์ฃผ์
๊ณต๊ฒฉ์ ๋ํด ๋ณต์๋ ฅ ์๊ฒ ๋ง๋ค ์ ์์ต๋๋ค. ๊ทธ๋ฌ๋ ๋์ ์คํ
์ด์ง URI ๋๋ ์คํ
์ด์ง URL์ ์ฌ์ฉํ์ฌ ํ์ผ ์์น๋ฅผ ์ฐธ์กฐํ ์ ์์ต๋๋ค. ๊ทธ๋ฌ๋ ค๋ฉด require_scoped_url = False
ํค์๋ ์ธ์๋ก SnowflakeFile.open
๋ฉ์๋๋ฅผ ํธ์ถํด์ผ ํฉ๋๋ค.
์ด ์ต์ ์ ํธ์ถ์๊ฐ UDF ์์ ์๋ง ์ก์ธ์คํ ์ ์๋ URI๋ฅผ ์ ๊ณตํ ์ ์๋๋ก ํ๋ ค๋ ๊ฒฝ์ฐ์ ์ ์ฉํฉ๋๋ค. ์๋ฅผ ๋ค์ด UDF๋ฅผ ์์ ํ๊ณ ๊ตฌ์ฑ ํ์ผ ๋๋ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ์ฝ์ผ๋ ค๋ ๊ฒฝ์ฐ ํ์ผ ์ก์ธ์ค๋ฅผ ์ํด ์คํ ์ด์ง URI๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ฌ์ฉ์ ์ ๋ ฅ์ ๊ธฐ๋ฐ์ผ๋ก ์์ฑ๋ ํ์ผ๊ณผ ๊ฐ์ด ์ด๋ฆ์ ์์ธกํ ์ ์๋ ํ์ผ๋ก ์์ ํ ๋๋ ์ด ์ต์ ์ ๊ถ์ฅํ์ง ์์ต๋๋ค.
์ด ์์ ์์๋ ํ์ผ์์ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ์ฝ๊ณ ํจ์์์ ์ด ๋ชจ๋ธ์ ์ฌ์ฉํ์ฌ ๊ฐ์ ๋ถ์์ ์ํ ์์ฐ์ด ์ฒ๋ฆฌ๋ฅผ ์ํํฉ๋๋ค. ์ด ์์์๋ require_scoped_url = False
๋ก open
์ ํธ์ถํฉ๋๋ค. ๋ ํ์ผ ์์น ํ์(์คํ
์ด์ง URI ๋ฐ ์คํ
์ด์ง URL)์์ ๋ชจ๋ UDF ์์ ์๋ ๋ชจ๋ธ ํ์ผ์ ์ก์ธ์คํ ์ ์์ด์ผ ํฉ๋๋ค.
์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ๋ก UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION extract_sentiment(input_data STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS $$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = '@models/NLP_model.pickle'
# Specify 'mode = rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
๋ชจ๋ธ ํ์ผ์ ์คํ ์ด์งํ๊ณ ๋๋ ํฐ๋ฆฌ ํ ์ด๋ธ์ ์๋ก ๊ณ ์นฉ๋๋ค.
PUT file:///tmp/NLP_model.pickle @models AUTO_COMPRESS=FALSE;
ALTER STAGE models REFRESH;
๋๋ ๋ชจ๋ธ์ ์คํ ์ด์ง URL๋ก UDF๋ฅผ ์ง์ ํ์ฌ ๊ฐ์ ์ ์ถ์ถํ ์ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด ์คํ ์ด์ง URL์ ์ฌ์ฉํ์ฌ ํ์ผ์ ์ง์ ํ๋ ์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ๊ฐ ์๋ UDF๋ฅผ ์์ฑํฉ๋๋ค.
CREATE OR REPLACE FUNCTION extract_sentiment(input_data STRING)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python','scikit-learn')
HANDLER = 'run'
AS $$
from snowflake.snowpark.files import SnowflakeFile
from sklearn.linear_model import SGDClassifier
import pickle
def run(input_data):
model_file = 'https://my_account/api/files/my_db/my_schema/models/NLP_model.pickle'
# Specify 'rb' to open the file in binary mode.
with SnowflakeFile.open(model_file, 'rb', require_scoped_url = False) as f:
model = pickle.load(f)
return model.predict([input_data])[0]
$$;
์ ๋ ฅ ๋ฐ์ดํฐ๋ก UDF๋ฅผ ํธ์ถํฉ๋๋ค.
SELECT extract_sentiment('I am writing to express my interest in a recent posting made.');
ํ์ผ ์ฐ๊ธฐยถ
UDF ์ฒ๋ฆฌ๊ธฐ๋ UDF๋ฅผ ํธ์ถํ๋ ์ฟผ๋ฆฌ์ ๋ํด ์์ฑ๋ /tmp
๋๋ ํฐ๋ฆฌ์ ํ์ผ์ ์ธ ์ ์์ต๋๋ค.
/tmp
๋๋ ํฐ๋ฆฌ๋ ๋จ์ผ ํธ์ถ ์ฟผ๋ฆฌ์ฉ์ผ๋ก ๋ฐ๋ก ๋ง๋ จ๋ ๊ฒ์ด์ง๋ง, ์ฌ๋ฌ Python ์์
์ ํ๋ก์ธ์ค๊ฐ ๋์์ ์คํ ์ค์ผ ์๋ ์์ต๋๋ค. ์ถฉ๋์ ๋ฐฉ์งํ๋ ค๋ฉด /tmp ๋๋ ํฐ๋ฆฌ์ ๋ํ ์ก์ธ์ค๊ฐ ๋ค๋ฅธ Python ์์
์ ํ๋ก์ธ์ค์ ๋๊ธฐํ๋๊ฑฐ๋ /tmp์ ๊ธฐ๋ก๋ ํ์ผ์ ์ด๋ฆ์ด ๊ณ ์ ํ๋๋ก ํด์ผ ํฉ๋๋ค.
์์ ์ฝ๋๋ ์ด ํญ๋ชฉ์ ์คํ ์ด์ง๋ ํ์ผ์ ์์ถ ํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์์ ์ ์ฝ๋๋ ์
๋ ฅ text
๋ฅผ /tmp
๋๋ ํฐ๋ฆฌ์ ์๋๋ค. ๋ํ ํ์ผ ์์น์ ๊ณ ์ ์ฑ์ ๋ณด์ฅํ๋๋ก ํจ์์ ํ๋ก์ธ์ค ID๋ฅผ ์ถ๊ฐํฉ๋๋ค.
def func(text):
# Append the function's process ID to ensure the file name's uniqueness.
file_path = '/tmp/content' + str(os.getpid())
with open(file_path, "w") as file:
file.write(text)
ํ์ผ ์์ฑ์ ๋ํ ์์ธํ ๋ด์ฉ์ Snowpark Python UDF ๋ฐ UDTF์์ ํ์ผ ์์ฑํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
์คํ ์ด์ง๋ ํ์ผ์ ์์ถ ํ๊ธฐยถ
.zip ํ์ผ์ ์คํ ์ด์ง์ ์ ์ฅํ ๋ค์, Python zipfile ๋ชจ๋์ ์ฌ์ฉํ์ฌ UDF ํ์์ผ๋ก ์์ถ์ ํ ์ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด .zip ํ์ผ์ ์คํ ์ด์ง์ ์ ๋ก๋ํ ๋ค์, UDF๋ฅผ ๋ง๋ค ๋ IMPORTS ์ ์ ์คํ ์ด์ง๋ ์์น์์ .zip ํ์ผ์ ์ฐธ์กฐํ ์ ์์ต๋๋ค. ๋ฐํ์์ Snowflake๋ ์คํ ์ด์ง๋ ํ์ผ์ ์ฝ๋์์ ์ก์ธ์คํ ์ ์๋ ๊ฐ์ ธ์ค๊ธฐ ๋๋ ํฐ๋ฆฌ๋ก ๋ณต์ฌํฉ๋๋ค.
ํ์ผ ์ฝ๊ธฐ ๋ฐ ์ฐ๊ธฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ์ผ ์ฝ๊ธฐ ๋ฐ ํ์ผ ์ฐ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๋ค์ ์์ ์์ UDF ์ฝ๋๋ NLP ๋ชจ๋ธ์ ์ฌ์ฉํ์ฌ ํ ์คํธ์์ ์ํฐํฐ๋ฅผ ๊ฒ์ํฉ๋๋ค. ์ฝ๋๋ ์ด๋ฌํ ์ํฐํฐ๋ก ๊ตฌ์ฑ๋ ๋ฐฐ์ด์ ๋ฐํํฉ๋๋ค. ํ ์คํธ ์ฒ๋ฆฌ๋ฅผ ์ํด NLP ๋ชจ๋ธ์ ์ค์ ํ๊ธฐ ์ํด, ์ฝ๋๋ ๋จผ์ zipfile ๋ชจ๋์ ์ฌ์ฉํ์ฌ .zip ํ์ผ์์ ๋ชจ๋ธ(en_core_web_sm-2.3.1)์ ์ํ ํ์ผ์ ์ถ์ถํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ ์ด ์ฝ๋๋ spaCy ๋ชจ๋์ ์ฌ์ฉํ์ฌ ํ์ผ์์ ๋ชจ๋ธ์ ๋ก๋ฉํฉ๋๋ค.
์ด ์ฝ๋๋ ์ด ํจ์๋ฅผ ํธ์ถํ๋ ์ฟผ๋ฆฌ๋ฅผ ์ํด ์์ฑ๋ /tmp ๋๋ ํฐ๋ฆฌ์ ์ถ์ถ๋ ํ์ผ ๋ด์ฉ์ ์๋๋ค. ์ด ์ฝ๋๋ ํ์ผ ์ ๊ธ์ ์ฌ์ฉํ์ฌ ์ถ์ถ์ด Python ์์ ์ ํ๋ก์ธ์ค ๊ฐ์ ๋๊ธฐํ๋๋๋ก ํฉ๋๋ค. ์ด์ ๊ฐ์ด, ๋ด์ฉ์ ํ ๋ฒ๋ง ์์ถ์ด ํ๋ฆฝ๋๋ค. ํ์ผ ์ฐ๊ธฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ์ผ ์ฐ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
zipfile ๋ชจ๋์ ๋ํ ์์ธํ ๋ด์ฉ์ zipfile ์ฐธ์กฐ ๋ฅผ ํ์ธํด ๋ณด์ญ์์ค. spaCy ๋ชจ๋์ ๋ํ ์์ธํ ๋ด์ฉ์ spaCy API ์ค๋ช ์ ๋ฅผ ์ฐธ์กฐํ์ญ์์ค.
์ธ๋ผ์ธ ์ฒ๋ฆฌ๊ธฐ๋ก UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION py_spacy(str STRING)
RETURNS ARRAY
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'func'
PACKAGES = ('spacy')
IMPORTS = ('@spacy_stage/spacy_en_core_web_sm.zip')
AS $$
import fcntl
import os
import spacy
import sys
import threading
import zipfile
# File lock class for synchronizing write access to /tmp.
class FileLock:
def __enter__(self):
self._lock = threading.Lock()
self._lock.acquire()
self._fd = open('/tmp/lockfile.LOCK', 'w+')
fcntl.lockf(self._fd, fcntl.LOCK_EX)
def __exit__(self, type, value, traceback):
self._fd.close()
self._lock.release()
# Get the location of the import directory. Snowflake sets the import
# directory location so code can retrieve the location via sys._xoptions.
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
# Get the path to the ZIP file and set the location to extract to.
zip_file_path = import_dir + "spacy_en_core_web_sm.zip"
extracted = '/tmp/en_core_web_sm'
# Extract the contents of the ZIP. This is done under the file lock
# to ensure that only one worker process unzips the contents.
with FileLock():
if not os.path.isdir(extracted + '/en_core_web_sm/en_core_web_sm-2.3.1'):
with zipfile.ZipFile(zip_file_path, 'r') as myzip:
myzip.extractall(extracted)
# Load the model from the extracted file.
nlp = spacy.load(extracted + "/en_core_web_sm/en_core_web_sm-2.3.1")
def func(text):
doc = nlp(text)
result = []
for ent in doc.ents:
result.append((ent.text, ent.start_char, ent.end_char, ent.label_))
return result
$$;
NULL ๊ฐ ์ฒ๋ฆฌํ๊ธฐยถ
๋ค์ ์ฝ๋์์๋ NULL ๊ฐ์ด ์ฒ๋ฆฌ๋๋ ๋ฐฉ์์ ๋ณด์ฌ์ค๋๋ค. ์์ธํ ๋ด์ฉ์ NULL ๊ฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDF๋ฅผ ๋ง๋ญ๋๋ค.
CREATE OR REPLACE FUNCTION py_udf_null(a VARIANT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'udf'
AS $$
def udf(a):
if not a:
return 'JSON null'
elif getattr(a, "is_sql_null", False):
return 'SQL null'
else:
return 'not null'
$$;
UDF๋ฅผ ํธ์ถํฉ๋๋ค.
SELECT py_udf_null(null);
+-------------------+
| PY_UDF_NULL(NULL) |
|-------------------|
| SQL null |
+-------------------+
SELECT py_udf_null(parse_json('null'));
+---------------------------------+
| PY_UDF_NULL(PARSE_JSON('NULL')) |
|---------------------------------|
| JSON null |
+---------------------------------+
SELECT py_udf_null(10);
+-----------------+
| PY_UDF_NULL(10) |
|-----------------|
| not null |
+-----------------+