Scala UDF ์ฒ๋ฆฌ๊ธฐ์ ์ยถ
์ด ํญ๋ชฉ์๋ Scala๋ก ์์ฑ๋ UDF ์ฒ๋ฆฌ๊ธฐ ์ฝ๋์ ๊ฐ๋จํ ์๊ฐ ๋์ ์์ต๋๋ค.
Scala๋ฅผ ์ฌ์ฉํ์ฌ ์ค์นผ๋ผ UDF ์ฒ๋ฆฌ๊ธฐ๋ฅผ ๋ง๋๋ ๋ฐฉ๋ฒ์ ๋ํ ์์ธํ ๋ด์ฉ์ Scala๋ก ์ค์นผ๋ผ UDF ์์ฑํ๊ธฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค. ์ผ๋ฐ์ ์ธ ์ฝ๋ฉ ์ง์นจ์ ์ผ๋ฐ Scala UDF ์ฒ๋ฆฌ๊ธฐ ์ฝ๋ฉ ์ง์นจ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
๊ฐ๋จํ ์ธ๋ผ์ธ Scala UDF ๋ง๋ค๊ธฐ ๋ฐ ํธ์ถํ๊ธฐยถ
๋ค์ ๋ฌธ์ ์ธ๋ผ์ธ Scala UDF๋ฅผ ๋ง๋ค๊ณ ํธ์ถํฉ๋๋ค. ์ด ์ฝ๋๋ ์ด์ ์ ๋ฌ๋ VARCHAR ๋ฅผ ๋ฐํํฉ๋๋ค.
์ด ํจ์๋ ์
๋ ฅ ๊ฐ์ด NULL์ธ ๊ฒฝ์ฐ์๋ ํจ์๊ฐ ํธ์ถ๋จ์ ๋ํ๋ด๊ธฐ ์ํด ์ ํ์ CALLED ON NULL INPUT
์ ๋ก ์ ์ธ๋ฉ๋๋ค. (์ด ํจ์๋ ์ด ์ ์ ํฌํจํ๊ฑฐ๋ ํฌํจํ์ง ์๊ณ NULL์ ๋ฐํํ์ง๋ง, ์ฌ์ฉ์๋ ๋น ๋ฌธ์์ด์ ๋ฐํํ๋ ๋ฑ ๋ค๋ฅธ ๋ฐฉ์์ผ๋ก NULL์ ์ฒ๋ฆฌํ๋๋ก ์ฝ๋๋ฅผ ์์ ํ ์ ์์ต๋๋ค.)
UDF ๋ง๋ค๊ธฐยถ
CREATE OR REPLACE FUNCTION echo_varchar(x VARCHAR)
RETURNS VARCHAR
LANGUAGE SCALA
CALLED ON NULL INPUT
RUNTIME_VERSION = 2.12
HANDLER='Echo.echoVarchar'
AS
$$
class Echo {
def echoVarchar(x : String): String = {
return x
}
}
$$;
UDF ํธ์ถํ๊ธฐยถ
SELECT echo_varchar('Hello');
์ธ๋ผ์ธ Scala UDF์ NULL ์ ๋ฌํ๊ธฐยถ
์ด๋ ์์์ ์ ์ํ echo_varchar()
UDF๋ฅผ ์ฌ์ฉํฉ๋๋ค. SQL NULL
๊ฐ์ ์์์ ์ผ๋ก Scala Null ๋ก ๋ณํ๋๊ณ ํด๋น Scala Null
์ด ๋ฐํ๋์ด ์์์ ์ผ๋ก SQL NULL
๋ก ๋ค์ ๋ณํ๋ฉ๋๋ค.
UDF๋ฅผ ํธ์ถํฉ๋๋ค.
SELECT echo_varchar(NULL);
์ธ๋ผ์ธ UDF์์ ๋ช ์์ ์ผ๋ก NULL ๋ฐํํ๊ธฐยถ
๋ค์ ์ฝ๋๋ ๋ช
์์ ์ผ๋ก NULL ๊ฐ์ ๋ฐํํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค. Scala ๊ฐ Null
์ SQL NULL
๋ก ๋ณํ๋ฉ๋๋ค.
UDF ๋ง๋ค๊ธฐยถ
CREATE OR REPLACE FUNCTION return_a_null()
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='TemporaryTestLibrary.returnNull'
AS
$$
class TemporaryTestLibrary {
def returnNull(): String = {
return null
}
}
$$;
UDF ํธ์ถํ๊ธฐยถ
SELECT return_a_null();
์ธ๋ผ์ธ Scala UDF์ OBJECT ์ ๋ฌํ๊ธฐยถ
๋ค์ ์์์๋ SQL OBJECT ๋ฐ์ดํฐ ํ์
๊ณผ ํด๋น Scala ๋ฐ์ดํฐ ํ์
(Map[String, String]
)์ ์ฌ์ฉํ๊ณ OBJECT์์ ๊ฐ์ ์ถ์ถํฉ๋๋ค. ์ด ์๋ ๋ํ ์ฌ๋ฌ ๋งค๊ฐ ๋ณ์๋ฅผ Scala UDF์ ์ ๋ฌํ ์ ์์์ ๋ณด์ฌ์ค๋๋ค.
๋ค์๊ณผ ๊ฐ์ด OBJECT ํ์์ ์ด์ด ํฌํจ๋ ํ ์ด๋ธ์ ๋ง๋ค๊ณ ๋ก๋ฉํ์ญ์์ค.
CREATE TABLE objectives (o OBJECT);
INSERT INTO objectives SELECT PARSE_JSON('{"outer_key" : {"inner_key" : "inner_value"} }');
UDF ๋ง๋ค๊ธฐยถ
CREATE OR REPLACE FUNCTION extract_from_object(x OBJECT, key VARCHAR)
RETURNS VARIANT
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='VariantLibrary.extract'
AS
$$
import scala.collection.immutable.Map
class VariantLibrary {
def extract(m: Map[String, String], key: String): String = {
return m(key)
}
}
$$;
UDF ํธ์ถํ๊ธฐยถ
SELECT extract_from_object(o, 'outer_key'),
extract_from_object(o, 'outer_key')['inner_key'] FROM OBJECTIVES;
์ธ๋ผ์ธ Scala UDF์ ARRAY ์ ๋ฌํ๊ธฐยถ
๋ค์ ์์์๋ SQL ARRAY ๋ฐ์ดํฐ ํ์ ์ ์ฌ์ฉํฉ๋๋ค.
UDF ๋ง๋ค๊ธฐยถ
CREATE OR REPLACE FUNCTION generate_greeting(greeting_words ARRAY)
RETURNS VARCHAR
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER='StringHandler.handleStrings'
AS
$$
class StringHandler {
def handleStrings(strings: Array[String]): String = {
return concatenate(strings)
}
private def concatenate(strings: Array[String]): String = {
var concatenated : String = ""
for (newString <- strings) {
concatenated = concatenated + " " + newString
}
return concatenated
}
}
$$;
Scala UDF๋ก ํ์ผ ์ฝ๊ธฐยถ
์ฒ๋ฆฌ๊ธฐ ์ฝ๋๋ก ํ์ผ์ ๋ด์ฉ์ ์ฝ์ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด ์ฒ๋ฆฌ๊ธฐ๋ก ๋น์ ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
ํ์ผ์ ์ฒ๋ฆฌ๊ธฐ์ ์ฌ์ฉํ ์ ์๋ Snowflake ์คํ ์ด์ง์ ์์ด์ผ ํฉ๋๋ค.
์คํ
์ด์ง๋ ํ์ผ์ ๋ด์ฉ์ ์ฝ๊ธฐ ์ํด ์ฒ๋ฆฌ๊ธฐ๋ SnowflakeFile
ํด๋์ค ๋๋ InputStream
ํด๋์ค์ ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
ํธ์ถ์๊ฐ ์ง์ ํ ํ์ผ์ ์ก์ธ์คํด์ผ ํ๋ ๊ฒฝ์ฐ ์ด๋ ๊ฒ ํ ์ ์์ต๋๋ค. ์์ธํ ๋ด์ฉ์ ์ด ํญ๋ชฉ์ ๋ค์ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
SnowflakeFile ์ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐ
InputStream ์ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐ
SnowflakeFile
์ ๋ค์ ํ์ ์ค๋ช
๋ ๊ฒ์ฒ๋ผ InputStream
๊ณผ ํจ๊ป ์ฌ์ฉํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํฉ๋๋ค.
ํด๋์ค |
์ ๋ ฅ |
์ฐธ๊ณ |
---|---|---|
|
URL ํ์:
ํ์ผ์ ๋ช ๋ช ๋ ๋ด๋ถ ์คํ ์ด์ง ๋๋ ์ธ๋ถ ์คํ ์ด์ง์ ์์ด์ผ ํฉ๋๋ค. |
ํ์ผ ํฌ๊ธฐ์ ๊ฐ์ ์ถ๊ฐ ํ์ผ ํน์ฑ์ ์ฝ๊ฒ ์ก์ธ์คํ ์ ์์ต๋๋ค. |
|
URL ํ์:
ํ์ผ์ ๋ช ๋ช ๋ ๋ด๋ถ ์คํ ์ด์ง ๋๋ ์ธ๋ถ ์คํ ์ด์ง์ ์์ด์ผ ํฉ๋๋ค. |
์ฐธ๊ณ
UDF ์์ ์๋ ์์น๊ฐ ๋ฒ์ ์ง์ ๋ URL์ด ์๋ ๋ชจ๋ ํ์ผ์ ์ก์ธ์คํ ์ ์์ด์ผ ํฉ๋๋ค. ์ฒ๋ฆฌ๊ธฐ ์ฝ๋๊ฐ ์ requireScopedUrl
๋งค๊ฐ ๋ณ์์ ๋ํ boolean
๊ฐ์ผ๋ก SnowflakeFile.newInstance
๋ฉ์๋๋ฅผ ํธ์ถํ๋๋ก ํ์ฌ ์ด๋ค ์คํ
์ด์ง๋ ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค.
๋ค์ ์์์๋ ๋ฒ์๊ฐ ์ง์ ๋ URL์ด ํ์ํ์ง ์์์ ์ง์ ํ๋ฉด์ SnowflakeFile.newInstance
๋ฅผ ์ฌ์ฉํฉ๋๋ค.
var filename = "@my_stage/filename.txt"
var sfFile = SnowflakeFile.newInstance(filename, false)
SnowflakeFile
์ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐยถ
SnowflakeFile
ํด๋์ค์ ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ์ฒ๋ฆฌ๊ธฐ ์ฝ๋๋ก ์คํ
์ด์ง์์ ํ์ผ์ ์ฝ์ ์ ์์ต๋๋ค. SnowflakeFile
ํด๋์ค๋ Snowflake์ Scala UDF ์ฒ๋ฆฌ๊ธฐ๊ฐ ์ฌ์ฉํ ์ ์๋ ํด๋์ค ๊ฒฝ๋ก์ ํฌํจ๋ฉ๋๋ค.
์ฐธ๊ณ
ํ์ผ ์ฝ์ ๊ณต๊ฒฉ์ ๋ํ ์ฝ๋์ ํ๋ ฅ์ฑ์ ๋์ด๋ ค๋ฉด ํ์ผ์ ์์น๋ฅผ UDF์ ์ ๋ฌํ ๋, ํนํ ํจ์ ํธ์ถ์๊ฐ ์์ ์์ด๊ธฐ๋ ํ์ง ์์ ๋๋ ํญ์ ๋ฒ์๊ฐ ์ง์ ๋ URL์ ์ฌ์ฉํ์ญ์์ค. ๊ธฐ๋ณธ ์ ๊ณต ํจ์ BUILD_SCOPED_FILE_URL ์ ์ฌ์ฉํ์ฌ SQL์์ ๋ฒ์๊ฐ ์ง์ ๋ URL์ ์์ฑํ ์ ์์ต๋๋ค. BUILD_SCOPED_FILE_URL ์ ๊ธฐ๋ฅ์ ๋ํ ์์ธํ ๋ด์ฉ์ ๋น์ ํ ๋ฐ์ดํฐ ์๊ฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDF ์ฝ๋๋ฅผ ๋ก์ปฌ์์ ๊ฐ๋ฐํ๋ ค๋ฉด SnowflakeFile
์ด ํฌํจ๋ Snowpark JAR์ ์ฝ๋์ ํด๋์ค ๊ฒฝ๋ก์ ์ถ๊ฐํ์ญ์์ค. snowpark.jar
์ ๋ํ ์ ๋ณด๋ Snowpark Scala๋ฅผ ์ํ ๊ฐ๋ฐ ํ๊ฒฝ ์ค์ ํ๊ธฐ ์น์
์ ์ฐธ์กฐํ์ญ์์ค. Snowpark ํด๋ผ์ด์ธํธ ์ ํ๋ฆฌ์ผ์ด์
์ ์ด ํด๋์ค๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
SnowflakeFile
์ ์ฌ์ฉํ ๋๋ CREATE FUNCTION ๋ฌธ์ ํฌํจํ SQL์์์ฒ๋ผ UDF๋ฅผ ์์ฑํ ๋ ์คํ
์ด์ง๋ ํ์ผ์ด๋ SnowflakeFile
์ด ํฌํจ๋ JAR์ IMPORTS ์ ๋ก๋ ์ง์ ํ ํ์๊ฐ ์์ต๋๋ค.
UDF ๋ง๋ค๊ธฐยถ
๋ค์ ์์ ์ ์ฝ๋์์๋ SnowflakeFile
์ ์ฌ์ฉํ์ฌ ์ง์ ๋ ์คํ
์ด์ง ์์น์์ ํ์ผ์ ์ฝ์ต๋๋ค. getInputStream
๋ฉ์๋์ InputStream
์ ์ฌ์ฉํ์ฌ ํ์ผ์ ๋ด์ฉ์ String
๋ณ์๋ก ์ฝ์ต๋๋ค.
CREATE OR REPLACE FUNCTION sum_total_sales_snowflake_file(file STRING)
RETURNS INTEGER
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
PACKAGES=('com.snowflake:snowpark:latest')
HANDLER='SalesSum.sumTotalSales'
AS
$$
import java.io.InputStream
import java.io.IOException
import java.nio.charset.StandardCharsets
import com.snowflake.snowpark_java.types.SnowflakeFile
object SalesSum {
@throws(classOf[IOException])
def sumTotalSales(filePath: String): Int = {
var total = -1
// Use a SnowflakeFile instance to read sales data from a stage.
val file = SnowflakeFile.newInstance(filePath)
val stream = file.getInputStream()
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
// Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.
return total
}
}
$$;
UDF ํธ์ถํ๊ธฐยถ
SELECT sum_total_sales_input_stream(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));
InputStream
์ ์ฌ์ฉํ์ฌ ๋์ ์ผ๋ก ์ง์ ๋ ํ์ผ ์ฝ๊ธฐยถ
์ฒ๋ฆฌ๊ธฐ ํจ์์ ์ธ์๋ฅผ InputStream
๋ณ์๋ก ๋ง๋ค์ด ํ์ผ ๋ด์ฉ์ java.io.InputStream
์ผ๋ก ์ง์ ์ฝ์ ์ ์์ต๋๋ค. ์ด๊ฒ์ ํจ์์ ํธ์ถ์๊ฐ ํ์ผ ๊ฒฝ๋ก๋ฅผ ์ธ์๋ก ์ ๋ฌํ๊ณ ์ถ์ ๋ ์ ์ฉํ ์ ์์ต๋๋ค.
์ฐธ๊ณ
ํ์ผ ์ฃผ์ ๊ณต๊ฒฉ์ ๋ํ ์ฝ๋ ๋ณต์๋ ฅ์ ๋์ด๋ ค๋ฉด ํ์ผ ์์น๋ฅผ UDF์ ์ ๋ฌํ ๋ ๋ฒ์๊ฐ ์ง์ ๋ URL์ด ํ์ํฉ๋๋ค. ๊ธฐ๋ณธ ์ ๊ณต ํจ์ BUILD_SCOPED_FILE_URL ์ ์ฌ์ฉํ์ฌ SQL์์ ๋ฒ์๊ฐ ์ง์ ๋ URL์ ์์ฑํ ์ ์์ต๋๋ค. BUILD_SCOPED_FILE_URL ์ ๊ธฐ๋ฅ์ ๋ํ ์์ธํ ๋ด์ฉ์ ๋น์ ํ ๋ฐ์ดํฐ ์๊ฐ ์น์ ์ ์ฐธ์กฐํ์ญ์์ค.
UDF ๋ง๋ค๊ธฐยถ
๋ค์ ์์ ์ ์ฝ๋์๋ InputStream
์ ๋ฐ์ Int
๋ฅผ ๋ฐํํ๋ ์ฒ๋ฆฌ๊ธฐ ํจ์ sumTotalSales
๊ฐ ์์ต๋๋ค. ๋ฐํ์์ Snowflake๋ file
๋ณ์์ ๊ฒฝ๋ก์ ์๋ ํ์ผ์ ๋ด์ฉ์ stream
์ธ์ ๋ณ์์ ์๋์ผ๋ก ํ ๋นํฉ๋๋ค.
CREATE OR REPLACE FUNCTION sum_total_sales_input_stream(file STRING)
RETURNS NUMBER
LANGUAGE SCALA
RUNTIME_VERSION = 2.12
HANDLER = 'SalesSum.sumTotalSales'
PACKAGES = ('com.snowflake:snowpark:latest')
AS $$
import com.snowflake.snowpark.types.Variant
import java.io.InputStream
import java.io.IOException
import java.nio.charset.StandardCharsets
object SalesSum {
@throws(classOf[IOException])
def sumTotalSales(stream: InputStream): Int = {
val total = -1
val contents = new String(stream.readAllBytes(), StandardCharsets.UTF_8)
// Omitted for brevity: code to retrieve sales data from JSON and assign it to the total variable.
return total
}
}
$$;
UDF ํธ์ถํ๊ธฐยถ
SELECT sum_total_sales_input_stream(BUILD_SCOPED_FILE_URL('@sales_data_stage', '/car_sales.json'));