Apache Beam SDK์์ ๊ธฐ๋ณธ ์ ๊ณต๋๋ ๋ก๊น ์ธํ๋ผ๋ฅผ ์ฌ์ฉํ์ฌ ํ์ดํ๋ผ์ธ์ ์คํํ ๋ ์ ๋ณด๋ฅผ ๋ก๊น ํ ์ ์์ต๋๋ค. Google Cloud Console์ ์ฌ์ฉํ์ฌ ํ์ดํ๋ผ์ธ ์คํ ์ค๊ณผ ์คํ ํ์ ๋ก๊น ์ ๋ณด๋ฅผ ๋ชจ๋ํฐ๋ง ํ ์ ์์ต๋๋ค.
ํ์ดํ๋ผ์ธ์ ๋ก๊ทธ ๋ฉ์์ง ์ถ๊ฐ
์๋ฐ
์๋ฐ์ฉ Apache Beam SDK๋ ์คํ์์ค Simple Logging Facade for Java(SLF4J) ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํตํ ์์ ์ ๋ฉ์์ง ๋ก๊น ์ ๊ถ์ฅํฉ๋๋ค. ์๋ฐ์ฉ Apache Beam SDK๊ฐ ํ์ํ ๋ก๊น ์ธํ๋ผ๋ฅผ ๊ตฌํํ๋ฏ๋ก, ์๋ฐ ์ฝ๋์์๋ SLF4J API๋ง ๊ฐ์ ธ์ต๋๋ค. ๊ทธ๋ฐ ๋ค์ ํ์ดํ๋ผ์ธ ์ฝ๋ ๋ด์์ ๋ฉ์์ง ๋ก๊น ์ ์ฌ์ฉ ์ค์ ํ๋๋ก ๋ก๊ฑฐ๋ฅผ ์ธ์คํด์คํํฉ๋๋ค.
๊ธฐ์กด ์ฝ๋ ๋๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ๊ฒฝ์ฐ, ์๋ฐ์ฉ Apache Beam SDK๋ ๋ก๊น ์ธํ๋ผ๋ฅผ ์ถ๊ฐ๋ก ์ค์ ํฉ๋๋ค. ๋ค์ ์๋ฐ์ฉ ๋ก๊น ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํตํด ์์ฑ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ์บก์ฒ๋ฉ๋๋ค.
Python
Python์ฉ Apache Beam SDK๋ ํ์ดํ๋ผ์ธ ์์
์๊ฐ ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํ๊ฒ ํ๋ logging
๋ผ์ด๋ธ๋ฌ๋ฆฌ ํจํค์ง๋ฅผ ์ ๊ณตํฉ๋๋ค. ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํจ์๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ๊ฐ์ ธ์์ผ ํฉ๋๋ค.
import logging
Go
Go์ฉ Apache Beam SDK๋ ํ์ดํ๋ผ์ธ ์์
์๊ฐ ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํ๊ฒ ํ๋ log
๋ผ์ด๋ธ๋ฌ๋ฆฌ ํจํค์ง๋ฅผ ์ ๊ณตํฉ๋๋ค. ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํจ์๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ๊ฐ์ ธ์์ผ ํฉ๋๋ค.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
์์ ์ ๋ก๊ทธ ๋ฉ์์ง ์ฝ๋ ์
์๋ฐ
๋ค์ ์์์์๋ Dataflow ๋ก๊น ์ SLF4J๋ฅผ ์ฌ์ฉํฉ๋๋ค. Dataflow ๋ก๊น ์ ์ํ SLF4J ๊ตฌ์ฑ์ ๋ํ ์์ธํ ๋ด์ฉ์ ์๋ฐ ํ ๋ฌธ์๋ฅผ ์ฐธ์กฐํ์ธ์.
Apache Beam WordCount ์์๋ ์ฒ๋ฆฌ๋ ํ ์คํธ ์ค์์ 'love'๋ผ๋ ๋จ์ด๊ฐ ๋ฐ๊ฒฌ๋๋ฉด ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํ๋๋ก ์์ ๋ ์ ์์ต๋๋ค. ์ถ๊ฐ๋ ์ฝ๋๋ ๋ค์ ์์์์ ๊ตต๊ฒ ํ์๋์์ต๋๋ค(์ฃผ๋ณ ์ฝ๋๋ ์ดํด๋ฅผ ๋๊ธฐ ์ํด ํฌํจ๋จ).
package org.apache.beam.examples; // Import SLF4J packages. import org.slf4j.Logger; import org.slf4j.LoggerFactory; ... public class WordCount { ... static class ExtractWordsFn extends DoFn<String, String> { // Instantiate Logger. // Suggestion: As shown, specify the class name of the containing class // (WordCount). private static final Logger LOG = LoggerFactory.getLogger(WordCount.class); ... @ProcessElement public void processElement(ProcessContext c) { ... // Output each word encountered into the output PCollection. for (String word : words) { if (!word.isEmpty()) { c.output(word); } // Log INFO messages when the word "love" is found. if(word.toLowerCase().equals("love")) { LOG.info("Found " + word.toLowerCase()); } } } } ... // Remaining WordCount example code ...
Python
Apache Beam wordcount.py ์์๋ ์ฒ๋ฆฌ๋ ํ ์คํธ ์ค์์ 'love' ๋จ์ด๊ฐ ๋ฐ๊ฒฌ๋๋ฉด ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํ๋๋ก ์์ ๋ ์ ์์ต๋๋ค.
# import Python logging module. import logging class ExtractWordsFn(beam.DoFn): def process(self, element): words = re.findall(r'[A-Za-z\']+', element) for word in words: yield word if word.lower() == 'love': # Log using the root logger at info or higher levels logging.info('Found : %s', word.lower()) # Remaining WordCount example code ...
Go
Apache Beam wordcount.go ์์๋ ์ฒ๋ฆฌ๋ ํ ์คํธ ์ค์์ 'love' ๋จ์ด๊ฐ ๋ฐ๊ฒฌ๋๋ฉด ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ถ๋ ฅํ๋๋ก ์์ ๋ ์ ์์ต๋๋ค.
func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) { for _, word := range wordRE.FindAllString(line, -1) { // increment the counter for small words if length of words is // less than small_word_length if strings.ToLower(word) == "love" { log.Infof(ctx, "Found : %s", strings.ToLower(word)) } emit(word) } } // Remaining Wordcount example
์๋ฐ
์์ ๋ WordCount ํ์ดํ๋ผ์ธ์ด ๋ก์ปฌ ํ์ผ์ ๋ณด๋ธ ์ถ๋ ฅ(--output=./local-wordcounts
)์ด ์๋ ๊ธฐ๋ณธ DirectRunner๋ฅผ ์ฌ์ฉํ์ฌ ๋ก์ปฌ๋ก ์คํ๋๋ ๊ฒฝ์ฐ, Console ์ถ๋ ฅ์ ์ถ๊ฐ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
INFO: Executing pipeline using the DirectRunner. ... Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement INFO: Found love ... INFO: Pipeline execution complete.
๊ธฐ๋ณธ์ ์ผ๋ก INFO
์ด์์ผ๋ก ํ์๋ ๋ก๊ทธ ์ค๋ง Cloud Logging์ผ๋ก ์ ์ก๋ฉ๋๋ค. ์ด ๋์์ ๋ณ๊ฒฝํ๋ ค๋ฉด ํ์ดํ๋ผ์ธ ์์
์ ๋ก๊ทธ ์์ค ์ค์ ์ ์ฐธ๊ณ ํ์ธ์.
Python
์์ ๋ WordCount ํ์ดํ๋ผ์ธ์ด ๋ก์ปฌ ํ์ผ์ ๋ณด๋ธ ์ถ๋ ฅ(--output=./local-wordcounts
)์ด ์๋ ๊ธฐ๋ณธ DirectRunner๋ฅผ ์ฌ์ฉํ์ฌ ๋ก์ปฌ๋ก ์คํ๋๋ ๊ฒฝ์ฐ, Console ์ถ๋ ฅ์ ์ถ๊ฐ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
INFO:root:Found : love INFO:root:Found : love INFO:root:Found : love
๊ธฐ๋ณธ์ ์ผ๋ก INFO
์ด์์ผ๋ก ํ์๋ ๋ก๊ทธ ์ค๋ง Cloud Logging์ผ๋ก ์ ์ก๋ฉ๋๋ค. ์ด ๋์์ ๋ณ๊ฒฝํ๋ ค๋ฉด ํ์ดํ๋ผ์ธ ์์
์ ๋ก๊ทธ ์์ค ์ค์ ์ ์ฐธ๊ณ ํ์ธ์.
ํ์ดํ๋ผ์ธ ๋ก๊ทธ๋ฅผ Dataflow ๋ฐ Cloud Logging์ผ๋ก ์ ์กํ๋ ์ฌ์ ๊ตฌ์ฑ๋ ๋ก๊ทธ ํธ๋ค๋ฌ๊ฐ ์ฌ์ฉ ์ค์ง๋ ์ ์์ผ๋ฏ๋ก logging.config
ํจ์๋ก ๋ก๊น
๊ตฌ์ฑ์ ๋ฎ์ด์ฐ์ง ๋ง์ธ์.
Go
์์ ๋ WordCount ํ์ดํ๋ผ์ธ์ด ๋ก์ปฌ ํ์ผ์ ๋ณด๋ธ ์ถ๋ ฅ(--output=./local-wordcounts
)์ด ์๋ ๊ธฐ๋ณธ DirectRunner๋ฅผ ์ฌ์ฉํ์ฌ ๋ก์ปฌ๋ก ์คํ๋๋ ๊ฒฝ์ฐ, Console ์ถ๋ ฅ์ ์ถ๊ฐ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
2022/05/26 11:36:44 Found : love 2022/05/26 11:36:44 Found : love 2022/05/26 11:36:44 Found : love
๊ธฐ๋ณธ์ ์ผ๋ก INFO
์ด์์ผ๋ก ํ์๋ ๋ก๊ทธ ์ค๋ง Cloud Logging์ผ๋ก ์ ์ก๋ฉ๋๋ค.
๋ก๊ทธ ๋ณผ๋ฅจ ์ ์ด
๋ํ ํ์ดํ๋ผ์ธ ๋ก๊ทธ ์์ค์ ๋ณ๊ฒฝํ์ฌ ์์ฑ๋๋ ๋ก๊ทธ์ ์์ ์ค์ผ ์๋ ์์ต๋๋ค. Dataflow ๋ก๊ทธ์ ์ผ๋ถ ๋๋ ์ ๋ถ๋ฅผ ๊ณ์ ์์งํ์ง ์์ผ๋ ค๋ฉด Logging ์ ์ธ๋ฅผ ์ถ๊ฐํ์ฌ Dataflow ๋ก๊ทธ๋ฅผ ์ ์ธํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ BigQuery, Cloud Storage, Pub/Sub ๋ฑ์ ๋ค๋ฅธ ๋์์ผ๋ก ๋ก๊ทธ๋ฅผ ๋ด๋ณด๋ ๋๋ค. ์์ธํ ๋ด์ฉ์ Dataflow ๋ก๊ทธ ์์ง ์ ์ด๋ฅผ ์ฐธ์กฐํ์ธ์.
ํ๋ ๋ฐ ์ ํ ๋ก๊น
์์ ์ ๋ก๊ทธ ๋ฉ์์ง๋ ์์ ์๋ณ๋ก 30์ด๋ง๋ค 15,000๊ฐ๋ก ์ ํ๋ฉ๋๋ค. ์ด ํ๋์ ๋๋ฌํ๋ฉด ๋ก๊น ์ด ์ ํ๋จ์ ์๋ฆฌ๋ ๋จ์ผ ์์ ์ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ์ถ๊ฐ๋ฉ๋๋ค.
Throttling logger worker. It used up its 30s quota for logs in only 12.345s
๋ก๊ทธ ์คํ ๋ฆฌ์ง ๋ฐ ๋ณด๊ด
์์
๋ก๊ทธ๋ _Default
๋ก๊ทธ ๋ฒํท์ ์ ์ฅ๋ฉ๋๋ค.
Logging API ์๋น์ค ์ด๋ฆ์ dataflow.googleapis.com
์
๋๋ค. Cloud Logging์ ์ฌ์ฉ๋๋ Google Cloud ๋ชจ๋ํฐ๋ง ๋ฆฌ์์ค ์ ํ ๋ฐ ์๋น์ค์ ๋ํ ์์ธํ ๋ด์ฉ์ ๋ชจ๋ํฐ๋ง ๋ฆฌ์์ค ๋ฐ ์๋น์ค๋ฅผ ์ฐธ์กฐํ์ธ์.
Logging์์ ๋ก๊ทธ ํญ๋ชฉ์ด ๋ณด๊ด๋๋ ๊ธฐ๊ฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ ๋น๋ ๋ฐ ํ๋: ๋ก๊ทธ ๋ณด๊ด ๊ธฐ๊ฐ์ ๋ณด๊ด ์ ๋ณด๋ฅผ ์ฐธ์กฐํ์ธ์.
์์ ๋ก๊ทธ ๋ณด๊ธฐ์ ๋ํ ์์ธํ ๋ด์ฉ์ ํ์ดํ๋ผ์ธ ๋ก๊ทธ ๋ชจ๋ํฐ๋ง ๋ฐ ๋ณด๊ธฐ๋ฅผ ์ฐธ์กฐํ์ธ์.
ํ์ดํ๋ผ์ธ ๋ก๊ทธ ๋ชจ๋ํฐ๋ง ๋ฐ ๋ณด๊ธฐ
Dataflow ์๋น์ค์์ ํ์ดํ๋ผ์ธ์ ์คํํ๋ฉด Dataflow ๋ชจ๋ํฐ๋ง ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ์ฌ ํ์ดํ๋ผ์ธ์์ ๋ฐฉ์ถํ ๋ก๊ทธ๋ฅผ ๋ณผ ์ ์์ต๋๋ค.
Dataflow ์์ ์ ๋ก๊ทธ ์์
์์ ๋ WordCount ํ์ดํ๋ผ์ธ์ ๋ค์ ์ต์ ์ผ๋ก ํด๋ผ์ฐ๋์์ ์คํ๋ ์ ์์ต๋๋ค.
์๋ฐ
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --tempLocation=gs://<bucket-name>/temp --stagingLocation=gs://<bucket-name>/binaries
Python
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --staging_location=gs://<bucket-name>/binaries
Go
--project=WordCountExample --output=gs://<bucket-name>/counts --runner=DataflowRunner --staging_location=gs://<bucket-name>/binaries
๋ก๊ทธ ๋ณด๊ธฐ
WordCount ํด๋ผ์ฐ๋ ํ์ดํ๋ผ์ธ์ ์ฐจ๋จ ์คํ์ ์ฌ์ฉํ๋ฏ๋ก, ํ์ดํ๋ผ์ธ ์คํ ์ค์ ์ฝ์ ๋ฉ์์ง๊ฐ ์ถ๋ ฅ๋ฉ๋๋ค. ์์ ์ด ์์๋๋ฉด Google Cloud ์ฝ์ ํ์ด์ง์ ๋ํ ๋งํฌ๊ฐ ์ฝ์๋ก ์ถ๋ ฅ๋๊ณ , ์ด์ด์ ํ์ดํ๋ผ์ธ ์์ ID๊ฐ ์ถ๋ ฅ๋ฉ๋๋ค.
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669 Submitted job: 2017-04-13_13_58_10-6217777367720337669
์ฝ์ URL์ ์ ์ถ๋ ์์ ์ ์์ฝ ํ์ด์ง๊ฐ ์๋ Dataflow ๋ชจ๋ํฐ๋ง ์ธํฐํ์ด์ค๋ก ์ฐ๊ฒฐ๋ฉ๋๋ค. ํด๋น ํ๋ฉด์ ์ผ์ชฝ์๋ ๋์ ์คํ ๊ทธ๋ํ, ์ค๋ฅธ์ชฝ์๋ ์์ฝ ์ ๋ณด๊ฐ ํ์๋ฉ๋๋ค. ํ๋จ ํจ๋์ keyboard_capslock์ ํด๋ฆญํ์ฌ ๋ก๊ทธ ํจ๋์ ํ์ฅํ์ธ์.

๋ก๊ทธ ํจ๋์๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์์ ์ํ๋ฅผ ์ ์ฒด์ ์ผ๋ก ๋ณด๊ณ ํ๋ ์์ ๋ก๊ทธ๊ฐ ํ์๋ฉ๋๋ค. ์ ๋ณดarrow_drop_down์ filter_list๋ก๊ทธ ํํฐ๋ง์ ํด๋ฆญํ์ฌ ๋ก๊ทธ ํจ๋์ ํ์๋๋ ๋ฉ์์ง๋ฅผ ํํฐ๋งํ ์ ์์ต๋๋ค.

๊ทธ๋ํ์์ ํ์ดํ๋ผ์ธ ๋จ๊ณ๋ฅผ ์ ํํ๋ฉด ๋ทฐ๋ ์ฌ์ฉ์ ์ฝ๋ ๋ฐ ํ์ดํ๋ผ์ธ ๋จ๊ณ์์ ์คํ๋๋ ์์ฑ ์ฝ๋๋ฅผ ํตํด ์์ฑ๋ ๋จ๊ณ ๋ก๊ทธ๋ก ๋ณ๊ฒฝ๋ฉ๋๋ค.

์์ ๋ก๊ทธ๋ก ๋์๊ฐ๋ ค๋ฉด ๊ทธ๋ํ ๋ฐ๊นฅ์ชฝ์ ํด๋ฆญํ๊ฑฐ๋ ์ค๋ฅธ์ชฝ ์ธก๋ฉด ํจ๋์ ์๋ ๋จ๊ณ ์ ํ ์ทจ์ ๋ฒํผ์ ์ฌ์ฉํ์ฌ ๋จ๊ณ๋ฅผ ์ง์๋๋ค.
๋ก๊ทธ ํ์๊ธฐ๋ก ์ด๋
๋ก๊ทธ ํ์๊ธฐ๋ฅผ ์ด๊ณ ๋ค๋ฅธ ๋ก๊ทธ ์ ํ์ ์ ํํ๋ ค๋ฉด ๋ก๊ทธ ํจ๋์์ ๋ก๊ทธ ํ์๊ธฐ์์ ๋ณด๊ธฐ(์ธ๋ถ ๋งํฌ ๋ฒํผ)๋ฅผ ํด๋ฆญํฉ๋๋ค.
๋ก๊ทธ ํ์๊ธฐ์์ ๋ค์ํ ๋ก๊ทธ ์ ํ์ด ํฌํจ๋ ํจ๋์ ๋ณด๋ ค๋ฉด ๋ก๊ทธ ํ๋ ์ ํ ๋ฒํผ์ ํด๋ฆญํฉ๋๋ค.
๋ก๊ทธ ํ์๊ธฐ ํ์ด์ง์์ ์ฟผ๋ฆฌ๋ ์์ ๋จ๊ณ ๋๋ ๋ก๊ทธ ์ ํ๋ณ๋ก ๋ก๊ทธ๋ฅผ ํํฐ๋งํ ์ ์์ต๋๋ค. ํํฐ๋ฅผ ์ญ์ ํ๋ ค๋ฉด ์ฟผ๋ฆฌ ํ์ ์ ํ ๋ฒํผ์ ํด๋ฆญํ๊ณ ์ฟผ๋ฆฌ๋ฅผ ์์ ํฉ๋๋ค.
์์ ์ ์ฌ์ฉํ ์ ์๋ ๋ชจ๋ ๋ก๊ทธ๋ฅผ ๋ณด๋ ค๋ฉด ๋ค์ ๋จ๊ณ๋ฅผ ๋ฐ๋ฅด์ธ์.
์ฟผ๋ฆฌ ํ๋์ ๋ค์ ์ฟผ๋ฆฌ๋ฅผ ์ ๋ ฅํฉ๋๋ค.
resource.type="dataflow_step" resource.labels.job_id="JOB_ID"
JOB_ID๋ฅผ ์์ ID๋ก ๋ฐ๊ฟ๋๋ค.
์ฟผ๋ฆฌ ์คํ์ ํด๋ฆญํฉ๋๋ค.
์ด ์ฟผ๋ฆฌ๋ฅผ ์ฌ์ฉํด๋ ์์ ๋ก๊ทธ๊ฐ ํ์๋์ง ์์ผ๋ฉด ์๊ฐ ์์ ์ ํด๋ฆญํฉ๋๋ค.
์์ ์๊ฐ๊ณผ ์ข ๋ฃ ์๊ฐ์ ์กฐ์ ํ ๋ค์ ์ ์ฉ์ ํด๋ฆญํฉ๋๋ค.
๋ก๊ทธ ์ ํ
๋ก๊ทธ ํ์๊ธฐ์๋ ํ์ดํ๋ผ์ธ์ ์ธํ๋ผ ๋ก๊ทธ๋ ํฌํจ๋ฉ๋๋ค. ์ค๋ฅ ๋ฐ ๊ฒฝ๊ณ ๋ก๊ทธ๋ฅผ ์ฌ์ฉํ์ฌ ๊ด์ฐฐ๋ ํ์ดํ๋ผ์ธ ๋ฌธ์ ๋ฅผ ์ง๋จํฉ๋๋ค. ํ์ดํ๋ผ์ธ ๋ฌธ์ ์ ๊ด๋ จ์ด ์๋ ์ธํ๋ผ ๋ก๊ทธ์ ์ค๋ฅ ๋ฐ ๊ฒฝ๊ณ ๊ฐ ๋ฐ๋์ ๋ฌธ์ ๋ฅผ ๋ํ๋ด๋ ๊ฒ์ ์๋๋๋ค.
๋ค์์ ๋ก๊ทธ ํ์๊ธฐ ํ์ด์ง์์ ๋ณผ ์ ์๋ ์ฌ๋ฌ ๊ฐ์ง ๋ก๊ทธ ์ ํ์ ์์ฝ์ ๋๋ค.
- job-message ๋ก๊ทธ์๋ Dataflow์ ๋ค์ํ ๊ตฌ์ฑ์์๊ฐ ์์ฑํ๋ ์์ ์์ค ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค. ๋ํ์ ์ธ ์์๋ ์๋ ํ์ฅ ๊ตฌ์ฑ, ์์ ์ ์์ ๋๋ ์ข ๋ฃ ์๊ฐ, ์์ ๋จ๊ณ ์งํ ์ํฉ๊ณผ ์์ ์ค๋ฅ์ ๋๋ค. ์ฌ์ฉ์ ์ฝ๋ ๋น์ ์ ์ข ๋ฃ ๋๋ฌธ์ ๋ฐ์ํ์ผ๋ฉฐ ์์ ์ ๋ก๊ทธ์ ์กด์ฌํ๋ ์์ ์ ์์ค ์ค๋ฅ๋ job-message ๋ก๊ทธ๋ฅผ ์์ฑํฉ๋๋ค.
- worker ๋ก๊ทธ๋ Dataflow ์์
์๊ฐ ์์ฑํฉ๋๋ค. ์์
์๋ ๋๋ถ๋ถ์ ํ์ดํ๋ผ์ธ ์์
์ ์ํํฉ๋๋ค(์: ๋ฐ์ดํฐ์
ParDo
์ ์ฉ). Worker ๋ก๊ทธ์๋ ๊ฐ๋ฐ์ ์ฝ๋์ Dataflow์์ ๋ก๊น ํ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค. - worker-startup ๋ก๊ทธ๋ ๋๋ถ๋ถ์ Dataflow ์์ ์ ํฌํจ๋๋ฉฐ ์์ ํ๋ก์ธ์ค์ ๊ด๋ จ๋ ๋ฉ์์ง๋ฅผ ์บก์ฒํ ์ ์์ต๋๋ค. ์์ ํ๋ก์ธ์ค์๋ Cloud Storage์์ ์์ ์ jar๋ฅผ ๋ค์ด๋ก๋ํ๋ ๊ฒ๊ณผ ์ดํ์ ์์ ์ ์์์ด ํฌํจ๋ฉ๋๋ค. ์์ ์๋ฅผ ์์ํ๋ ๋ฐ ๋ฌธ์ ๊ฐ ์์ผ๋ฉด ์ด ๋ก๊ทธ๋ฅผ ๋จผ์ ๋ณด๋ ๊ฒ์ด ์ข์ต๋๋ค.
- harness ๋ก๊ทธ์๋ Runner v2 ์คํ๊ธฐ ํ๋ค์ค์ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
- shuffler ๋ก๊ทธ์๋ ๋ณ๋ ฌ ํ์ดํ๋ผ์ธ ์์ ๊ฒฐ๊ณผ๋ฅผ ํตํฉํ๋ ์์ ์์ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
- system ๋ก๊ทธ์๋ ์์ ์ VM ํธ์คํธ ์ด์์ฒด์ ์ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค. ์ผ๋ถ ์๋๋ฆฌ์ค์์๋ ํ๋ก์ธ์ค ๋น์ ์ ์ข ๋ฃ๋ ๋ฉ๋ชจ๋ฆฌ ๋ถ์กฑ(OOM) ์ด๋ฒคํธ๋ฅผ ์บก์ฒํ ์ ์์ต๋๋ค.
- docker์ kubelet ๋ก๊ทธ์๋ Dataflow ์์ ์์์ ์ฌ์ฉ๋๋ ์ด๋ค ๊ณต๊ฐ ๊ธฐ์ ๊ณผ ๊ด๋ จ๋ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
- nvidia-mps ๋ก๊ทธ์๋ NVIDIA Multi-Process Service(MPS) ์์ ์ ๋ํ ๋ฉ์์ง๊ฐ ํฌํจ๋ฉ๋๋ค.
ํ์ดํ๋ผ์ธ ์์ ์ ๋ก๊ทธ ์์ค ์ค์
์๋ฐ
์๋ฐ์ฉ Apache Beam SDK๋ฅผ ํตํด ์์
์์ ์์ฑ๋๋ ๊ธฐ๋ณธ SLF4J ๋ก๊น
์์ค์ INFO
์
๋๋ค. INFO
์ด์(INFO
, WARN
, ERROR
)์ ๋ชจ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ๋ฐฉ์ถ๋ฉ๋๋ค. ๋ ๋ฎ์ SLF4J ๋ก๊น
์์ค(TRACE
๋๋ DEBUG
)์ ์ง์ํ๋๋ก ๋ค๋ฅธ ๊ธฐ๋ณธ ๋ก๊ทธ ์์ค์ ์ค์ ํ๊ฑฐ๋ ์ฝ๋์ ์ฌ๋ฌ ํด๋์ค ํจํค์ง์ ๋ํด ์๋ก ๋ค๋ฅธ ๋ก๊ทธ ์์ค์ ์ค์ ํ ์ ์์ต๋๋ค.
๋ช ๋ น์ค ๋๋ ํ๋ก๊ทธ๋๋งคํฑ ๋ฐฉ์์ผ๋ก ์์ ์ ๋ก๊ทธ ์์ค์ ์ค์ ํ ์ ์๋๋ก ๋ค์ ํ์ดํ๋ผ์ธ ์ต์ ์ด ์ ๊ณต๋ฉ๋๋ค.
--defaultSdkHarnessLogLevel=<level>
: ์ด ์ต์ ์ ์ฌ์ฉํ๋ฉด ๋ชจ๋ ๋ก๊ฑฐ๋ฅผ ์ง์ ๋ ๊ธฐ๋ณธ ์์ค์ผ๋ก ์ค์ ํฉ๋๋ค. ์๋ฅผ ๋ค์ด ๋ค์ ๋ช ๋ น์ค ์ต์ ์ ๊ธฐ๋ณธ DataflowINFO
๋ก๊ทธ ์์ค์ ์ฌ์ ์ํ๊ณ ์ด๋ฅผDEBUG
:
--defaultSdkHarnessLogLevel=DEBUG
๋ก ์ค์ ํฉ๋๋ค.--sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}
: ์ด ์ต์ ์ ์ฌ์ฉํ๋ฉด ์ง์ ๋ ํจํค์ง ๋๋ ํด๋์ค์ ๋ก๊น ์์ค์ ์ค์ ํฉ๋๋ค. ์๋ฅผ ๋ค์ดorg.apache.beam.runners.dataflow
ํจํค์ง์ ๊ธฐ๋ณธ ํ์ดํ๋ผ์ธ ๋ก๊ทธ ์์ค์ ์ฌ์ ์ํ๊ณ ์ด๋ฅผ ๋ค์๊ณผ ๊ฐ์ดTRACE
๋ก ์ค์ ํ ์ ์์ต๋๋ค.
--sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
์ฌ๋ฌ ์ฌ์ ์๋ฅผ ์ํํ๋ ค๋ฉด JSON ๋งต์ ์ ๊ณตํฉ๋๋ค.
(--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}
)- Runner v2 ์์ด Apache Beam SDK ๋ฒ์ 2.50.0 ์ดํ๋ฅผ ์ฌ์ฉํ๋ ํ์ดํ๋ผ์ธ์์๋
defaultSdkHarnessLogLevel
๋ฐsdkHarnessLogLevelOverrides
ํ์ดํ๋ผ์ธ ์ต์ ์ด ์ง์๋์ง ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ--defaultWorkerLogLevel=<level>
๋ฐ--workerLogLevelOverrides={"<package or class>":"<level>"}
ํ์ดํ๋ผ์ธ ์ต์ ์ ์ฌ์ฉํฉ๋๋ค. ์ฌ๋ฌ ์ฌ์ ์๋ฅผ ์ํํ๋ ค๋ฉด JSON ๋งต์ ์ ๊ณตํฉ๋๋ค.
(--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}
)
๋ค์ ์์๋ ๋ช ๋ น์ค์์ ์ฌ์ ์ํ ์ ์๋ ๊ธฐ๋ณธ๊ฐ๊ณผ ํจ๊ป ํ์ดํ๋ผ์ธ ๋ก๊น ์ต์ ์ ํ๋ก๊ทธ๋๋งคํฑ ๋ฐฉ์์ผ๋ก ์ค์ ํฉ๋๋ค.
PipelineOptions options = ... SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class); // Overrides the default log level on the worker to emit logs at TRACE or higher. loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE); // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher. loggingOptions.getSdkHarnessLogLevelOverrides() .addOverrideForClass(Foo.class, LogLevel.WARN) .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);
Python
Python์ฉ Apache Beam SDK๋ฅผ ํตํด ์์
์์ ์์ฑ๋๋ ๊ธฐ๋ณธ ๋ก๊น
์์ค์ INFO
์
๋๋ค. INFO
์ด์(INFO
, WARNING
, ERROR
, CRITICAL
)์ ๋ชจ๋ ๋ก๊ทธ ๋ฉ์์ง๊ฐ ๋ฐฉ์ถ๋ฉ๋๋ค.
๋ ๋ฎ์ ๋ก๊น
์์ค(DEBUG
)์ ์ง์ํ๋๋ก ๋ค๋ฅธ ๊ธฐ๋ณธ ๋ก๊ทธ ์์ค์ ์ค์ ํ๊ฑฐ๋ ์ฝ๋์ ์ฌ๋ฌ ๋ชจ๋์ ๋ํด ์๋ก ๋ค๋ฅธ ๋ก๊ทธ ์์ค์ ์ค์ ํ ์ ์์ต๋๋ค.
๋ช ๋ น์ค ๋๋ ํ๋ก๊ทธ๋๋งคํฑ ๋ฐฉ์์ผ๋ก ์์ ์ ๋ก๊ทธ ์์ค์ ์ค์ ํ ์ ์๋๋ก ํ์ดํ๋ผ์ธ ์ต์ ๋ ๊ฐ๊ฐ ์ ๊ณต๋ฉ๋๋ค.
--default_sdk_harness_log_level=<level>
: ์ด ์ต์ ์ ์ฌ์ฉํ๋ฉด ๋ชจ๋ ๋ก๊ฑฐ๋ฅผ ์ง์ ๋ ๊ธฐ๋ณธ ์์ค์ผ๋ก ์ค์ ํฉ๋๋ค. ์๋ฅผ ๋ค์ด ๋ค์ ๋ช ๋ น์ค ์ต์ ์ ๊ธฐ๋ณธ DataflowINFO
๋ก๊ทธ ์์ค์ ์ฌ์ ์ํ๊ณ ์ด๋ฅผDEBUG
๋ก ์ค์ ํฉ๋๋ค.
--default_sdk_harness_log_level=DEBUG
--sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}
: ์ด ์ต์ ์ ์ฌ์ฉํ์ฌ ์ง์ ๋ ๋ชจ๋์ ๋ก๊น ์์ค์ ์ค์ ํฉ๋๋ค. ์๋ฅผ ๋ค์ดapache_beam.runners.dataflow
๋ชจ๋์ ๊ธฐ๋ณธ ํ์ดํ๋ผ์ธ ๋ก๊ทธ ์์ค์ ์ฌ์ ์ํ๊ณ ์ด๋ฅผ ๋ค์๊ณผ ๊ฐ์ดDEBUG
๋ก ์ค์ ํ ์ ์์ต๋๋ค.
--sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
์ฌ๋ฌ ์ฌ์ ์๋ฅผ ์ํํ๋ ค๋ฉด JSON ๋งต์ ์ ๊ณตํฉ๋๋ค.
(--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}
)
๋ค์ ์์์์๋ WorkerOptions
ํด๋์ค๋ฅผ ์ฌ์ฉํ์ฌ ๋ช
๋ น์ค์์ ์ฌ์ ์ํ ์ ์๋ ํ์ดํ๋ผ์ธ ๋ก๊น
์ต์
์ ํ๋ก๊ทธ๋๋งคํฑ ๋ฐฉ์์ผ๋ก ์ค์ ํฉ๋๋ค.
from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions pipeline_args = [ '--project=PROJECT_NAME', '--job_name=JOB_NAME', '--staging_location=gs://STORAGE_BUCKET/staging/', '--temp_location=gs://STORAGE_BUCKET/tmp/', '--region=DATAFLOW_REGION', '--runner=DataflowRunner' ] pipeline_options = PipelineOptions(pipeline_args) worker_options = pipeline_options.view_as(WorkerOptions) worker_options.default_sdk_harness_log_level = 'WARNING' # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}'] worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"} # Pass in pipeline options during pipeline creation. with beam.Pipeline(options=pipeline_options) as pipeline:
๋ค์์ ๋ฐ๊ฟ๋๋ค.
PROJECT_NAME
: ํ๋ก์ ํธ ์ด๋ฆJOB_NAME
: ์์ ์ ์ด๋ฆSTORAGE_BUCKET
: Cloud Storage ์ด๋ฆDATAFLOW_REGION
: Dataflow ์์ ์ ๋ฐฐํฌํ ๋ฆฌ์ --region
ํ๋๊ทธ๋ ๋ฉํ๋ฐ์ดํฐ ์๋ฒ, ๋ก์ปฌ ํด๋ผ์ด์ธํธ ๋๋ ํ๊ฒฝ ๋ณ์์ ์ค์ ๋ ๊ธฐ๋ณธ ๋ฆฌ์ ์ ์ฌ์ ์ํฉ๋๋ค.
Go
์ด ๊ธฐ๋ฅ์ Go์ฉ Apache Beam SDK์์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
์์๋ BigQuery ์์ ๋ก๊ทธ ๋ณด๊ธฐ
Dataflow ํ์ดํ๋ผ์ธ์์ BigQuery๋ฅผ ์ฌ์ฉํ๋ฉด ๋ค์ํ ์์ ์ ๋์ ์ํํ๊ธฐ ์ํด BigQuery ์์ ์ด ์คํ๋ฉ๋๋ค. ์ด๋ฌํ ์์ ์๋ ๋ฐ์ดํฐ ๋ก๋, ๋ฐ์ดํฐ ๋ด๋ณด๋ด๊ธฐ ๋ฑ์ด ํฌํจ๋ ์ ์์ต๋๋ค. Dataflow ๋ชจ๋ํฐ๋ง ์ธํฐํ์ด์ค์๋ ๋ก๊ทธ ํจ๋์์ ์ฌ์ฉํ ์ ์๋ ์ด๋ฌํ BigQuery ์์ ์ ๋ํ ์ถ๊ฐ ์ ๋ณด๊ฐ ๋ฌธ์ ํด๊ฒฐ ๋ฐ ๋ชจ๋ํฐ๋ง์ ์ํด ์กด์ฌํฉ๋๋ค.
๋ก๊ทธ ํจ๋์ ํ์๋ BigQuery ์์ ์ ๋ณด๋ BigQuery ์์คํ ํ ์ด๋ธ์ ์ ์ฅ๋๊ณ ๋ก๋๋ฉ๋๋ค. ๊ธฐ๋ณธ BigQuery ํ ์ด๋ธ์ ์ฟผ๋ฆฌํ ๋ ์ฒญ๊ตฌ ๋น์ฉ์ด ๋ฐ์ํฉ๋๋ค.
BigQuery ์์ ์ธ๋ถ์ ๋ณด ๋ณด๊ธฐ
BigQuery ์์ ์ ๋ณด๋ฅผ ๋ณด๋ ค๋ฉด ํ์ดํ๋ผ์ธ์์ Apache Beam 2.24.0 ์ด์์ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
BigQuery ์์ ์ ๋์ดํ๋ ค๋ฉด BigQuery ์์ ํญ์ ์ด๊ณ BigQuery ์์ ์ ์์น๋ฅผ ์ ํํฉ๋๋ค. ๊ทธ๋ฐ ๋ค์ BigQuery ์์ ๋ก๋๋ฅผ ํด๋ฆญํ๊ณ ๋ํ์์๋ฅผ ํ์ธํฉ๋๋ค. ์ฟผ๋ฆฌ๊ฐ ์๋ฃ๋๋ฉด ์์ ๋ชฉ๋ก์ด ํ์๋ฉ๋๋ค.
์์ ID, ์ ํ, ๊ธฐ๊ฐ ๋ฑ ๊ฐ ์์ ์ ๋ํ ๊ธฐ๋ณธ ์ ๋ณด๊ฐ ์ ๊ณต๋ฉ๋๋ค.
ํน์ ์์ ์ ๋ํ ์์ธ ์ ๋ณด๋ฅผ ๋ณด๋ ค๋ฉด ์ถ๊ฐ ์ ๋ณด ์ด์์ ๋ช ๋ น์ค์ ํด๋ฆญํ์ธ์.
๋ช ๋ น์ค์ ๋ชจ๋ฌ ์ฐฝ์์ bq jobs describe ๋ช ๋ น์ด๋ฅผ ๋ณต์ฌํ์ฌ ๋ก์ปฌ๋ก ๋๋ Cloud Shell์์ ์คํํฉ๋๋ค.
gcloud alpha bq jobs describe BIGQUERY_JOB_ID
bq jobs describe
๋ช
๋ น์ด๋ JobStatistics๋ฅผ ์ถ๋ ฅํฉ๋๋ค. ์ด ๊ธฐ๋ฅ์ ์๋๊ฐ ๋๋ฆฌ๊ฑฐ๋ ์ค๋จ๋ BigQuery ์์
์ง๋จ ์ ์ ์ฉํ ์ถ๊ฐ ์ธ๋ถ์ ๋ณด๋ฅผ ์ ๊ณตํฉ๋๋ค.
๋๋ BigQueryIO๋ฅผ SQL ์ฟผ๋ฆฌ์ ํจ๊ป ์ฌ์ฉํ๋ฉด ์ฟผ๋ฆฌ ์์ ์ด ์คํ๋ฉ๋๋ค. ์์ ์์ ์ฌ์ฉํ๋ SQL ์ฟผ๋ฆฌ๋ฅผ ๋ณด๋ ค๋ฉด ์ถ๊ฐ ์ ๋ณด ์ด์์ ์ฟผ๋ฆฌ ๋ณด๊ธฐ๋ฅผ ํด๋ฆญํฉ๋๋ค.