Scala UDF handler examples

This topic includes simple examples of UDF handler code written in Scala.

For more information about using Scala to create a scalar UDF handler, see Writing a scalar UDF in Scala. For general coding guidelines, see General Scala UDF handler coding guidelines.

Creating and calling a simple in-line Scala UDF

๋‹ค์Œ ๋ฌธ์€ ์ธ๋ผ์ธ Scala UDF๋ฅผ ๋งŒ๋“ค๊ณ  ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ์ด ์ฝ”๋“œ๋Š” ์ด์— ์ „๋‹ฌ๋œ VARCHAR ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

The function is declared with the optional CALLED ON NULL INPUT clause to indicate that it is called even when the input value is NULL. (This function returns NULL with or without the clause, but you could modify the code to handle NULL another way, for example by returning an empty string.)

Creating the UDF

CREATE OR REPLACE FUNCTION echo_varchar(x VARCHAR)
  RETURNS VARCHAR
  LANGUAGE SCALA
  CALLED ON NULL INPUT
  RUNTIME_VERSION = 2.12
  HANDLER='Echo.echoVarchar'
  AS
  $$
  class Echo {
    def echoVarchar(x : String): String = {
      return x
    }
  }
  $$;

Calling the UDF

SELECT echo_varchar('Hello');

์ธ๋ผ์ธ Scala UDF์— NULL ์ „๋‹ฌํ•˜๊ธฐยถ

This example uses the echo_varchar() UDF defined above. The SQL NULL value is implicitly converted to a Scala null; that null is returned and implicitly converted back to SQL NULL.

Call the UDF:

SELECT echo_varchar(NULL);
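As described earlier, you could change the handler to treat NULL differently. The following is a minimal sketch of such a variant, assuming you want an empty string instead of NULL; the class name EchoOrEmpty is hypothetical. Because a SQL NULL arrives as a Scala null, the sketch wraps the argument in Option before using it. It runs locally as plain Scala, without Snowflake.

```scala
// Hypothetical variant of the Echo handler: maps a null input to "".
class EchoOrEmpty {
  def echoVarchar(x: String): String =
    // Option(null) is None, so getOrElse supplies the empty string.
    Option(x).getOrElse("")
}

object EchoOrEmptyDemo {
  def main(args: Array[String]): Unit = {
    val handler = new EchoOrEmpty
    println(handler.echoVarchar("Hello"))      // Hello
    println(handler.echoVarchar(null).isEmpty) // true
  }
}
```

To use this as a UDF, you would place the class between the $$ delimiters of a CREATE FUNCTION statement and set HANDLER='EchoOrEmpty.echoVarchar'.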

์ธ๋ผ์ธ UDF์—์„œ ๋ช…์‹œ์ ์œผ๋กœ NULL ๋ฐ˜ํ™˜ํ•˜๊ธฐยถ

๋‹ค์Œ ์ฝ”๋“œ๋Š” ๋ช…์‹œ์ ์œผ๋กœ NULL ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค. Scala ๊ฐ’ Null ์€ SQL NULL ๋กœ ๋ณ€ํ™˜๋ฉ๋‹ˆ๋‹ค.

Creating the UDF

CREATE OR REPLACE FUNCTION return_a_null()
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  HANDLER='TemporaryTestLibrary.returnNull'
  AS
  $$
  class TemporaryTestLibrary {
    def returnNull(): String = {
      return null
    }
  }
  $$;

Calling the UDF

SELECT return_a_null();
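A handler can also return null conditionally rather than always. A minimal sketch, assuming you want blank or whitespace-only input to come back as SQL NULL; the class BlankToNull and its method are hypothetical:

```scala
// Hypothetical handler: returns null (SQL NULL) for null or blank input,
// otherwise returns the input unchanged.
class BlankToNull {
  def blankToNull(x: String): String =
    if (x == null || x.trim.isEmpty) null else x
}

object BlankToNullDemo {
  def main(args: Array[String]): Unit = {
    val handler = new BlankToNull
    println(handler.blankToNull("   ")) // null
    println(handler.blankToNull("abc")) // abc
  }
}
```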

์ธ๋ผ์ธ Scala UDF์— OBJECT ์ „๋‹ฌํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” SQL OBJECT ๋ฐ์ดํ„ฐ ํƒ€์ž…๊ณผ ํ•ด๋‹น Scala ๋ฐ์ดํ„ฐ ํƒ€์ž…(Map[String, String])์„ ์‚ฌ์šฉํ•˜๊ณ  OBJECT์—์„œ ๊ฐ’์„ ์ถ”์ถœํ•ฉ๋‹ˆ๋‹ค. ์ด ์˜ˆ๋Š” ๋˜ํ•œ ์—ฌ๋Ÿฌ ๋งค๊ฐœ ๋ณ€์ˆ˜๋ฅผ Scala UDF์— ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ์Œ์„ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

๋‹ค์Œ๊ณผ ๊ฐ™์ด OBJECT ํ˜•์‹์˜ ์—ด์ด ํฌํ•จ๋œ ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค๊ณ  ๋กœ๋”ฉํ•˜์‹ญ์‹œ์˜ค.

CREATE TABLE objectives (o OBJECT);
INSERT INTO objectives SELECT PARSE_JSON('{"outer_key" : {"inner_key" : "inner_value"} }');

Creating the UDF

CREATE OR REPLACE FUNCTION extract_from_object(x OBJECT, key VARCHAR)
  RETURNS VARIANT
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  HANDLER='VariantLibrary.extract'
  AS
  $$
  import scala.collection.immutable.Map

  class VariantLibrary {
    def extract(m: Map[String, String], key: String): String = {
      return m(key)
    }
  }
  $$;

Calling the UDF

SELECT extract_from_object(o, 'outer_key'),
  extract_from_object(o, 'outer_key')['inner_key'] FROM objectives;
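The handler above looks up the key with m(key), which throws a NoSuchElementException when the key is not present in the OBJECT. The following is a minimal sketch of a more forgiving lookup, assuming you would rather get NULL for a missing key or a NULL OBJECT; the class name SafeVariantLibrary is hypothetical.

```scala
import scala.collection.immutable.Map

// Hypothetical variant of VariantLibrary.extract that returns null
// (converted to SQL NULL) instead of throwing when the key is absent.
class SafeVariantLibrary {
  def extract(m: Map[String, String], key: String): String =
    if (m == null) null else m.getOrElse(key, null)
}

object SafeVariantLibraryDemo {
  def main(args: Array[String]): Unit = {
    val lib = new SafeVariantLibrary
    val m = Map("outer_key" -> "some value")
    println(lib.extract(m, "outer_key")) // some value
    println(lib.extract(m, "missing"))   // null
  }
}
```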

์ธ๋ผ์ธ Scala UDF์— ARRAY ์ „๋‹ฌํ•˜๊ธฐยถ

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” SQL ARRAY ๋ฐ์ดํ„ฐ ํƒ€์ž…์„ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

Creating the UDF

CREATE OR REPLACE FUNCTION generate_greeting(greeting_words ARRAY)
  RETURNS VARCHAR
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  HANDLER='StringHandler.handleStrings'
  AS
  $$
  class StringHandler {
    def handleStrings(strings: Array[String]): String = {
      return concatenate(strings)
    }
    private def concatenate(strings: Array[String]): String = {
      // mkString puts the separator only between elements,
      // so the result has no leading space.
      return strings.mkString(" ")
    }
  }
  }
  $$;
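You can exercise array-handling logic locally before creating the UDF. The following is a minimal sketch, assuming the ARRAY elements arrive as Array[String] as in the handler above and that the handler might receive a null array; GreetingSketch is a hypothetical name.

```scala
// Hypothetical stand-in for StringHandler that also tolerates a null array.
class GreetingSketch {
  def handleStrings(strings: Array[String]): String =
    // Option(null) is None, so a null array yields null (SQL NULL);
    // otherwise mkString joins the words with single spaces.
    Option(strings).map(_.mkString(" ")).orNull
}

object GreetingSketchDemo {
  def main(args: Array[String]): Unit = {
    val handler = new GreetingSketch
    println(handler.handleStrings(Array("Hello", "world"))) // Hello world
    println(handler.handleStrings(null))                    // null
  }
}
```

In SQL, the UDF created above could then be called with an array constant, for example SELECT generate_greeting(['Hello', 'world']);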

Reading a file with a Scala UDF

Your handler code can read the contents of a file. For example, a handler might read a file in order to process unstructured data.

ํŒŒ์ผ์€ ์ฒ˜๋ฆฌ๊ธฐ์— ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” Snowflake ์Šคํ…Œ์ด์ง€์— ์žˆ์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

To read the contents of a staged file, the handler can read a dynamically specified file by calling methods of either the SnowflakeFile class or the InputStream class.

You might do this when you need to access a file specified by the caller. For more information, see the following sections in this topic.

SnowflakeFile provides features that are not available with InputStream, as described in the following table.

Class: SnowflakeFile

  Input: URL formats:

    • Scoped URL, which reduces the risk of file injection attacks when the function's caller is not also its owner.

    • File URL or string path, for files that the UDF owner has access to.

    The file must be located in a named internal stage or an external stage.

  Notes: Easy access to additional file attributes, such as file size.

Class: InputStream

  Input: URL formats:

    • Scoped URL, which reduces the risk of file injection attacks when the function's caller is not also its owner.

    The file must be located in a named internal stage or an external stage.

Note

The UDF owner must have access to any file whose location is not a scoped URL. Handler code can read such staged files by calling the SnowflakeFile.newInstance method with a boolean value for its requireScopedUrl parameter; false means a scoped URL is not required.

๋‹ค์Œ ์˜ˆ์—์„œ๋Š” ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์ด ํ•„์š”ํ•˜์ง€ ์•Š์Œ์„ ์ง€์ •ํ•˜๋ฉด์„œ SnowflakeFile.newInstance ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

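import com.snowflake.snowpark_java.types.SnowflakeFile
// false for requireScopedUrl: a stage path is accepted instead of a scoped URL.
val filename = "@my_stage/filename.txt"
val sfFile = SnowflakeFile.newInstance(filename, false)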

Reading a dynamically specified file with SnowflakeFile

Your handler code can read a file from a stage by using methods of the SnowflakeFile class. SnowflakeFile is included on the classpath available to Scala UDF handlers in Snowflake.

Note

ํŒŒ์ผ ์‚ฝ์ž… ๊ณต๊ฒฉ์— ๋Œ€ํ•œ ์ฝ”๋“œ์˜ ํƒ„๋ ฅ์„ฑ์„ ๋†’์ด๋ ค๋ฉด ํŒŒ์ผ์˜ ์œ„์น˜๋ฅผ UDF์— ์ „๋‹ฌํ•  ๋•Œ, ํŠนํžˆ ํ•จ์ˆ˜ ํ˜ธ์ถœ์ž๊ฐ€ ์†Œ์œ ์ž์ด๊ธฐ๋„ ํ•˜์ง€ ์•Š์„ ๋•Œ๋Š” ํ•ญ์ƒ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์„ ์‚ฌ์šฉํ•˜์‹ญ์‹œ์˜ค. ๊ธฐ๋ณธ ์ œ๊ณต ํ•จ์ˆ˜ BUILD_SCOPED_FILE_URL ์„ ์‚ฌ์šฉํ•˜์—ฌ SQL์—์„œ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. BUILD_SCOPED_FILE_URL ์˜ ๊ธฐ๋Šฅ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ๋น„์ •ํ˜• ๋ฐ์ดํ„ฐ ์†Œ๊ฐœ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

To develop your UDF code locally, add the Snowpark JAR that contains SnowflakeFile to your code's classpath. For information about snowpark.jar, see Setting up your development environment for Snowpark Scala. Note that Snowpark client applications cannot use this class.

When you use SnowflakeFile, you do not need to specify either the staged file or the JAR containing SnowflakeFile in an IMPORTS clause when you create the UDF (for example, in SQL with a CREATE FUNCTION statement).

Creating the UDF

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ์—์„œ๋Š” SnowflakeFile ์„ ์‚ฌ์šฉํ•˜์—ฌ ์ง€์ •๋œ ์Šคํ…Œ์ด์ง€ ์œ„์น˜์—์„œ ํŒŒ์ผ์„ ์ฝ์Šต๋‹ˆ๋‹ค. getInputStream ๋ฉ”์„œ๋“œ์˜ InputStream ์„ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒ์ผ์˜ ๋‚ด์šฉ์„ String ๋ณ€์ˆ˜๋กœ ์ฝ์Šต๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION sum_total_sales_snowflake_file(file STRING)
  RETURNS INTEGER
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  PACKAGES=('com.snowflake:snowpark:latest')
  HANDLER='SalesSum.sumTotalSales'
  AS
  $$
  import java.io.InputStream
  import java.io.IOException
  import java.nio.charset.StandardCharsets
  import com.snowflake.snowpark_java.types.SnowflakeFile

  object SalesSum {
    @throws(classOf[IOException])
    def sumTotalSales(filePath: String): Int = {
      var total = -1

      // Use a SnowflakeFile instance to read sales data from a stage.
      val file = SnowflakeFile.newInstance(filePath)
      val stream = file.getInputStream()
      val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)

      // Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.

      return total
    }
  }
  $$;

Calling the UDF

SELECT sum_total_sales_snowflake_file(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
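The handler above omits the step that turns the file contents into a total. As an illustration only, the following sketch assumes that each sale record in the JSON carries an integer "amount" field; that field name and the sample data are hypothetical, since the layout of car_sales.json is not shown in this topic. A regular expression is not a JSON parser, so a real handler would more likely use a JSON library.

```scala
import scala.util.matching.Regex

// Hypothetical helper for the elided step in sumTotalSales.
// ASSUMPTION: every sale record has an integer "amount" field.
object SalesParsingSketch {
  private val AmountField: Regex = """"amount"\s*:\s*(\d+)""".r

  // Sums every "amount" value found in the file contents.
  def sumAmounts(contents: String): Int =
    AmountField.findAllMatchIn(contents).map(_.group(1).toInt).sum

  def main(args: Array[String]): Unit = {
    val sample = """[{"model": "sedan", "amount": 3}, {"model": "coupe", "amount": 4}]"""
    println(sumAmounts(sample)) // 7
  }
}
```

Inside the handler, the elided step could then be written as total = SalesParsingSketch.sumAmounts(contents), with the helper object included in the same UDF body.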

Reading a dynamically specified file with InputStream

You can read file contents directly into a java.io.InputStream by making the handler function's argument an InputStream variable. This can be useful when the function's caller wants to pass a file path as an argument.

Note

ํŒŒ์ผ ์ฃผ์ž… ๊ณต๊ฒฉ์— ๋Œ€ํ•œ ์ฝ”๋“œ ๋ณต์›๋ ฅ์„ ๋†’์ด๋ ค๋ฉด ํŒŒ์ผ ์œ„์น˜๋ฅผ UDF์— ์ „๋‹ฌํ•  ๋•Œ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ ์ œ๊ณต ํ•จ์ˆ˜ BUILD_SCOPED_FILE_URL ์„ ์‚ฌ์šฉํ•˜์—ฌ SQL์—์„œ ๋ฒ”์œ„๊ฐ€ ์ง€์ •๋œ URL์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. BUILD_SCOPED_FILE_URL ์˜ ๊ธฐ๋Šฅ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ๋น„์ •ํ˜• ๋ฐ์ดํ„ฐ ์†Œ๊ฐœ ์„น์…˜์„ ์ฐธ์กฐํ•˜์‹ญ์‹œ์˜ค.

Creating the UDF

๋‹ค์Œ ์˜ˆ์ œ์˜ ์ฝ”๋“œ์—๋Š” InputStream ์„ ๋ฐ›์•„ Int ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” ์ฒ˜๋ฆฌ๊ธฐ ํ•จ์ˆ˜ sumTotalSales ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ๋Ÿฐํƒ€์ž„์— Snowflake๋Š” file ๋ณ€์ˆ˜์˜ ๊ฒฝ๋กœ์— ์žˆ๋Š” ํŒŒ์ผ์˜ ๋‚ด์šฉ์„ stream ์ธ์ž ๋ณ€์ˆ˜์— ์ž๋™์œผ๋กœ ํ• ๋‹นํ•ฉ๋‹ˆ๋‹ค.

CREATE OR REPLACE FUNCTION sum_total_sales_input_stream(file STRING)
  RETURNS NUMBER
  LANGUAGE SCALA
  RUNTIME_VERSION = 2.12
  HANDLER = 'SalesSum.sumTotalSales'
  PACKAGES = ('com.snowflake:snowpark:latest')
  AS $$
  import com.snowflake.snowpark.types.Variant
  import java.io.InputStream
  import java.io.IOException
  import java.nio.charset.StandardCharsets
  object SalesSum {
    @throws(classOf[IOException])
    def sumTotalSales(stream: InputStream): Int = {
      var total = -1
      val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)

      // Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.

      return total
    }
  }
  $$;

Calling the UDF

SELECT sum_total_sales_input_stream(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
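Because an InputStream handler receives a plain java.io.InputStream, you can test the same kind of logic locally by passing an in-memory stream instead of a staged file. The following is a minimal sketch; the object StreamSketch and its line-counting logic are hypothetical stand-ins for the elided sales computation. readAllBytes requires Java 9 or later.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

// Hypothetical handler with the same parameter shape as SalesSum.sumTotalSales.
object StreamSketch {
  def countNonEmptyLines(stream: InputStream): Int = {
    try {
      // Read the whole stream as UTF-8 text, as the UDF example does.
      val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
      contents.split("\n").count(_.trim.nonEmpty)
    } finally {
      // Release the stream whether or not reading succeeded.
      stream.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val data = "first\n\nsecond\n".getBytes(StandardCharsets.UTF_8)
    println(countNonEmptyLines(new ByteArrayInputStream(data))) // 2
  }
}
```

Passing a ByteArrayInputStream keeps the test independent of any stage, so the parsing logic can be checked before the UDF is created.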