์ ์ฅ ํ๋ก์์ ์ฉ Python ์ฒ๋ฆฌ๊ธฐ ์ยถ
์์ปค ํ๋ก์ธ์ค๋ก ๋์ ์์ ์คํํ๊ธฐยถ
Python ์์ ์ ํ๋ก์ธ์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ์์ ์ ์คํํ ์ ์์ต๋๋ค. ์จ์ดํ์ฐ์ค ๋ ธ๋์์ ์ฌ๋ฌ CPU ์ฝ์ด๋ฅผ ํ์ฉํ๋ ๋ณ๋ ฌ ์์ ์ ์คํํด์ผ ํ ๋ ์ด ๊ธฐ๋ฅ์ด ์ ์ฉํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
๊ธฐ๋ณธ ์ ๊ณต๋ Python ๋ค์ค ์ฒ๋ฆฌ ๋ชจ๋์ ์ฌ์ฉํ์ง ์๋ ๊ฒ์ด ์ข์ต๋๋ค.
Python Global Interpreter Lock ์ผ๋ก ์ธํด ๋ฉํฐํ์คํน ์ ๊ทผ ๋ฐฉ์์ด ๋ชจ๋ CPU ์ฝ์ด์์ ํ์ฅ๋์ง ๋ชปํ๋ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๋ ค๋ฉด ์ค๋ ๋๊ฐ ์๋ ๋ณ๋์ ์์ ์ ํ๋ก์ธ์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋์ ์์ ์ ์คํํ ์ ์์ต๋๋ค.
๋ค์ ์์์์ฒ๋ผ joblib
๋ผ์ด๋ธ๋ฌ๋ฆฌ์ Parallel
ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ Snowflake ์จ์ดํ์ฐ์ค์์ ์ด ์์
์ ์ํํ ์ ์์ต๋๋ค.
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(i ** 2) for i in range(10))
return str(result)
$$;
์ฐธ๊ณ
joblib.Parallel
์ ์ฌ์ฉ๋๋ ๊ธฐ๋ณธ ๋ฐฑ์๋๋ Snowflake ํ์ค๊ณผ Snowpark ์ต์ ํ ์จ์ดํ์ฐ์ค ๊ฐ์ ๋ค๋ฆ
๋๋ค.
ํ์ค ์จ์ดํ์ฐ์ค ๊ธฐ๋ณธ๊ฐ:
threading
Snowpark ์ต์ ํ ์จ์ดํ์ฐ์ค ๊ธฐ๋ณธ๊ฐ:
loky
(๋ค์ค ์ฒ๋ฆฌ)
๋ค์ ์์์์ฒ๋ผ joblib.parallel_backend
ํจ์๋ฅผ ํธ์ถํ์ฌ ๊ธฐ๋ณธ ๋ฐฑ์๋ ์ค์ ์ ์ฌ์ ์ํ ์ ์์ต๋๋ค.
import joblib
joblib.parallel_backend('loky')
๋น๋๊ธฐ ์ฒ๋ฆฌ์ Snowpark APIs ์ฌ์ฉํ๊ธฐยถ
๋ค์ ์์ ์์๋ Snowpark APIs๋ฅผ ์ฌ์ฉํ์ฌ ๋น๋๊ธฐ ํ์ ์์ ์ ์์ํ๋ ๋ฐฉ๋ฒ๊ณผ ๋ค์ํ ์กฐ๊ฑด์์ ์ด๋ฌํ ์์ ์ด ์ด๋ป๊ฒ ์๋ํ๋์ง ์ค๋ช ํฉ๋๋ค.
๋น๋๊ธฐ ํ์ ์์ ์ ์ํ ํ์ธํ๊ธฐยถ
๋ค์ ์์ ์์ checkStatus
ํ๋ก์์ ๋ 60์ด ๋์ ๊ธฐ๋ค๋ฆฌ๋ ๋น๋๊ธฐ ํ์ ์์
์ ์คํํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ ํ๋ก์์ ๋ ์์
์ด ์๋ฃ๋๊ธฐ ์ ์ ์์
์ ์ํ๋ฅผ ํ์ธํ๋ฏ๋ก ํ์ธ์ False
๋ฅผ ๋ฐํํฉ๋๋ค.
CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
return async_job.is_done()
$$;
๋ค์ ์ฝ๋๋ ํ๋ก์์ ๋ฅผ ํธ์ถํฉ๋๋ค.
CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False |
+-------------+
๋น๋๊ธฐ ํ์ ์์ ์ทจ์ํ๊ธฐยถ
๋ค์ ์์ ์์, cancelJob
ํ๋ก์์ ๋ SQL์ ์ฌ์ฉํ์ฌ ์๋ฃํ๋ ๋ฐ 10์ด๊ฐ ๊ฑธ๋ฆฌ๋ ๋น๋๊ธฐ ํ์ ์์
์ผ๋ก test_tb
ํ
์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์
ํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ ์์
์ด ์๋ฃ๋๊ณ ๋ฐ์ดํฐ๊ฐ ์ฝ์
๋๊ธฐ ์ ์ ํ์ ์์
์ ์ทจ์ํฉ๋๋ค.
CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
return async_job.cancel()
$$;
CALL cancelJob();
๋ค์ ์ฝ๋๋ test_tb
ํ
์ด๋ธ์ ์ฟผ๋ฆฌํ์ง๋ง, ๋ฐ์ดํฐ๊ฐ ์ฝ์
๋์ง ์์๊ธฐ ๋๋ฌธ์ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ์ง ์์ต๋๋ค.
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
๋น๋๊ธฐ ํ์ ์์ ์ด ์คํ๋๋ ๋์ ๋๊ธฐ ๋ฐ ์ฐจ๋จํ๊ธฐยถ
๋ค์ ์์ ์์, blockUntilDone
ํ๋ก์์ ๋ ์๋ฃํ๋ ๋ฐ 5์ด๊ฐ ๊ฑธ๋ฆฌ๋ ๋น๋๊ธฐ ํ์ ์์
์ ์คํํฉ๋๋ค. snowflake.snowpark.AsyncJob.result
๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ํ๋ก์์ ๊ฐ ๋๊ธฐํ๋ค๊ฐ ์์
์ด ์๋ฃ๋๋ฉด ๋ฐํํฉ๋๋ค.
CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(5)").collect_nowait()
return async_job.result()
$$;
๋ค์ ์ฝ๋๋ 5์ด๋ฅผ ๊ธฐ๋ค๋ฆฐ ํ ๋ฐํ๋๋ blockUntilDone
ํ๋ก์์ ๋ฅผ ํธ์ถํฉ๋๋ค.
CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
์๋ฃ๋์ง ์์ ๋น๋๊ธฐ ํ์ ์์ ์ ๊ฒฐ๊ณผ๋ฅผ ์์ฒญํ ํ ์ค๋ฅ ๋ฐํํ๊ธฐยถ
๋ค์ ์์ ์์, earlyReturn
ํ๋ก์์ ๋ ์๋ฃํ๋ ๋ฐ 60์ด๊ฐ ๊ฑธ๋ฆฌ๋ ๋น๋๊ธฐ ํ์ ์์
์ ์คํํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ ํ๋ก์์ ๋ ์์
์ด ์๋ฃ๋๊ธฐ ์ ์ ์์
๊ฒฐ๊ณผ์์ DataFrame
์ ๋ฐํ์ ์๋ํฉ๋๋ค. ๊ฒฐ๊ณผ์ ์ผ๋ก ์ค๋ฅ๊ฐ ๋ฐ์ํฉ๋๋ค.
CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
async_job = session.sql("select system$wait(60)").collect_nowait()
df = async_job.to_df()
try:
df.collect()
except Exception as ex:
return 'Error: (02000): Result for query <UUID> has expired'
$$;
๋ค์ ์ฝ๋๋ earlyReturn
ํ๋ก์์ ๋ฅผ ํธ์ถํ์ฌ ์ค๋ฅ๋ฅผ ๋ฐํํฉ๋๋ค.
CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired |
+------------------------------------------------------------+
ํ์ ์์ ์ด ์๋ฃ๋๊ธฐ ์ ์ ์์ ์์ ์ ์๋ฃํ๊ณ ํ์ ์์ ์ทจ์ํ๊ธฐยถ
๋ค์ ์์ ์์, earlyCancelJob
ํ๋ก์์ ๋ ๋น๋๊ธฐ ํ์ ์์
์ ์คํํ์ฌ ํ
์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์
ํ๊ณ ์๋ฃํ๋ ๋ฐ 10์ด๊ฐ ๊ฑธ๋ฆฝ๋๋ค. ๊ทธ๋ฌ๋ ์์ ์์
async_handler
๋ ํ์ ์์
์ด ์๋ฃ๋๊ธฐ ์ ์ ๋ฐํ๋์ด ํ์ ์์
์ด ์ทจ์๋ฉ๋๋ค.
CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
def async_handler(session):
async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
$$;
๋ค์ ์ฝ๋๋ earlyCancelJob
ํ๋ก์์ ๋ฅผ ํธ์ถํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ test_tb
ํ
์ด๋ธ์ ์ฟผ๋ฆฌํ๋ฉฐ, ์ด ์์
์ ์ทจ์๋ ํ์ ์์
์์ ์ฝ์
๋ ๋ฐ์ดํฐ๊ฐ ์๊ธฐ ๋๋ฌธ์ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ์ง ์์ต๋๋ค.
CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+