์ผ๋ถ Google ์ ๊ณต Dataflow ํ ํ๋ฆฟ์์๋ ์ฌ์ฉ์ ์ ์ ํจ์(UDF)๋ฅผ ์ง์ํฉ๋๋ค. UDF๋ฅผ ์ฌ์ฉํ๋ฉด ํ ํ๋ฆฟ ์ฝ๋๋ฅผ ์์ ํ์ง ์๊ณ ํ ํ๋ฆฟ ๊ธฐ๋ฅ์ ํ์ฅํ ์ ์์ต๋๋ค.
๊ฐ์
UDF๋ฅผ ๋ง๋ค๋ ค๋ฉด ํ ํ๋ฆฟ์ ๋ฐ๋ผ JavaScript ํจ์ ๋๋ Python ํจ์๋ฅผ ์์ฑํฉ๋๋ค. UDF ์ฝ๋ ํ์ผ์ Cloud Storage์ ์ ์ฅํ๊ณ ์์น๋ฅผ ํ ํ๋ฆฟ ๋งค๊ฐ๋ณ์๋ก ์ง์ ํฉ๋๋ค. ๊ฐ ์ ๋ ฅ ์์์ ๋ํด ํ ํ๋ฆฟ์ด ํจ์๋ฅผ ํธ์ถํฉ๋๋ค. ์ด ํจ์๋ ์์๋ฅผ ๋ณํํ๊ฑฐ๋ ๋ค๋ฅธ ์ปค์คํ ๋ก์ง์ ์ํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ค์ ํ ํ๋ฆฟ์ ๋ฐํํฉ๋๋ค.
์๋ฅผ ๋ค์ด UDF๋ฅผ ์ฌ์ฉํ์ฌ ๋ค์ ์์ ์ ์ํํ ์ ์์ต๋๋ค.
- ํ๊ฒ ์คํค๋ง์ ์ผ์นํ๋๋ก ์ ๋ ฅ ๋ฐ์ดํฐ ํ์์ ๋ค์ ์ง์ ํฉ๋๋ค.
- ๋ฏผ๊ฐํ ์ ๋ณด๋ฅผ ์์ ํฉ๋๋ค.
- ์ถ๋ ฅ์์ ์ผ๋ถ ์์๋ฅผ ํํฐ๋งํฉ๋๋ค.
UDF ํจ์์ ์ ๋ ฅ์ JSON ๋ฌธ์์ด๋ก ์ง๋ ฌํ๋ ๋จ์ผ ๋ฐ์ดํฐ ์์์ ๋๋ค. ์ด ํจ์๋ ์ง๋ ฌํ๋ JSON ๋ฌธ์์ด์ ์ถ๋ ฅ์ผ๋ก ๋ฐํํฉ๋๋ค. ๋ฐ์ดํฐ ํ์์ ํ ํ๋ฆฟ์ ๋ฐ๋ผ ๋ค๋ฆ ๋๋ค. ์๋ฅผ ๋ค์ด Pub/Sub Subscription to BigQuery ํ ํ๋ฆฟ์์ ์ ๋ ฅ์ JSON ๊ฐ์ฒด๋ก ์ง๋ ฌํ๋ Pub/Sub ๋ฉ์์ง ๋ฐ์ดํฐ์ด๊ณ ์ถ๋ ฅ์ BigQuery ํ ์ด๋ธ ํ์ ๋ํ๋ด๋ ์ง๋ ฌํ๋ JSON ๊ฐ์ฒด์ ๋๋ค. ์์ธํ ๋ด์ฉ์ ๊ฐ ํ ํ๋ฆฟ์ ๋ฌธ์๋ฅผ ์ฐธ์กฐํ์ธ์.
UDF๋ฅผ ์ฌ์ฉํ์ฌ ํ ํ๋ฆฟ ์คํ
UDF๋ก ํ ํ๋ฆฟ์ ์คํํ๋ ค๋ฉด ํ ํ๋ฆฟ ๋งค๊ฐ๋ณ์๋ก JavaScript ํ์ผ์ Cloud Storage ์์น์ ํจ์ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค.
์ผ๋ถ Google ์ ๊ณต ํ ํ๋ฆฟ์ ์ฌ์ฉํ๋ฉด ๋ค์๊ณผ ๊ฐ์ด Google Cloud ์ฝ์์์ ์ง์ UDF๋ฅผ ๋ง๋ค ์ ์์ต๋๋ค.
Google Cloud ์ฝ์์ Dataflow ํ์ด์ง๋ก ์ด๋ํฉ๋๋ค.
add_boxํ ํ๋ฆฟ์์ ์์ ๋ง๋ค๊ธฐ๋ฅผ ํด๋ฆญํฉ๋๋ค.
์คํํ๋ ค๋ Google ์ ๊ณต ํ ํ๋ฆฟ์ ์ ํํฉ๋๋ค.
์ ํ์ ๋งค๊ฐ๋ณ์๋ฅผ ํผ์นฉ๋๋ค. ํ ํ๋ฆฟ์ด UDF๋ฅผ ์ง์ํ๋ ๊ฒฝ์ฐ UDF์ Cloud Storage ์์น์ ๋ํ ๋งค๊ฐ๋ณ์์ ํจ์ ์ด๋ฆ์ ๋ค๋ฅธ ๋งค๊ฐ๋ณ์๊ฐ ํฌํจ๋ฉ๋๋ค.
ํ ํ๋ฆฟ ๋งค๊ฐ๋ณ์ ์์ ์๋ UDF ๋ง๋ค๊ธฐ๋ฅผ ํด๋ฆญํฉ๋๋ค.
์ฌ์ฉ์ ์ ์ ํจ์(UDF) ์ ํ ๋๋ ๋ง๋ค๊ธฐ ํจ๋์์ ๋ค์ ์์ ์ ์ํํฉ๋๋ค.
- ํ์ผ ์ด๋ฆ์ ์
๋ ฅํฉ๋๋ค. ์:
my_udf.js
- Cloud Storage ํด๋๋ฅผ ์ ํํฉ๋๋ค.
์:
gs://your-bucket/your-folder
- ์ธ๋ผ์ธ ์ฝ๋ ํธ์ง๊ธฐ๋ฅผ ์ฌ์ฉํ์ฌ ํจ์๋ฅผ ์์ฑํฉ๋๋ค. ํธ์ง๊ธฐ์๋ ์ถ๋ฐ์ ์ผ๋ก ์ฌ์ฉํ ์ ์๋ ์์ฉ๊ตฌ ์ฝ๋๊ฐ ์๋ ์ ๋ ฅ๋ฉ๋๋ค.
UDF ๋ง๋ค๊ธฐ๋ฅผ ํด๋ฆญํฉ๋๋ค.
Google Cloud ์ฝ์์ด UDF ํ์ผ์ ์ ์ฅํ๊ณ Cloud Storage ์์น๋ฅผ ์๋ ์ ๋ ฅํฉ๋๋ค.
ํด๋น ํ๋์ ํจ์ ์ด๋ฆ์ ์ ๋ ฅํฉ๋๋ค.
- ํ์ผ ์ด๋ฆ์ ์
๋ ฅํฉ๋๋ค. ์:
JavaScript UDF ์์ฑ
๋ค์ ์ฝ๋๋ ์์ํ ์ ์๋ ๋ ธ์ต์ค(no-ops) JavaScript UDF๋ฅผ ๋ณด์ฌ์ค๋๋ค.
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
JavaScript ์ฝ๋๋ Nashorn JavaScript ์์ง์์ ์คํ๋ฉ๋๋ค. ๋ฐฐํฌํ๊ธฐ ์ ์ Nashorn ์์ง์์ UDF๋ฅผ ํ
์คํธํ๋ ๊ฒ์ด ์ข์ต๋๋ค. Nashorn ์์ง์ JavaScript์ Node.js ๊ตฌํ๊ณผ ์ ํํ๊ฒ ์ผ์นํ์ง ์์ต๋๋ค. ํํ ๋ฒํ๋ ์ค์๋ console.log()
๋๋ Number.isNaN()
๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์
๋๋ค. ์ด ๋์ ๋ชจ๋ Nashorn ์์ง์ ์ ์๋์ด ์์ง ์์ต๋๋ค.
JDK 11์ด ์ฌ์ ์ค์น๋ Cloud Shell์ ์ฌ์ฉํ์ฌ Nashorn ์์ง์์ UDF๋ฅผ ํ ์คํธํ ์ ์์ต๋๋ค. ๋ค์๊ณผ ๊ฐ์ด ๋ํํ ๋ชจ๋์์ Nashorn์ ์คํํฉ๋๋ค.
jjs --language=es6
Nashorn ๋ํํ ์ ธ์์ ๋ค์ ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค.
load
๋ฅผ ํธ์ถํ์ฌ UDF JavaScript ํ์ผ์ ๋ก๋ํฉ๋๋ค.- ํ์ดํ๋ผ์ธ์ ์์ ๋ฉ์์ง์ ๋ฐ๋ผ ์ ๋ ฅ JSON ๊ฐ์ฒด๋ฅผ ์ ์ํฉ๋๋ค.
JSON.stringify
ํจ์๋ฅผ ์ฌ์ฉํ์ฌ ์ ๋ ฅ์ JSON ๋ฌธ์์ด๋ก ์ง๋ ฌํํฉ๋๋ค.- UDF ํจ์๋ฅผ ํธ์ถํ์ฌ JSON ๋ฌธ์์ด์ ์ฒ๋ฆฌํฉ๋๋ค.
JSON.parse
๋ฅผ ํธ์ถํ์ฌ ์ถ๋ ฅ์ ์ญ์ง๋ ฌํํฉ๋๋ค.- ๊ฒฐ๊ณผ๋ฅผ ํ์ธํฉ๋๋ค.
์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Python UDF ์์ฑ
๋ค์ ์ฝ๋๋ ์์ํ ์ ์๋ ๋ ธ์ต์ค(no-ops) Python UDF๋ฅผ ๋ณด์ฌ์ค๋๋ค.
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Python UDF๋ Python ๋ฐ Apache Beam์ ํ์ค ์ข ์ ํญ๋ชฉ ํจํค์ง๋ฅผ ์ง์ํฉ๋๋ค. ํ์ฌ ํจํค์ง๋ ์ฌ์ฉํ ์ ์์ต๋๋ค.
์ค๋ฅ ์ฒ๋ฆฌ
์ผ๋ฐ์ ์ผ๋ก UDF ์คํ ์ค์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ฉด ์ค๋ฅ๊ฐ ๋ฐ๋ ๋ ํฐ ์์น์ ๊ธฐ๋ก๋ฉ๋๋ค. ์ธ๋ถ์ ๋ณด๋ ํ
ํ๋ฆฟ์ ๋ฐ๋ผ ๋ค๋ฆ
๋๋ค. ์๋ฅผ ๋ค์ด Pub/Sub Subscription to BigQuery ํ
ํ๋ฆฟ์ _error_records
ํ
์ด๋ธ์ ๋ง๋ค๊ณ ์ฌ๊ธฐ์ ์ค๋ฅ๋ฅผ ๊ธฐ๋กํฉ๋๋ค. ๋ฐํ์ UDF ์ค๋ฅ๋ ๊ตฌ๋ฌธ ์ค๋ฅ ๋๋ ๋ฐ๊ฒฌ๋์ง ์์ ์์ธ๋ก ์ธํด ๋ฐ์ํ ์ ์์ต๋๋ค. ๊ตฌ๋ฌธ ์ค๋ฅ๋ฅผ ํ์ธํ๋ ค๋ฉด ๋ก์ปฌ์์ UDF๋ฅผ ํ
์คํธํฉ๋๋ค.
์ฒ๋ฆฌ๋์ง ์์์ผ ํ๋ ์์์ ๋ํด ํ๋ก๊ทธ๋๋งคํฑ ๋ฐฉ์์ผ๋ก ์์ธ๋ฅผ ๋ฐ์์ํฌ ์ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ ํ ํ๋ฆฟ์์ ๋ฐ๋ ๋ ํฐ๋ฅผ ์ง์ํ๋ค๋ฉด ๋ฐ๋ ๋ ํฐ ์์น์ ์์๊ฐ ๊ธฐ๋ก๋ฉ๋๋ค. ์ด ์ ๊ทผ ๋ฐฉ์์ ๋ณด์ฌ์ฃผ๋ ์์๋ ๊ฒฝ๋ก ์ด๋ฒคํธ๋ฅผ ์ฐธ์กฐํ์ธ์.
์ฌ์ฉ ์ฌ๋ก ์์
์ด ์น์ ์์๋ ์ค์ ์ฌ์ฉ ์ฌ๋ก์ ๋ฐ๋ผ ์ผ๋ฐ์ ์ธ UDF ํจํด ๋ช ๊ฐ์ง๋ฅผ ์ค๋ช ํฉ๋๋ค.
์ด๋ฒคํธ ๋ณด๊ฐ
UDF๋ฅผ ์ฌ์ฉํ์ฌ ์ถ๊ฐ ์ปจํ ์คํธ ์ ๋ณด๋ฅผ ๋ด์ ์ ํ๋๋ก ์ด๋ฒคํธ๋ฅผ ๋ณด๊ฐํฉ๋๋ค.
์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
๋ณํ ์ด๋ฒคํธ
UDF๋ฅผ ์ฌ์ฉํ์ฌ ๋์์ ์์์ ๋ฐ๋ผ ์ ์ฒด ์ด๋ฒคํธ ํ์์ ๋ณํํฉ๋๋ค.
๋ค์ ์์์์๋ ๊ฐ๋ฅํ ๊ฒฝ์ฐ Cloud Logging ๋ก๊ทธ ํญ๋ชฉ(LogEntry
)์ ์๋ ๋ก๊ทธ ๋ฌธ์์ด๋ก ๋๋๋ฆฝ๋๋ค. ๋ก๊ทธ ์์ค์ ๋ฐ๋ผ ์๋ณธ ๋ก๊ทธ ๋ฌธ์์ด์ด textPayload
ํ๋์ ์ฑ์์ง๋ ๊ฒฝ์ฐ๊ฐ ์์ต๋๋ค. Cloud Logging์์ ์ ์ฒด LogEntry
๋ฅผ ๋ณด๋ด๋ ๋์ ์ด ํจํด์ ์ฌ์ฉํ๋ฉด ์์ ๋ก๊ทธ๋ฅผ ์๋ ํ์์ผ๋ก ๋ณด๋ผ ์ ์์ต๋๋ค.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
์ด๋ฒคํธ ๋ฐ์ดํฐ ์์ ๋๋ ์ญ์
UDF๋ฅผ ์ฌ์ฉํ์ฌ ์ด๋ฒคํธ์ ์ผ๋ถ๋ฅผ ์์ ํ๊ฑฐ๋ ์ญ์ ํฉ๋๋ค.
๋ค์ ์์์์๋ ํ๋ ์ด๋ฆ sensitiveField
๋ฅผ ์์ ํ์ฌ ๊ฐ์ ๊ต์ฒดํ๊ณ redundantField
๋ผ๋ ํ๋๋ฅผ ์์ ํ ์ญ์ ํฉ๋๋ค.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
๊ฒฝ๋ก ์ด๋ฒคํธ
UDF๋ฅผ ์ฌ์ฉํ์ฌ ๋ค์ด์คํธ๋ฆผ ์ฑํฌ์์ ๊ฐ๋ณ ๋์์ผ๋ก ์ด๋ฒคํธ๋ฅผ ๋ผ์ฐํ ํฉ๋๋ค.
๋ค์ ์์์์๋ Pub/Sub to Splunk ํ ํ๋ฆฟ์ ๊ธฐ๋ฐ์ผ๋ก ๊ฐ ์ด๋ฒคํธ๋ฅผ ์ฌ๋ฐ๋ฅธ Splunk ์์ธ์ผ๋ก ๋ผ์ฐํ ํฉ๋๋ค. ์ฌ์ฉ์ ์ ์ ๋ก์ปฌ ํจ์๋ฅผ ํธ์ถํ์ฌ ์ด๋ฒคํธ๋ฅผ ์์ธ์ ๋งคํํฉ๋๋ค.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
๋ค์ ์์์์๋ ํ ํ๋ฆฟ์ด ๋ฐ๋ ๋ ํฐ ํ๋ฅผ ์ง์ํ๋ค๊ณ ๊ฐ์ ํ๊ณ ์ธ์๋์ง ์์ ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ ๋ ํฐ ํ๋ก ๋ผ์ฐํ ํฉ๋๋ค. (์์๋ Pub/Sub to JDBC ํ ํ๋ฆฟ์ ์ฐธ์กฐํ์ธ์.) ์ด ํจํด์ ์ฌ์ฉํ๋ฉด ๋์์ ์ฐ๊ธฐ ์ ์ ์๊ธฐ์น ๋ชปํ ํญ๋ชฉ์ ํํฐ๋งํ ์ ์์ต๋๋ค.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
์ด๋ฒคํธ ํํฐ๋ง
UDF๋ฅผ ์ฌ์ฉํ์ฌ ์ถ๋ ฅ์์ ์์น ์๊ฑฐ๋ ์ธ์๋์ง ์๋ ์ด๋ฒคํธ๋ฅผ ํํฐ๋งํฉ๋๋ค.
๋ค์ ์์๋ data.severity
๊ฐ "DEBUG"
์ธ ์ด๋ฒคํธ๋ฅผ ์ญ์ ํฉ๋๋ค.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
๋ค์ ๋จ๊ณ
- Google ์ ๊ณต ํ ํ๋ฆฟ
- Flex ํ ํ๋ฆฟ ๋น๋ ๋ฐ ์คํ
- ๊ธฐ๋ณธ ํ ํ๋ฆฟ ์คํ
- UDF๋ก Dataflow ํ ํ๋ฆฟ ํ์ฅ(๋ธ๋ก๊ทธ ๊ฒ์๋ฌผ)
- UDF ์์(GitHub)