Python handler examples for stored procedures

Running concurrent tasks with worker processes

You can run concurrent tasks by using Python worker processes. This can be useful when you need to run parallel tasks that take advantage of multiple CPU cores on a warehouse node.

Note

Avoid using the built-in Python multiprocessing module.

To work around cases where the Python Global Interpreter Lock prevents a multitasking approach from scaling across all CPU cores, you can run concurrent tasks in separate worker processes rather than in threads.

๋‹ค์Œ ์˜ˆ์—์„œ์ฒ˜๋Ÿผ joblib ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์˜ Parallel ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Snowflake ์›จ์–ดํ•˜์šฐ์Šค์—์„œ ์ด ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = 3.9
  HANDLER = 'joblib_multiprocessing'
  PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt

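def joblib_multiprocessing(session, i):
  # n_jobs=-1 uses all available CPU cores. The loop variable is named k so it
  # does not shadow the procedure argument i, which this example does not use.
  result = joblib.Parallel(n_jobs=-1)(joblib.delayed(sqrt)(k ** 2) for k in range(10))
  return str(result)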
$$;

Note

The default backend used by joblib.Parallel differs between Snowflake standard warehouses and Snowpark-optimized warehouses.

  • Standard warehouse default: threading

  • Snowpark-optimized warehouse default: loky (multiprocessing)

๋‹ค์Œ ์˜ˆ์—์„œ์ฒ˜๋Ÿผ joblib.parallel_backend ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๊ธฐ๋ณธ ๋ฐฑ์—”๋“œ ์„ค์ •์„ ์žฌ์ •์˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import joblib
# Use the loky (multiprocessing) backend for joblib.Parallel calls made after this point
joblib.parallel_backend('loky')
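Besides changing the default, joblib.Parallel accepts a backend argument for a single call, and joblib.parallel_backend can also be used as a context manager so that the override applies only inside a with block. The following minimal sketch shows both forms; the helper names sqrt_of_squares and sqrt_of_squares_scoped are hypothetical, and the sketch does not recommend one backend over the other for any particular warehouse type.

```python
import joblib
from math import sqrt


def sqrt_of_squares(n: int, backend: str = "loky") -> list:
    # Hypothetical helper: choose the backend explicitly for this one call,
    # so the result does not depend on the warehouse-type default.
    return joblib.Parallel(n_jobs=-1, backend=backend)(
        joblib.delayed(sqrt)(k ** 2) for k in range(n)
    )


def sqrt_of_squares_scoped(n: int, backend: str = "loky") -> list:
    # Hypothetical helper: override the default backend only for Parallel
    # calls made inside the with block; the previous default is restored
    # when the block exits.
    with joblib.parallel_backend(backend):
        return joblib.Parallel(n_jobs=-1)(
            joblib.delayed(sqrt)(k ** 2) for k in range(n)
        )
```

Because neither helper needs a Snowpark session, you can check them locally; for example, sqrt_of_squares(5, backend="threading") returns [0.0, 1.0, 2.0, 3.0, 4.0].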

๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ์— Snowpark APIs ์‚ฌ์šฉํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ๋Š” Snowpark APIs๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์„ ์‹œ์ž‘ํ•˜๋Š” ๋ฐฉ๋ฒ•๊ณผ ๋‹ค์–‘ํ•œ ์กฐ๊ฑด์—์„œ ์ด๋Ÿฌํ•œ ์ž‘์—…์ด ์–ด๋–ป๊ฒŒ ์ž‘๋™ํ•˜๋Š”์ง€ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค.

๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์˜ ์ƒํƒœ ํ™•์ธํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ checkStatus ํ”„๋กœ์‹œ์ €๋Š” 60์ดˆ ๋™์•ˆ ๊ธฐ๋‹ค๋ฆฌ๋Š” ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ ํ”„๋กœ์‹œ์ €๋Š” ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ์ž‘์—…์˜ ์ƒํƒœ๋ฅผ ํ™•์ธํ•˜๋ฏ€๋กœ ํ™•์ธ์€ False ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE PROCEDURE checkStatus()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
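def async_handler(session):
    # collect_nowait() submits the query and returns an AsyncJob immediately
    async_job = session.sql("select system$wait(60)").collect_nowait()
    # The 60-second wait cannot have finished yet, so this returns False
    return async_job.is_done()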
$$;

๋‹ค์Œ ์ฝ”๋“œ๋Š” ํ”„๋กœ์‹œ์ €๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

CALL checkStatus();
+-------------+
| checkStatus |
|-------------|
| False       |
+-------------+
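Because is_done() only reports the job's current state, a handler that needs a bounded wait can poll it. The following is a minimal sketch, not a Snowflake-provided utility: the function wait_or_cancel and its timeout_s and poll_interval_s parameters are hypothetical, and it relies only on the AsyncJob methods is_done(), result(), and cancel() used in this section. It accepts any object with those methods, which also lets you test it without a Snowflake connection.

```python
import time


def wait_or_cancel(async_job, timeout_s: float, poll_interval_s: float = 1.0):
    """Return async_job.result() if the job finishes within timeout_s seconds;
    otherwise cancel the job and return None. (Hypothetical helper.)"""
    deadline = time.monotonic() + timeout_s
    while True:
        if async_job.is_done():
            # The job has finished, so result() does not wait for the query.
            return async_job.result()
        if time.monotonic() >= deadline:
            # Still running after the timeout: cancel the child query.
            async_job.cancel()
            return None
        time.sleep(poll_interval_s)


# Hypothetical handler body inside a Python stored procedure:
# def async_handler(session):
#     job = session.sql("select system$wait(60)").collect_nowait()
#     return str(wait_or_cancel(job, timeout_s=10))
```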

๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—… ์ทจ์†Œํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ, cancelJob ํ”„๋กœ์‹œ์ €๋Š” SQL์„ ์‚ฌ์šฉํ•˜์—ฌ ์™„๋ฃŒํ•˜๋Š” ๋ฐ 10์ดˆ๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์œผ๋กœ test_tb ํ…Œ์ด๋ธ”์— ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ณ  ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ฝ์ž…๋˜๊ธฐ ์ „์— ํ•˜์œ„ ์ž‘์—…์„ ์ทจ์†Œํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE TABLE test_tb(c1 STRING);
CREATE OR REPLACE PROCEDURE cancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'async_handler'
EXECUTE AS OWNER
AS $$
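def async_handler(session):
    # Start the 10-second insert as an asynchronous child job
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    # Cancel it before it can finish, so no row is inserted
    return async_job.cancel()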
$$;

CALL cancelJob();

๋‹ค์Œ ์ฝ”๋“œ๋Š” test_tb ํ…Œ์ด๋ธ”์„ ์ฟผ๋ฆฌํ•˜์ง€๋งŒ, ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ฝ์ž…๋˜์ง€ ์•Š์•˜๊ธฐ ๋•Œ๋ฌธ์— ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
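AsyncJob.cancel() is documented as returning None, so the cancelJob procedure above returns NULL. If you want the procedure to report which query it canceled, you could return the job's query_id instead. The following is a minimal sketch; the function name cancel_and_report is hypothetical, and it uses only the AsyncJob.cancel() method and the AsyncJob.query_id property.

```python
def cancel_and_report(async_job) -> str:
    # Hypothetical helper: ask Snowflake to cancel the child query, then
    # return a message that identifies it (cancel() itself returns None).
    async_job.cancel()
    return f"Canceled query {async_job.query_id}"


# Hypothetical handler body inside a Python stored procedure:
# def async_handler(session):
#     job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
#     return cancel_and_report(job)
```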

๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์ด ์‹คํ–‰๋˜๋Š” ๋™์•ˆ ๋Œ€๊ธฐ ๋ฐ ์ฐจ๋‹จํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ, blockUntilDone ํ”„๋กœ์‹œ์ €๋Š” ์™„๋ฃŒํ•˜๋Š” ๋ฐ 5์ดˆ๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. snowflake.snowpark.AsyncJob.result ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํ”„๋กœ์‹œ์ €๊ฐ€ ๋Œ€๊ธฐํ•˜๋‹ค๊ฐ€ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE PROCEDURE blockUntilDone()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
def async_handler(session):
    async_job = session.sql("select system$wait(5)").collect_nowait()
    # result() blocks until the child job finishes, then returns its rows
    return async_job.result()
$$;

๋‹ค์Œ ์ฝ”๋“œ๋Š” 5์ดˆ๋ฅผ ๊ธฐ๋‹ค๋ฆฐ ํ›„ ๋ฐ˜ํ™˜๋˜๋Š” blockUntilDone ํ”„๋กœ์‹œ์ €๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.

CALL blockUntilDone();
+------------------------------------------+
| blockUntilDone                           |
|------------------------------------------|
| [Row(SYSTEM$WAIT(5)='waited 5 seconds')] |
+------------------------------------------+
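Because collect_nowait() returns immediately, a handler can submit several child jobs before blocking on any of them, so the queries can run concurrently (subject to the warehouse's own concurrency limits) instead of one after another. The following is a minimal sketch; run_queries_concurrently is a hypothetical name, and it uses only session.sql(), DataFrame.collect_nowait(), and AsyncJob.result() from this section.

```python
from typing import Iterable, List


def run_queries_concurrently(session, queries: Iterable[str]) -> List[list]:
    # Submit every query first; collect_nowait() returns an AsyncJob at once,
    # so submitting a later query does not wait for an earlier one to finish.
    jobs = [session.sql(query).collect_nowait() for query in queries]
    # Then block on each job in submission order; result() waits until that
    # job has finished and returns its rows.
    return [job.result() for job in jobs]


# Hypothetical handler body inside a Python stored procedure:
# def async_handler(session):
#     rows = run_queries_concurrently(
#         session, ["select system$wait(5)", "select system$wait(5)"])
#     return str(rows)
```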

์™„๋ฃŒ๋˜์ง€ ์•Š์€ ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์˜ ๊ฒฐ๊ณผ๋ฅผ ์š”์ฒญํ•œ ํ›„ ์˜ค๋ฅ˜ ๋ฐ˜ํ™˜ํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ, earlyReturn ํ”„๋กœ์‹œ์ €๋Š” ์™„๋ฃŒํ•˜๋Š” ๋ฐ 60์ดˆ๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ ํ”„๋กœ์‹œ์ €๋Š” ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ์ž‘์—… ๊ฒฐ๊ณผ์—์„œ DataFrame ์˜ ๋ฐ˜ํ™˜์„ ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค. ๊ฒฐ๊ณผ์ ์œผ๋กœ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE PROCEDURE earlyReturn()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS CALLER
AS $$
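def async_handler(session):
    async_job = session.sql("select system$wait(60)").collect_nowait()
    # to_df() builds a DataFrame over the job's results, which do not exist yet
    df = async_job.to_df()
    try:
        df.collect()
    except Exception as ex:
        # Fixed message for illustration; str(ex) holds the actual error text
        return 'Error: (02000): Result for query <UUID> has expired'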
$$;

๋‹ค์Œ ์ฝ”๋“œ๋Š” earlyReturn ํ”„๋กœ์‹œ์ €๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์˜ค๋ฅ˜๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

CALL earlyReturn();
+------------------------------------------------------------+
| earlyReturn                                                |
|------------------------------------------------------------|
| Error: (02000): Result for query <UUID> has expired        |
+------------------------------------------------------------+
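To avoid that error, check the job before reading its results, or call result(), which waits for the job to finish. The following minimal sketch takes the first approach; collect_if_ready is a hypothetical name, and it uses only the is_done(), to_df(), and query_id members of AsyncJob. Because it returns either a message or a list of rows, a VARCHAR procedure would wrap its return value in str().

```python
def collect_if_ready(async_job):
    # Reading the results of an unfinished job is what fails in earlyReturn,
    # so report the job as still running instead of calling to_df() early.
    if not async_job.is_done():
        return f"Query {async_job.query_id} is still running"
    # The job has finished, so its results can be read as a DataFrame.
    return async_job.to_df().collect()


# Hypothetical handler body inside a Python stored procedure:
# def async_handler(session):
#     job = session.sql("select system$wait(60)").collect_nowait()
#     return str(collect_if_ready(job))
```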

ํ•˜์œ„ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ์ƒ์œ„ ์ž‘์—…์„ ์™„๋ฃŒํ•˜๊ณ  ํ•˜์œ„ ์ž‘์—… ์ทจ์†Œํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์ œ์—์„œ, earlyCancelJob ํ”„๋กœ์‹œ์ €๋Š” ๋น„๋™๊ธฐ ํ•˜์œ„ ์ž‘์—…์„ ์‹คํ–‰ํ•˜์—ฌ ํ…Œ์ด๋ธ”์— ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•˜๊ณ  ์™„๋ฃŒํ•˜๋Š” ๋ฐ 10์ดˆ๊ฐ€ ๊ฑธ๋ฆฝ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ƒ์œ„ ์ž‘์—… async_handler ๋Š” ํ•˜์œ„ ์ž‘์—…์ด ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ๋ฐ˜ํ™˜๋˜์–ด ํ•˜์œ„ ์ž‘์—…์ด ์ทจ์†Œ๋ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE PROCEDURE earlyCancelJob()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
PACKAGES = ('snowflake-snowpark-python')
HANDLER='async_handler'
EXECUTE AS OWNER
AS $$
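def async_handler(session):
    # Start the insert asynchronously, then return without waiting for it;
    # the child job is canceled when the parent procedure returns
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()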
$$;

๋‹ค์Œ ์ฝ”๋“œ๋Š” earlyCancelJob ํ”„๋กœ์‹œ์ €๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฐ ๋‹ค์Œ test_tb ํ…Œ์ด๋ธ”์„ ์ฟผ๋ฆฌํ•˜๋ฉฐ, ์ด ์ž‘์—…์€ ์ทจ์†Œ๋œ ํ•˜์œ„ ์ž‘์—…์—์„œ ์‚ฝ์ž…๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†๊ธฐ ๋•Œ๋ฌธ์— ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

CALL earlyCancelJob();
SELECT * FROM test_tb;
+----+
| C1 |
|----|
+----+
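If the insert should complete, the parent has to stay alive until the child job finishes. The following is a minimal sketch of such a handler, assuming the same test_tb table; it is a variant of the earlyCancelJob handler rather than part of the original example, and it blocks on AsyncJob.result() before returning. The returned message format is an illustrative choice.

```python
def async_handler(session):
    # Start the insert as an asynchronous child job, as in earlyCancelJob.
    async_job = session.sql("insert into test_tb (select system$wait(10))").collect_nowait()
    # Block until the child job finishes, so returning from the parent
    # no longer cancels it.
    async_job.result()
    return f"Insert finished: query {async_job.query_id}"
```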