Dataflow ํ…œํ”Œ๋ฆฟ์— ๋Œ€ํ•œ ์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜ ๋งŒ๋“ค๊ธฐ

์ผ๋ถ€ Google ์ œ๊ณต Dataflow ํ…œํ”Œ๋ฆฟ์—์„œ๋Š” ์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜(UDF)๋ฅผ ์ง€์›ํ•ฉ๋‹ˆ๋‹ค. UDF๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํ…œํ”Œ๋ฆฟ ์ฝ”๋“œ๋ฅผ ์ˆ˜์ •ํ•˜์ง€ ์•Š๊ณ  ํ…œํ”Œ๋ฆฟ ๊ธฐ๋Šฅ์„ ํ™•์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ฐœ์š”

UDF๋ฅผ ๋งŒ๋“ค๋ ค๋ฉด ํ…œํ”Œ๋ฆฟ์— ๋”ฐ๋ผ JavaScript ํ•จ์ˆ˜ ๋˜๋Š” Python ํ•จ์ˆ˜๋ฅผ ์ž‘์„ฑํ•ฉ๋‹ˆ๋‹ค. UDF ์ฝ”๋“œ ํŒŒ์ผ์„ Cloud Storage์— ์ €์žฅํ•˜๊ณ  ์œ„์น˜๋ฅผ ํ…œํ”Œ๋ฆฟ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค. ๊ฐ ์ž…๋ ฅ ์š”์†Œ์— ๋Œ€ํ•ด ํ…œํ”Œ๋ฆฟ์ด ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ์ด ํ•จ์ˆ˜๋Š” ์š”์†Œ๋ฅผ ๋ณ€ํ™˜ํ•˜๊ฑฐ๋‚˜ ๋‹ค๋ฅธ ์ปค์Šคํ…€ ๋กœ์ง์„ ์ˆ˜ํ–‰ํ•˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ๋‹ค์‹œ ํ…œํ”Œ๋ฆฟ์— ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹ค์Œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • ํƒ€๊ฒŸ ์Šคํ‚ค๋งˆ์™€ ์ผ์น˜ํ•˜๋„๋ก ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ํ˜•์‹์„ ๋‹ค์‹œ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.
  • ๋ฏผ๊ฐํ•œ ์ •๋ณด๋ฅผ ์ˆ˜์ •ํ•ฉ๋‹ˆ๋‹ค.
  • ์ถœ๋ ฅ์—์„œ ์ผ๋ถ€ ์š”์†Œ๋ฅผ ํ•„ํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค.

UDF ํ•จ์ˆ˜์˜ ์ž…๋ ฅ์€ JSON ๋ฌธ์ž์—ด๋กœ ์ง๋ ฌํ™”๋œ ๋‹จ์ผ ๋ฐ์ดํ„ฐ ์š”์†Œ์ž…๋‹ˆ๋‹ค. ์ด ํ•จ์ˆ˜๋Š” ์ง๋ ฌํ™”๋œ JSON ๋ฌธ์ž์—ด์„ ์ถœ๋ ฅ์œผ๋กœ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ ํ˜•์‹์€ ํ…œํ”Œ๋ฆฟ์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด Pub/Sub Subscription to BigQuery ํ…œํ”Œ๋ฆฟ์—์„œ ์ž…๋ ฅ์€ JSON ๊ฐ์ฒด๋กœ ์ง๋ ฌํ™”๋œ Pub/Sub ๋ฉ”์‹œ์ง€ ๋ฐ์ดํ„ฐ์ด๊ณ  ์ถœ๋ ฅ์€ BigQuery ํ…Œ์ด๋ธ” ํ–‰์„ ๋‚˜ํƒ€๋‚ด๋Š” ์ง๋ ฌํ™”๋œ JSON ๊ฐ์ฒด์ž…๋‹ˆ๋‹ค. ์ž์„ธํ•œ ๋‚ด์šฉ์€ ๊ฐ ํ…œํ”Œ๋ฆฟ์˜ ๋ฌธ์„œ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ…œํ”Œ๋ฆฟ ์‹คํ–‰

UDF๋กœ ํ…œํ”Œ๋ฆฟ์„ ์‹คํ–‰ํ•˜๋ ค๋ฉด ํ…œํ”Œ๋ฆฟ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ JavaScript ํŒŒ์ผ์˜ Cloud Storage ์œ„์น˜์™€ ํ•จ์ˆ˜ ์ด๋ฆ„์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.

์ผ๋ถ€ Google ์ œ๊ณต ํ…œํ”Œ๋ฆฟ์„ ์‚ฌ์šฉํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์ด Google Cloud ์ฝ˜์†”์—์„œ ์ง์ ‘ UDF๋ฅผ ๋งŒ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  1. Google Cloud ์ฝ˜์†”์˜ Dataflow ํŽ˜์ด์ง€๋กœ ์ด๋™ํ•ฉ๋‹ˆ๋‹ค.

    Dataflow ํŽ˜์ด์ง€๋กœ ์ด๋™

  2. ํ…œํ”Œ๋ฆฟ์—์„œ ์ž‘์—… ๋งŒ๋“ค๊ธฐ๋ฅผ ํด๋ฆญํ•ฉ๋‹ˆ๋‹ค.

  3. ์‹คํ–‰ํ•˜๋ ค๋Š” Google ์ œ๊ณต ํ…œํ”Œ๋ฆฟ์„ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค.

  4. ์„ ํƒ์  ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ํŽผ์นฉ๋‹ˆ๋‹ค. ํ…œํ”Œ๋ฆฟ์ด UDF๋ฅผ ์ง€์›ํ•˜๋Š” ๊ฒฝ์šฐ UDF์˜ Cloud Storage ์œ„์น˜์— ๋Œ€ํ•œ ๋งค๊ฐœ๋ณ€์ˆ˜์™€ ํ•จ์ˆ˜ ์ด๋ฆ„์˜ ๋‹ค๋ฅธ ๋งค๊ฐœ๋ณ€์ˆ˜๊ฐ€ ํฌํ•จ๋ฉ๋‹ˆ๋‹ค.

  5. ํ…œํ”Œ๋ฆฟ ๋งค๊ฐœ๋ณ€์ˆ˜ ์˜†์— ์žˆ๋Š” UDF ๋งŒ๋“ค๊ธฐ๋ฅผ ํด๋ฆญํ•ฉ๋‹ˆ๋‹ค.

  6. ์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜(UDF) ์„ ํƒ ๋˜๋Š” ๋งŒ๋“ค๊ธฐ ํŒจ๋„์—์„œ ๋‹ค์Œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

    1. ํŒŒ์ผ ์ด๋ฆ„์„ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ: my_udf.js
    2. Cloud Storage ํด๋”๋ฅผ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ: gs://your-bucket/your-folder
    3. ์ธ๋ผ์ธ ์ฝ”๋“œ ํŽธ์ง‘๊ธฐ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํ•จ์ˆ˜๋ฅผ ์ž‘์„ฑํ•ฉ๋‹ˆ๋‹ค. ํŽธ์ง‘๊ธฐ์—๋Š” ์ถœ๋ฐœ์ ์œผ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์ƒ์šฉ๊ตฌ ์ฝ”๋“œ๊ฐ€ ์ž๋™ ์ž…๋ ฅ๋ฉ๋‹ˆ๋‹ค.
    4. UDF ๋งŒ๋“ค๊ธฐ๋ฅผ ํด๋ฆญํ•ฉ๋‹ˆ๋‹ค.

      Google Cloud ์ฝ˜์†”์ด UDF ํŒŒ์ผ์„ ์ €์žฅํ•˜๊ณ  Cloud Storage ์œ„์น˜๋ฅผ ์ž๋™ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค.

    5. ํ•ด๋‹น ํ•„๋“œ์— ํ•จ์ˆ˜ ์ด๋ฆ„์„ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค.

JavaScript UDF ์ž‘์„ฑ

๋‹ค์Œ ์ฝ”๋“œ๋Š” ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ๋Š” ๋…ธ์˜ต์Šค(no-ops) JavaScript UDF๋ฅผ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

JavaScript ์ฝ”๋“œ๋Š” Nashorn JavaScript ์—”์ง„์—์„œ ์‹คํ–‰๋ฉ๋‹ˆ๋‹ค. ๋ฐฐํฌํ•˜๊ธฐ ์ „์— Nashorn ์—”์ง„์—์„œ UDF๋ฅผ ํ…Œ์ŠคํŠธํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค. Nashorn ์—”์ง„์€ JavaScript์˜ Node.js ๊ตฌํ˜„๊ณผ ์ •ํ™•ํ•˜๊ฒŒ ์ผ์น˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ํ”ํžˆ ๋ฒ”ํ•˜๋Š” ์‹ค์ˆ˜๋Š” console.log() ๋˜๋Š” Number.isNaN()๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. ์ด ๋‘˜์€ ๋ชจ๋‘ Nashorn ์—”์ง„์— ์ •์˜๋˜์–ด ์žˆ์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

JDK 11์ด ์‚ฌ์ „ ์„ค์น˜๋œ Cloud Shell์„ ์‚ฌ์šฉํ•˜์—ฌ Nashorn ์—”์ง„์—์„œ UDF๋ฅผ ํ…Œ์ŠคํŠธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๋Œ€ํ™”ํ˜• ๋ชจ๋“œ์—์„œ Nashorn์„ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค.

jjs --language=es6

Nashorn ๋Œ€ํ™”ํ˜• ์…ธ์—์„œ ๋‹ค์Œ ๋‹จ๊ณ„๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.

  1. load๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ UDF JavaScript ํŒŒ์ผ์„ ๋กœ๋“œํ•ฉ๋‹ˆ๋‹ค.
  2. ํŒŒ์ดํ”„๋ผ์ธ์˜ ์˜ˆ์ƒ ๋ฉ”์‹œ์ง€์— ๋”ฐ๋ผ ์ž…๋ ฅ JSON ๊ฐ์ฒด๋ฅผ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.
  3. JSON.stringify ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ž…๋ ฅ์„ JSON ๋ฌธ์ž์—ด๋กœ ์ง๋ ฌํ™”ํ•ฉ๋‹ˆ๋‹ค.
  4. UDF ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ JSON ๋ฌธ์ž์—ด์„ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.
  5. JSON.parse๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ถœ๋ ฅ์„ ์—ญ์ง๋ ฌํ™”ํ•ฉ๋‹ˆ๋‹ค.
  6. ๊ฒฐ๊ณผ๋ฅผ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Python UDF ์ž‘์„ฑ

๋‹ค์Œ ์ฝ”๋“œ๋Š” ์‹œ์ž‘ํ•  ์ˆ˜ ์žˆ๋Š” ๋…ธ์˜ต์Šค(no-ops) Python UDF๋ฅผ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDF๋Š” Python ๋ฐ Apache Beam์˜ ํ‘œ์ค€ ์ข…์† ํ•ญ๋ชฉ ํŒจํ‚ค์ง€๋ฅผ ์ง€์›ํ•ฉ๋‹ˆ๋‹ค. ํƒ€์‚ฌ ํŒจํ‚ค์ง€๋Š” ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.

์˜ค๋ฅ˜ ์ฒ˜๋ฆฌ

์ผ๋ฐ˜์ ์œผ๋กœ UDF ์‹คํ–‰ ์ค‘์— ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ์˜ค๋ฅ˜๊ฐ€ ๋ฐ๋“œ ๋ ˆํ„ฐ ์œ„์น˜์— ๊ธฐ๋ก๋ฉ๋‹ˆ๋‹ค. ์„ธ๋ถ€์ •๋ณด๋Š” ํ…œํ”Œ๋ฆฟ์— ๋”ฐ๋ผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด Pub/Sub Subscription to BigQuery ํ…œํ”Œ๋ฆฟ์€ _error_records ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค๊ณ  ์—ฌ๊ธฐ์— ์˜ค๋ฅ˜๋ฅผ ๊ธฐ๋กํ•ฉ๋‹ˆ๋‹ค. ๋Ÿฐํƒ€์ž„ UDF ์˜ค๋ฅ˜๋Š” ๊ตฌ๋ฌธ ์˜ค๋ฅ˜ ๋˜๋Š” ๋ฐœ๊ฒฌ๋˜์ง€ ์•Š์€ ์˜ˆ์™ธ๋กœ ์ธํ•ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ตฌ๋ฌธ ์˜ค๋ฅ˜๋ฅผ ํ™•์ธํ•˜๋ ค๋ฉด ๋กœ์ปฌ์—์„œ UDF๋ฅผ ํ…Œ์ŠคํŠธํ•ฉ๋‹ˆ๋‹ค.

์ฒ˜๋ฆฌ๋˜์ง€ ์•Š์•„์•ผ ํ•˜๋Š” ์š”์†Œ์— ๋Œ€ํ•ด ํ”„๋กœ๊ทธ๋ž˜๋งคํ‹ฑ ๋ฐฉ์‹์œผ๋กœ ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ฒฝ์šฐ ํ…œํ”Œ๋ฆฟ์—์„œ ๋ฐ๋“œ ๋ ˆํ„ฐ๋ฅผ ์ง€์›ํ•œ๋‹ค๋ฉด ๋ฐ๋“œ ๋ ˆํ„ฐ ์œ„์น˜์— ์š”์†Œ๊ฐ€ ๊ธฐ๋ก๋ฉ๋‹ˆ๋‹ค. ์ด ์ ‘๊ทผ ๋ฐฉ์‹์„ ๋ณด์—ฌ์ฃผ๋Š” ์˜ˆ์‹œ๋Š” ๊ฒฝ๋กœ ์ด๋ฒคํŠธ๋ฅผ ์ฐธ์กฐํ•˜์„ธ์š”.

์‚ฌ์šฉ ์‚ฌ๋ก€ ์˜ˆ์‹œ

์ด ์„น์…˜์—์„œ๋Š” ์‹ค์ œ ์‚ฌ์šฉ ์‚ฌ๋ก€์— ๋”ฐ๋ผ ์ผ๋ฐ˜์ ์ธ UDF ํŒจํ„ด ๋ช‡ ๊ฐ€์ง€๋ฅผ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค.

์ด๋ฒคํŠธ ๋ณด๊ฐ•

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ถ”๊ฐ€ ์ปจํ…์ŠคํŠธ ์ •๋ณด๋ฅผ ๋‹ด์€ ์ƒˆ ํ•„๋“œ๋กœ ์ด๋ฒคํŠธ๋ฅผ ๋ณด๊ฐ•ํ•ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

๋ณ€ํ™˜ ์ด๋ฒคํŠธ

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋Œ€์ƒ์˜ ์˜ˆ์ƒ์— ๋”ฐ๋ผ ์ „์ฒด ์ด๋ฒคํŠธ ํ˜•์‹์„ ๋ณ€ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” ๊ฐ€๋Šฅํ•œ ๊ฒฝ์šฐ Cloud Logging ๋กœ๊ทธ ํ•ญ๋ชฉ(LogEntry)์„ ์›๋ž˜ ๋กœ๊ทธ ๋ฌธ์ž์—ด๋กœ ๋˜๋Œ๋ฆฝ๋‹ˆ๋‹ค. ๋กœ๊ทธ ์†Œ์Šค์— ๋”ฐ๋ผ ์›๋ณธ ๋กœ๊ทธ ๋ฌธ์ž์—ด์ด textPayload ํ•„๋“œ์— ์ฑ„์›Œ์ง€๋Š” ๊ฒฝ์šฐ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. Cloud Logging์—์„œ ์ „์ฒด LogEntry๋ฅผ ๋ณด๋‚ด๋Š” ๋Œ€์‹  ์ด ํŒจํ„ด์„ ์‚ฌ์šฉํ•˜๋ฉด ์›์‹œ ๋กœ๊ทธ๋ฅผ ์›๋ž˜ ํ˜•์‹์œผ๋กœ ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

์ด๋ฒคํŠธ ๋ฐ์ดํ„ฐ ์ˆ˜์ • ๋˜๋Š” ์‚ญ์ œ

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ด๋ฒคํŠธ์˜ ์ผ๋ถ€๋ฅผ ์ˆ˜์ •ํ•˜๊ฑฐ๋‚˜ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” ํ•„๋“œ ์ด๋ฆ„ sensitiveField๋ฅผ ์ˆ˜์ •ํ•˜์—ฌ ๊ฐ’์„ ๊ต์ฒดํ•˜๊ณ  redundantField๋ผ๋Š” ํ•„๋“œ๋ฅผ ์™„์ „ํžˆ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

๊ฒฝ๋กœ ์ด๋ฒคํŠธ

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹ค์šด์ŠคํŠธ๋ฆผ ์‹ฑํฌ์—์„œ ๊ฐœ๋ณ„ ๋Œ€์ƒ์œผ๋กœ ์ด๋ฒคํŠธ๋ฅผ ๋ผ์šฐํŒ…ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” Pub/Sub to Splunk ํ…œํ”Œ๋ฆฟ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ๊ฐ ์ด๋ฒคํŠธ๋ฅผ ์˜ฌ๋ฐ”๋ฅธ Splunk ์ƒ‰์ธ์œผ๋กœ ๋ผ์šฐํŒ…ํ•ฉ๋‹ˆ๋‹ค. ์‚ฌ์šฉ์ž ์ •์˜ ๋กœ์ปฌ ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ด๋ฒคํŠธ๋ฅผ ์ƒ‰์ธ์— ๋งคํ•‘ํ•ฉ๋‹ˆ๋‹ค.

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

๋‹ค์Œ ์˜ˆ์‹œ์—์„œ๋Š” ํ…œํ”Œ๋ฆฟ์ด ๋ฐ๋“œ ๋ ˆํ„ฐ ํ๋ฅผ ์ง€์›ํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๊ณ  ์ธ์‹๋˜์ง€ ์•Š์€ ์ด๋ฒคํŠธ๋ฅผ ๋ฐ๋“œ ๋ ˆํ„ฐ ํ๋กœ ๋ผ์šฐํŒ…ํ•ฉ๋‹ˆ๋‹ค. (์˜ˆ์‹œ๋Š” Pub/Sub to JDBC ํ…œํ”Œ๋ฆฟ์„ ์ฐธ์กฐํ•˜์„ธ์š”.) ์ด ํŒจํ„ด์„ ์‚ฌ์šฉํ•˜๋ฉด ๋Œ€์ƒ์— ์“ฐ๊ธฐ ์ „์— ์˜ˆ๊ธฐ์น˜ ๋ชปํ•œ ํ•ญ๋ชฉ์„ ํ•„ํ„ฐ๋งํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

์ด๋ฒคํŠธ ํ•„ํ„ฐ๋ง

UDF๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ถœ๋ ฅ์—์„œ ์›์น˜ ์•Š๊ฑฐ๋‚˜ ์ธ์‹๋˜์ง€ ์•Š๋Š” ์ด๋ฒคํŠธ๋ฅผ ํ•„ํ„ฐ๋งํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ ์˜ˆ์‹œ๋Š” data.severity๊ฐ€ "DEBUG"์ธ ์ด๋ฒคํŠธ๋ฅผ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

๋‹ค์Œ ๋‹จ๊ณ„