์ ํ์ ๊ตฌ์ฑ์์ ๊ธฐ๋ฅ์ ์ฌ์ฉํ์ฌ Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋ Flink์ ๊ฐ์ ์ถ๊ฐ ๊ตฌ์ฑ์์๋ฅผ ํ์ฑํํ ์ ์์ต๋๋ค. ์ด ํ์ด์ง์์๋ Apache Flink ์ ํ์ ๊ตฌ์ฑ์์(Flink ํด๋ฌ์คํฐ)๊ฐ ํ์ฑํ๋ Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ ํ ํด๋ฌ์คํฐ์์ Flink ์์ ์ ์คํํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
Flink ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ์ฌ ๋ค์์ ์ํํ ์ ์์ต๋๋ค.
Google Cloud ์ฝ์, Google Cloud CLI ๋๋ Dataproc API์์ Dataproc
Jobs
๋ฆฌ์์ค๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์ ์คํFlink ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์์ ์คํ๋๋
flink
CLI๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์ ์คํ
Dataproc Flink ํด๋ฌ์คํฐ ๋ง๋ค๊ธฐ
Google Cloud ์ฝ์, Google Cloud CLI ๋๋ Dataproc API๋ฅผ ์ฌ์ฉํ์ฌ ํด๋ฌ์คํฐ์์ Flink ๊ตฌ์ฑ์์๊ฐ ํ์ฑํ๋ Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ์ ์์ต๋๋ค.
๊ถ์ฅ์ฌํญ: Flink ๊ตฌ์ฑ์์๊ฐ ํฌํจ๋ ๋ง์คํฐ 1๊ฐ ํ์ค VM ํด๋ฌ์คํฐ๋ฅผ ์ฌ์ฉํ์ธ์. Dataproc ๊ณ ๊ฐ์ฉ์ฑ ๋ชจ๋ ํด๋ฌ์คํฐ(๋ง์คํฐ VM 3๊ฐ)๋ Flink ๊ณ ๊ฐ์ฉ์ฑ ๋ชจ๋๋ฅผ ์ง์ํ์ง ์์ต๋๋ค.
์ฝ์
Google Cloud ์ฝ์์ ์ฌ์ฉํ์ฌ Dataproc Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค๋ ค๋ฉด ๋ค์ ๋จ๊ณ๋ฅผ ์ํํฉ๋๋ค.
Dataproc Compute Engine์์ ํด๋ฌ์คํฐ ๋ง๋ค๊ธฐ ํ์ด์ง๋ฅผ ์ฝ๋๋ค.
- ํด๋ฌ์คํฐ ์ค์ ํจ๋์ด ์ ํ๋์์ต๋๋ค.
- ๋ฒ์ ๊ด๋ฆฌ ์น์
์์ ์ด๋ฏธ์ง ์ ํ ๋ฐ ๋ฒ์ ์ ํ์ธํ๊ฑฐ๋ ๋ณ๊ฒฝํฉ๋๋ค. ํด๋ฌ์คํฐ ์ด๋ฏธ์ง ๋ฒ์ ์ ๋ฐ๋ผ ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๊ตฌ์ฑ์์์ ๋ฒ์ ์ด ๊ฒฐ์ ๋ฉ๋๋ค.
- ํด๋ฌ์คํฐ์์ Flink ๊ตฌ์ฑ์์๋ฅผ ํ์ฑํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด 1.5 ์ด์์ด์ด์ผ ํฉ๋๋ค(๊ฐ Dataproc ์ด๋ฏธ์ง ์ถ์ ๋ฒ์ ์ ํฌํจ๋ ๊ตฌ์ฑ์์ ๋ฒ์ ๋ชฉ๋ก์ ๋ณด๋ ค๋ฉด ์ง์๋๋ Dataproc ๋ฒ์ ์ฐธ์กฐ).
- Dataproc Jobs API๋ฅผ ํตํด Flink ์์ ์ ์คํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด [๋ฏธ์ ] ์ด์์ด์ด์ผ ํฉ๋๋ค(Dataproc Flink ์์ ์คํ ์ฐธ์กฐ).
- ๊ตฌ์ฑ์์ ์น์
์์ ๋ค์์ ์ํํฉ๋๋ค.
- ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด ์๋์์ ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด ์ฌ์ฉ ์ค์ ์ ์ ํํฉ๋๋ค. Flink ๊ธฐ๋ก ์๋ฒ UI์ ๋ํ ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด ๋งํฌ๋ฅผ ํ์ฑํํ๋ ค๋ฉด ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์ฌ์ฉ ์ค์ ํด์ผ ํฉ๋๋ค. ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์ฌ์ฉ ์ค์ ํ๋ฉด Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ์์ ๊ด๋ฆฌ์ ์น ์ธํฐํ์ด์ค์ ์ก์ธ์คํ ์ ์์ต๋๋ค.
- ์ ํ์ ๊ตฌ์ฑ์์ ์๋์์ ํด๋ฌ์คํฐ์์ ํ์ฑํํ Flink ๋ฐ ๊ธฐํ ์ ํ์ ์ธ ๊ตฌ์ฑ์์๋ฅผ ์ ํํฉ๋๋ค.
- ๋ฒ์ ๊ด๋ฆฌ ์น์
์์ ์ด๋ฏธ์ง ์ ํ ๋ฐ ๋ฒ์ ์ ํ์ธํ๊ฑฐ๋ ๋ณ๊ฒฝํฉ๋๋ค. ํด๋ฌ์คํฐ ์ด๋ฏธ์ง ๋ฒ์ ์ ๋ฐ๋ผ ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๊ตฌ์ฑ์์์ ๋ฒ์ ์ด ๊ฒฐ์ ๋ฉ๋๋ค.
ํด๋ฌ์คํฐ ๋ง์ถค์ค์ (์ ํ์ฌํญ) ํจ๋์ ํด๋ฆญํฉ๋๋ค.
ํด๋ฌ์คํฐ ์์ฑ ์น์ ์์ ์ ํ์ ์ธ ๊ฐ ํด๋ฌ์คํฐ ์์ฑ์ ๋ํด ์์ฑ ์ถ๊ฐ๋ฅผ ํด๋ฆญํ์ฌ ํด๋ฌ์คํฐ์ ์ถ๊ฐํฉ๋๋ค.
flink
ํ๋ฆฌํฝ์ค๊ฐ ์๋ ์์ฑ์ ์ถ๊ฐํ์ฌ ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์๋ํ๋/etc/flink/conf/flink-conf.yaml
์ ํฌํจ๋ Flink ์์ฑ์ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.์:
flink:historyserver.archive.fs.dir
์ ์ค์ ํ์ฌ Flink ์์ ๊ธฐ๋ก ํ์ผ์ ์์ฑํ Cloud Storage ์์น๋ฅผ ์ง์ ํฉ๋๋ค(์ด ์์น๋ Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ๊ธฐ๋ก ์๋ฒ์์ ์ฌ์ฉ๋จ).flink:taskmanager.numberOfTaskSlots=n
์ผ๋ก Flink ํ์คํฌ ์ฌ๋กฏ์ ์ค์ ํฉ๋๋ค.
์ปค์คํ ํด๋ฌ์คํฐ ๋ฉํ๋ฐ์ดํฐ ์น์ ์์ ๋ฉํ๋ฐ์ดํฐ ์ถ๊ฐ๋ฅผ ํด๋ฆญํ์ฌ ์ ํ์ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํฉ๋๋ค. ์๋ฅผ ๋ค์ด
flink-start-yarn-session
true
๋ฅผ ์ถ๊ฐํ์ฌ ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์ ๋ฐฑ๊ทธ๋ผ์ด๋์์ Flink YARN ๋ฐ๋ชฌ(/usr/bin/flink-yarn-daemon
)์ ์คํํ๋ฉด Flink YARN ์ธ์ ์ ์์ํ ์ ์์ต๋๋ค(Flink ์ธ์ ๋ชจ๋ ์ฐธ์กฐ).
Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.0 ์ดํ๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ๋ณด์ ๊ด๋ฆฌ(์ ํ์ฌํญ) ํจ๋์ ํด๋ฆญํ๊ณ ํ๋ก์ ํธ ์ก์ธ์ค์์
Enables the cloud-platform scope for this cluster
๋ฅผ ์ ํํฉ๋๋ค.cloud-platform
๋ฒ์๋ Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.1 ์ด์์ ์ฌ์ฉํ๋ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ์ค์ ๋ฉ๋๋ค.
- ํด๋ฌ์คํฐ ์ค์ ํจ๋์ด ์ ํ๋์์ต๋๋ค.
๋ง๋ค๊ธฐ๋ฅผ ํด๋ฆญํ์ฌ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ญ๋๋ค.
gcloud
gcloud CLI๋ฅผ ์ฌ์ฉํ์ฌ Dataproc Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค๋ ค๋ฉด gcloud dataproc clusters create ๋ช ๋ น์ด๋ฅผ ํฐ๋ฏธ๋ ์ฐฝ์์ ๋ก์ปฌ๋ก ๋๋ Cloud Shell์์ ๋ค์๊ณผ ๊ฐ์ด ์คํํฉ๋๋ค.
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=DATAPROC_IMAGE_VERSION \ --optional-components=FLINK \ --enable-component-gateway \ --properties=PROPERTIES ... other flags
์ฐธ๊ณ :
- CLUSTER_NAME: ํด๋ฌ์คํฐ์ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค.
- REGION: ํด๋ฌ์คํฐ๊ฐ ์์นํ Compute Engine ๋ฆฌ์ ์ ์ง์ ํฉ๋๋ค.
DATAPROC_IMAGE_VERSION: ํด๋ฌ์คํฐ์ ์ฌ์ฉํ ์ด๋ฏธ์ง ๋ฒ์ ์ ์ง์ ํฉ๋๋ค(์ ํ์ฌํญ). ํด๋ฌ์คํฐ ์ด๋ฏธ์ง ๋ฒ์ ์ ๋ฐ๋ผ ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๊ตฌ์ฑ์์์ ๋ฒ์ ์ด ๊ฒฐ์ ๋ฉ๋๋ค.
ํด๋ฌ์คํฐ์์ Flink ๊ตฌ์ฑ์์๋ฅผ ํ์ฑํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด 1.5 ์ด์์ด์ด์ผ ํฉ๋๋ค(๊ฐ Dataproc ์ด๋ฏธ์ง ์ถ์ ๋ฒ์ ์ ํฌํจ๋ ๊ตฌ์ฑ์์ ๋ฒ์ ๋ชฉ๋ก์ ๋ณด๋ ค๋ฉด ์ง์๋๋ Dataproc ๋ฒ์ ์ฐธ์กฐ).
Dataproc Jobs API๋ฅผ ํตํด Flink ์์ ์ ์คํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด [๋ฏธ์ ] ์ด์์ด์ด์ผ ํฉ๋๋ค(Dataproc Flink ์์ ์คํ ์ฐธ์กฐ).
--optional-components
: ํด๋ฌ์คํฐ์์ Flink ์์ ๋ฐ Flink HistoryServer ์น ์๋น์ค๋ฅผ ์คํํ๋ ค๋ฉดFLINK
๊ตฌ์ฑ์์๋ฅผ ์ง์ ํด์ผ ํฉ๋๋ค.--enable-component-gateway
: Flink ๊ธฐ๋ก ์๋ฒ UI์ ๋ํ ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด ๋งํฌ๋ฅผ ํ์ฑํํ๋ ค๋ฉด ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์ฌ์ฉ ์ค์ ํด์ผ ํฉ๋๋ค. ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์ฌ์ฉ ์ค์ ํ๋ฉด Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ์์ ๊ด๋ฆฌ์ ์น ์ธํฐํ์ด์ค์ ์ก์ธ์คํ ์ ์์ต๋๋ค.PROPERTIES. ํด๋ฌ์คํฐ ์์ฑ์ ํ ๊ฐ ์ด์ ์ง์ ํฉ๋๋ค(์ ํ์ฌํญ).
์ด๋ฏธ์ง ๋ฒ์
2.0.67
+ ๋ฐ2.1.15
+๋ก Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋--properties
ํ๋๊ทธ๋ฅผ ์ฌ์ฉํ์ฌ/etc/flink/conf/flink-conf.yaml
์ ํฌํจ๋ ํด๋ฌ์คํฐ์์ ์คํํ๋ Flink ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์๋ํ Flink ์์ฑ์ ๊ตฌ์ฑํ ์ ์์ต๋๋ค.flink:historyserver.archive.fs.dir
์ ์ค์ ํ์ฌ Flink ์์ ๊ธฐ๋ก ํ์ผ์ ์์ฑํ Cloud Storage ์์น๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค(์ด ์์น๋ Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ๊ธฐ๋ก ์๋ฒ์์ ์ฌ์ฉ๋จ).์ฌ๋ฌ ์์ฑ ์์:
--properties=flink:historyserver.archive.fs.dir=gs://my-bucket/my-flink-cluster/completed-jobs,flink:taskmanager.numberOfTaskSlots=2
๊ธฐํ ํ๋๊ทธ:
- ์ ํ์ฌํญ์ธ
--metadata flink-start-yarn-session=true
๋ฅผ ์ถ๊ฐํ์ฌ ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์ ๋ฐฑ๊ทธ๋ผ์ด๋์์ Flink YARN ๋ฐ๋ชฌ(/usr/bin/flink-yarn-daemon
)์ ์คํํ๋ฉด Flink YARN ์ธ์ ์ ์์ํ ์ ์์ต๋๋ค(Flink ์ธ์ ๋ชจ๋ ์ฐธ์กฐ).
- ์ ํ์ฌํญ์ธ
2.0 ์ดํ ์ด๋ฏธ์ง ๋ฒ์ ์ ์ฌ์ฉํ ๋๋ ํด๋ฌ์คํฐ๋ณ๋ก Google Cloud API์ ์ก์ธ์คํ ์ ์๋๋ก
--scopes=https://www.googleapis.com/auth/cloud-platform
ํ๋๊ทธ๋ฅผ ์ถ๊ฐํ ์ ์์ต๋๋ค(๋ฒ์ ๊ถ์ฅ์ฌํญ ์ฐธ๊ณ ).cloud-platform
๋ฒ์๋ Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.1 ์ด์์ ์ฌ์ฉํ๋ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ์ค์ ๋ฉ๋๋ค.
API
Dataproc API๋ฅผ ์ฌ์ฉํ์ฌ Dataproc Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค๋ ค๋ฉด ๋ค์๊ณผ ๊ฐ์ด clusters.create ์์ฒญ์ ์ ์ถํฉ๋๋ค.
์ฐธ๊ณ :
SoftwareConfig.Component๋ฅผ
FLINK
๋ก ์ค์ ํฉ๋๋ค.์ ํ์ ์ผ๋ก
SoftwareConfig.imageVersion
์ ์ค์ ํ์ฌ ํด๋ฌ์คํฐ์์ ์ฌ์ฉํ ์ด๋ฏธ์ง ๋ฒ์ ์ ์ง์ ํ ์ ์์ต๋๋ค. ํด๋ฌ์คํฐ ์ด๋ฏธ์ง ๋ฒ์ ์ ๋ฐ๋ผ ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๊ตฌ์ฑ์์์ ๋ฒ์ ์ด ๊ฒฐ์ ๋ฉ๋๋ค.ํด๋ฌ์คํฐ์์ Flink ๊ตฌ์ฑ์์๋ฅผ ํ์ฑํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด 1.5 ์ด์์ด์ด์ผ ํฉ๋๋ค(๊ฐ Dataproc ์ด๋ฏธ์ง ์ถ์ ๋ฒ์ ์ ํฌํจ๋ ๊ตฌ์ฑ์์ ๋ฒ์ ๋ชฉ๋ก์ ๋ณด๋ ค๋ฉด ์ง์๋๋ Dataproc ๋ฒ์ ์ฐธ์กฐ).
Dataproc Jobs API๋ฅผ ํตํด Flink ์์ ์ ์คํํ๋ ค๋ฉด ์ด๋ฏธ์ง ๋ฒ์ ์ด [๋ฏธ์ ] ์ด์์ด์ด์ผ ํฉ๋๋ค(Dataproc Flink ์์ ์คํ ์ฐธ์กฐ).
EndpointConfig.enableHttpPortAccess๋ฅผ
true
๋ก ์ค์ ํ์ฌ Flink ๊ธฐ๋ก ์๋ฒ UI์ ๋ํ ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด ๋งํฌ๋ฅผ ์ฌ์ฉ ์ค์ ํฉ๋๋ค. ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด๋ฅผ ์ฌ์ฉ ์ค์ ํ๋ฉด Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ์์ ๊ด๋ฆฌ์ ์น ์ธํฐํ์ด์ค์ ์ก์ธ์คํ ์ ์์ต๋๋ค.ํ์์ ๋ฐ๋ผ
SoftwareConfig.properties
๋ฅผ ์ค์ ํ์ฌ ํด๋ฌ์คํฐ ์์ฑ์ ํ ๊ฐ ์ด์ ์ง์ ํ ์ ์์ต๋๋ค.- ํด๋ฌ์คํฐ์์ ์คํํ๋ Flink ์ ํ๋ฆฌ์ผ์ด์
์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์๋ํ๋ Flink ์์ฑ์ ์ง์ ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด
flink:historyserver.archive.fs.dir
์ ์ค์ ํ์ฌ Flink ์์ ๊ธฐ๋ก ํ์ผ์ ์์ฑํ Cloud Storage ์์น๋ฅผ ์ง์ ํ ์ ์์ต๋๋ค(์ด ์์น๋ Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ๊ธฐ๋ก ์๋ฒ์์ ์ฌ์ฉ๋จ).
- ํด๋ฌ์คํฐ์์ ์คํํ๋ Flink ์ ํ๋ฆฌ์ผ์ด์
์ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์๋ํ๋ Flink ์์ฑ์ ์ง์ ํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด
์ ํ์ ์ผ๋ก ๋ค์์ ์ค์ ํ ์ ์์ต๋๋ค.
GceClusterConfig.metadata
. ์๋ฅผ ๋ค์ดflink-start-yarn-session
true
๋ฅผ ์ถ๊ฐํ์ฌ ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์ ๋ฐฑ๊ทธ๋ผ์ด๋์์ Flink YARN ๋ฐ๋ชฌ(/usr/bin/flink-yarn-daemon
)์ ์คํํ๋ฉด Flink YARN ์ธ์ ์ ์์ํ ์ ์์ต๋๋ค(Flink ์ธ์ ๋ชจ๋ ์ฐธ์กฐ).- 2.0 ์ด์ ์ด๋ฏธ์ง ๋ฒ์ ์ ์ฌ์ฉํ์ฌ ํด๋ฌ์คํฐ์์ Google CloudAPI์ ์ก์ธ์คํ ๋๋ GceClusterConfig.serviceAccountScopes๋ฅผ
https://www.googleapis.com/auth/cloud-platform
(cloud-platform
๋ฒ์)์ผ๋ก ์ค์ ํฉ๋๋ค(๋ฒ์ ๊ถ์ฅ์ฌํญ ์ฐธ์กฐ).cloud-platform
๋ฒ์๋ Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.1 ์ด์์ ์ฌ์ฉํ๋ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ์ค์ ๋ฉ๋๋ค.
Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ ํ
- ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด์
Flink History Server
๋งํฌ๋ฅผ ์ฌ์ฉํ์ฌ Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ๊ธฐ๋ก ์๋ฒ๋ฅผ ํ์ธํฉ๋๋ค. - ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด์์
YARN ResourceManager link
๋ฅผ ์ฌ์ฉํ์ฌ Flink ํด๋ฌ์คํฐ์์ ์คํ๋๋ Flink ์์ ๊ด๋ฆฌ์ ์น ์ธํฐํ์ด์ค๋ฅผ ํ์ธํฉ๋๋ค. - Dataproc ์๊ตฌ ๊ธฐ๋ก ์๋ฒ๋ฅผ ๋ง๋ค์ด ๊ธฐ์กด ๋ฐ ์ญ์ ๋ Flink ํด๋ฌ์คํฐ์์ ์์ฑํ Flink ์์ ๊ธฐ๋ก ํ์ผ์ ํ์ธํฉ๋๋ค.
Dataproc Jobs
๋ฆฌ์์ค๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์
์คํ
Google Cloud ์ฝ์, Google Cloud CLI ๋๋ Dataproc API์์ Dataproc Jobs
๋ฆฌ์์ค๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์
์ ์คํํ ์ ์์ต๋๋ค.
์ฝ์
์ฝ์์์ ์ํ Flink ์๋์นด์ดํธ ์์ ์ ์ ์ถํ๋ ค๋ฉด ๋ค์ ์๋ด๋ฅผ ๋ฐ๋ฅด์ธ์.
๋ธ๋ผ์ฐ์ ์Google Cloud ์ฝ์์์ Dataproc ์์ ์ ์ถ ํ์ด์ง๋ฅผ ์ฝ๋๋ค.
์์ ์ ์ถ ํ์ด์ง์ ํ๋๋ฅผ ์์ฑํฉ๋๋ค.
- ํด๋ฌ์คํฐ ๋ชฉ๋ก์์ ํด๋ฌ์คํฐ ์ด๋ฆ์ ์ ํํฉ๋๋ค.
- ์์
์ ํ์
Flink
๋ก ์ค์ ํฉ๋๋ค. - ๊ธฐ๋ณธ ํด๋์ค ๋๋ jar์
org.apache.flink.examples.java.wordcount.WordCount
๋ก ์ค์ ํฉ๋๋ค. - JAR ํ์ผ์
file:///usr/lib/flink/examples/batch/WordCount.jar
๋ก ์ค์ ํฉ๋๋ค.file:///
์ ํด๋ฌ์คํฐ์ ์๋ ํ์ผ์ ๋ํ๋ ๋๋ค. Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋ Dataproc์ดWordCount.jar
์ ์ค์นํ์ต๋๋ค.- ์ด ํ๋๋ Cloud Storage ๊ฒฝ๋ก(
gs://BUCKET/JARFILE
) ๋๋ Hadoop ๋ถ์ฐ ํ์ผ ์์คํ (HDFS) ๊ฒฝ๋ก(hdfs://PATH_TO_JAR
)๋ ํ์ฉํฉ๋๋ค.
์ ์ถ์ ํด๋ฆญํฉ๋๋ค.
- ์์ ๋๋ผ์ด๋ฒ ์ถ๋ ฅ์ ์์ ์ธ๋ถ์ ๋ณด ํ์ด์ง์ ํ์๋ฉ๋๋ค.
- Flink ์์ ์ Google Cloud ์ฝ์์ Dataproc ์์ ํ์ด์ง์ ๋์ด๋ฉ๋๋ค.
- ์์ ๋๋ ์์ ์ธ๋ถ์ ๋ณด ํ์ด์ง์์ ์ค์ง ๋๋ ์ญ์ ๋ฅผ ํด๋ฆญํ์ฌ ์์ ์ ์ค์งํ๊ฑฐ๋ ์ญ์ ํฉ๋๋ค.
gcloud
Dataproc Flink ํด๋ฌ์คํฐ์ Flink ์์ ์ ์ ์ถํ๋ ค๋ฉด gcloud CLI gcloud dataproc jobs submit ๋ช ๋ น์ด๋ฅผ ํฐ๋ฏธ๋ ์ฐฝ์์ ๋ก์ปฌ๋ก ๋๋Cloud Shell์์ ์คํํฉ๋๋ค.
gcloud dataproc jobs submit flink \ --cluster=CLUSTER_NAME \ --region=REGION \ --class=MAIN_CLASS \ --jar=JAR_FILE \ -- JOB_ARGS
์ฐธ๊ณ :
- CLUSTER_NAME: ์์ ์ ์ ์ถํ Dataproc Flink ํด๋ฌ์คํฐ์ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค.
- REGION: ํด๋ฌ์คํฐ๊ฐ ์๋ Compute Engine ๋ฆฌ์ ์ ์ง์ ํฉ๋๋ค.
- MAIN_CLASS: Flink ์ ํ๋ฆฌ์ผ์ด์
์
main
ํด๋์ค๋ฅผ ์ง์ ํฉ๋๋ค. ์๋ฅผ ๋ค๋ฉด ๋ค์๊ณผ ๊ฐ์ต๋๋ค.org.apache.flink.examples.java.wordcount.WordCount
- JAR_FILE: Flink ์ ํ๋ฆฌ์ผ์ด์
jar ํ์ผ์ ์ง์ ํฉ๋๋ค. ๋ค์ ํญ๋ชฉ์ ์ง์ ํ ์ ์์ต๋๋ค.
- file:///` ํ๋ฆฌํฝ์ค๋ฅผ ์ฌ์ฉํ์ฌ ํด๋ฌ์คํฐ์ ์ค์น๋ jar ํ์ผ:
file:///usr/lib/flink/examples/streaming/TopSpeedWindowing.jar
file:///usr/lib/flink/examples/batch/WordCount.jar
- Cloud Storage์ jar ํ์ผ:
gs://BUCKET/JARFILE
- HDFS์ jar ํ์ผ:
hdfs://PATH_TO_JAR
- file:///` ํ๋ฆฌํฝ์ค๋ฅผ ์ฌ์ฉํ์ฌ ํด๋ฌ์คํฐ์ ์ค์น๋ jar ํ์ผ:
JOB_ARGS: ์ ํ์ ์ผ๋ก ์ด์ค ๋์(
--
) ๋ค์ ์์ ์ธ์๋ฅผ ์ถ๊ฐํฉ๋๋ค.์์ ์ ์ ์ถํ๋ฉด ์์ ๋๋ผ์ด๋ฒ ์ถ๋ ฅ์ด ๋ก์ปฌ ๋๋ Cloud Shell ํฐ๋ฏธ๋์ ํ์๋ฉ๋๋ค.
Program execution finished Job with JobID 829d48df4ebef2817f4000dfba126e0f has finished. Job Runtime: 13610 ms ... (after,1) (and,12) (arrows,1) (ay,1) (be,4) (bourn,1) (cast,1) (coil,1) (come,1)
REST
์ด ์น์ ์์๋ Dataproc jobs.submit API๋ฅผ ์ฌ์ฉํ์ฌ Dataproc Flink ํด๋ฌ์คํฐ์ Flink ์์ ์ ์ ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋๋ค.
์์ฒญ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๊ธฐ ์ ์ ๋ค์์ ๋ฐ๊ฟ๋๋ค.
- PROJECT_ID: Google Cloud ํ๋ก์ ํธ ID์ ๋๋ค.
- REGION: ํด๋ฌ์คํฐ ๋ฆฌ์ ์ ๋๋ค.
- CLUSTER_NAME: ์์ ์ ์ ์ถํ Dataproc Flink ํด๋ฌ์คํฐ์ ์ด๋ฆ์ ์ง์ ํฉ๋๋ค.
HTTP ๋ฉ์๋ ๋ฐ URL:
POST https://dataproc.googleapis.com/v1/projects/PROJECT_ID/regions/REGION/jobs:submit
JSON ์์ฒญ ๋ณธ๋ฌธ:
{ "job": { "placement": { "clusterName": "CLUSTER_NAME" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] } } }
์์ฒญ์ ๋ณด๋ด๋ ค๋ฉด ๋ค์ ์ต์ ์ค ํ๋๋ฅผ ํผ์นฉ๋๋ค.
๋ค์๊ณผ ๋น์ทํ JSON ์๋ต์ด ํ์๋ฉ๋๋ค.
{ "reference": { "projectId": "PROJECT_ID", "jobId": "JOB_ID" }, "placement": { "clusterName": "CLUSTER_NAME", "clusterUuid": "CLUSTER_UUID" }, "flinkJob": { "mainClass": "org.apache.flink.examples.java.wordcount.WordCount", "args": [ "1000" ], "jarFileUris": [ "file:///usr/lib/flink/examples/batch/WordCount.jar" ] }, "status": { "state": "PENDING", "stateStartTime": "2020-10-07T20:16:21.759Z" }, "jobUuid": "JOB_UUID" }
- Flink ์์ ์ Google Cloud ์ฝ์์ Dataproc ์์ ํ์ด์ง์ ๋์ด๋ฉ๋๋ค.
- Google Cloud ์ฝ์์ ์์ ๋๋ ์์ ์ธ๋ถ์ ๋ณด ํ์ด์ง์์ ์ค์ง ๋๋ ์ญ์ ๋ฅผ ํด๋ฆญํ์ฌ ์์ ์ ์ค์งํ๊ฑฐ๋ ์ญ์ ํ ์ ์์ต๋๋ค.
flink
CLI๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์
์คํ
Dataproc Jobs
๋ฆฌ์์ค๋ฅผ ์ฌ์ฉํ์ฌ Flink ์์
์ ์คํํ๋ ๋์ flink
CLI๋ฅผ ์ฌ์ฉํ์ฌ Flink ํด๋ฌ์คํฐ์ ๋ง์คํฐ ๋
ธ๋์์ Flink ์์
์ ์คํํ ์ ์์ต๋๋ค.
๋ค์ ์น์
์์๋ Dataproc Flink ํด๋ฌ์คํฐ์์ flink
CLI ์์
์ ์คํํ๋ ๋ค์ํ ๋ฐฉ๋ฒ์ ์ค๋ช
ํฉ๋๋ค.
๋ง์คํฐ ๋ ธ๋์ SSH๋ก ์ฐ๊ฒฐ: SSH ์ ํธ๋ฆฌํฐ๋ฅผ ์ฌ์ฉํ์ฌ ํด๋ฌ์คํฐ ๋ง์คํฐ VM์ ํฐ๋ฏธ๋ ์ฐฝ์ ์ฝ๋๋ค.
classpath ์ค์ : Flink ํด๋ฌ์คํฐ ๋ง์คํฐ VM์ SSH ํฐ๋ฏธ๋ ์ฐฝ์์ Hadoop ํด๋์ค ๊ฒฝ๋ก๋ฅผ ์ด๊ธฐํํฉ๋๋ค.
export HADOOP_CLASSPATH=$(hadoop classpath)
Flink ์์ ์คํ: ์ ํ๋ฆฌ์ผ์ด์ , ์์ ๋ณ, ์ธ์ ๋ชจ๋ ๋ฑ ๋ค์ํ YARN ๋ฐฐํฌ ๋ชจ๋์์ Flink ์์ ์ ์คํํ ์ ์์ต๋๋ค.
์ ํ๋ฆฌ์ผ์ด์ ๋ชจ๋: Flink ์ ํ๋ฆฌ์ผ์ด์ ๋ชจ๋๋ Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.0 ์ด์์์ ์ง์๋ฉ๋๋ค. ์ด ๋ชจ๋๋ YARN ์์ ๊ด๋ฆฌ์์์ ์์ ์
main()
๋ฉ์๋๋ฅผ ์คํํฉ๋๋ค. ์์ ์ด ์๋ฃ๋๋ฉด ํด๋ฌ์คํฐ๊ฐ ์ข ๋ฃ๋ฉ๋๋ค.์์ ์ ์ถ ์์:
flink run-application \ -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -Djobmanager.heap.mb=820 \ -Dtaskmanager.heap.mb=1640 \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dparallelism.default=4 \ /usr/lib/flink/examples/batch/WordCount.jar
์คํ ์ค์ธ ์์ ๋์ด:
./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
์คํ ์ค์ธ ์์ ์ทจ์:
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
์์ ๋ณ ๋ชจ๋: ์ด Flink ๋ชจ๋๋ ํด๋ผ์ด์ธํธ ์ธก์์ ์์ ์
main()
๋ฉ์๋๋ฅผ ์คํํฉ๋๋ค.์์ ์ ์ถ ์์:
flink run \ -m yarn-cluster \ -p 4 \ -ys 2 \ -yjm 1024m \ -ytm 2048m \ /usr/lib/flink/examples/batch/WordCount.jar
์ธ์ ๋ชจ๋: ์ฅ๊ธฐ ์คํ Flink YARN ์ธ์ ์ ์์ํ ํ ํ๋ ์ด์์ ์์ ์ ์ธ์ ์ ์ ์ถํฉ๋๋ค.
์ธ์ ์์: ๋ค์ ๋ฐฉ๋ฒ ์ค ํ๋๋ก Flink ์ธ์ ์ ์์ํ ์ ์์ต๋๋ค.
Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค๊ณ
gcloud dataproc clusters create
๋ช ๋ น์ด์--metadata flink-start-yarn-session=true
ํ๋๊ทธ๋ฅผ ์ถ๊ฐํฉ๋๋ค(Dataproc Flink ํด๋ฌ์คํฐ ๋ง๋ค๊ธฐ ์ฐธ์กฐ). ์ด ํ๋๊ทธ๋ฅผ ์ฌ์ฉ ์ค์ ํ๋ฉด ํด๋ฌ์คํฐ๊ฐ ์์ฑ๋ ํ Dataproc์ด/usr/bin/flink-yarn-daemon
์ ์คํํ์ฌ ํด๋ฌ์คํฐ์์ Flink ์ธ์ ์ ์์ํฉ๋๋ค.์ธ์ ์ YARN ์ ํ๋ฆฌ์ผ์ด์ ID๊ฐ
/tmp/.yarn-properties-${USER}
์ ์ ์ฅ๋ฉ๋๋ค.yarn application -list
๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์ฌ ID๋ฅผ ๋์ดํ ์ ์์ต๋๋ค.์ปค์คํ ์ค์ ์ผ๋ก ํด๋ฌ์คํฐ ๋ง์คํฐ VM์ ์ฌ์ ์ค์น๋
yarn-session.sh
์คํฌ๋ฆฝํธ๋ฅผ ์คํํฉ๋๋ค.์ปค์คํ ์ค์ ์ ์์:
/usr/lib/flink/bin/yarn-session.sh \ -s 1 \ -jm 1024m \ -tm 2048m \ -nm flink-dataproc \ --detached
๊ธฐ๋ณธ ์ค์ ์ผ๋ก Flink
/usr/bin/flink-yarn-daemon
๋ํผ ์คํฌ๋ฆฝํธ๋ฅผ ์คํํฉ๋๋ค.. /usr/bin/flink-yarn-daemon
์ธ์ ์ ์์ ์ ์ถ: ๋ค์ ๋ช ๋ น์ด๋ฅผ ์คํํ์ฌ Flink ์์ ์ ์ธ์ ์ ์ ์ถํฉ๋๋ค.
flink run -m <var>FLINK_MASTER_URL</var>/usr/lib/flink/examples/batch/WordCount.jar
- FLINK_MASTER_URL: ํธ์คํ
๊ณผ ํฌํ
์ ํฌํจํ ์์
์ด ์คํ๋๋ Flink ๋ง์คํฐ VM์ URL์
๋๋ค.
URL์์
http:// prefix
๋ฅผ ์ญ์ ํฉ๋๋ค. ์ด URL์ Flink ์ธ์ ์ ์์ํ ๋ ๋ช ๋ น์ด ์ถ๋ ฅ์ ๋์ด๋ฉ๋๋ค. ๋ค์ ๋ช ๋ น์ด๋ฅผ ์คํํ์ฌ ์ด URL์Tracking-URL
ํ๋์ ๋์ดํ ์ ์์ต๋๋ค.
yarn application -list -appId=<yarn-app-id> | sed 's#http://##' ```
- FLINK_MASTER_URL: ํธ์คํ
๊ณผ ํฌํ
์ ํฌํจํ ์์
์ด ์คํ๋๋ Flink ๋ง์คํฐ VM์ URL์
๋๋ค.
URL์์
์ธ์ ์์ ์์ ๋์ด: ์ธ์ ์์ Flink ์์ ์ ๋์ดํ๋ ค๋ฉด ๋ค์ ์ค ํ๋๋ฅผ ์ํํฉ๋๋ค.
์ธ์ ์์ด
flink list
๋ฅผ ์คํํฉ๋๋ค. ์ด ๋ช ๋ น์ด๋/tmp/.yarn-properties-${USER}
์์ ์ธ์ ์ YARN ์ ํ๋ฆฌ์ผ์ด์ ID๋ฅผ ์ฐพ์ต๋๋ค./tmp/.yarn-properties-${USER}
๋๋yarn application -list
์ ์ถ๋ ฅ์์ ์ธ์ ์ YARN ์ ํ๋ฆฌ์ผ์ด์ ID๋ฅผ ๊ฐ์ ธ์จ ๋ค์<code>
flink ๋ชฉ๋ก -yid YARN_APPLICATION_ID๋ฅผ ์คํํฉ๋๋ค.flink list -m FLINK_MASTER_URL
๋ฅผ ์คํํฉ๋๋ค.
์ธ์ ์ค์ง: ์ธ์ ์ ์ค์งํ๋ ค๋ฉด
/tmp/.yarn-properties-${USER}
๋๋yarn application -list
์ถ๋ ฅ์์ ์ธ์ ์ YARN ์ ํ๋ฆฌ์ผ์ด์ ID๋ฅผ ๊ฐ์ ธ์จ ํ ๋ค์ ๋ช ๋ น์ด ์ค ํ๋๋ฅผ ์คํํฉ๋๋ค.echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID
Flink์์ Apache Beam ์์ ์คํ
Dataproc์์ FlinkRunner
๋ฅผ ์ฌ์ฉํ์ฌ Apache Beam์ ์คํํ ์ ์์ต๋๋ค.
๋ค์๊ณผ ๊ฐ์ ๋ฐฉ๋ฒ์ผ๋ก Flink์์ Beam ์์ ์ ์คํํ ์ ์์ต๋๋ค.
- ์๋ฐ Beam ์์
- ํฌํฐ๋ธ Beam ์์
์๋ฐ Beam ์์
Beam ์์ ์ JAR ํ์ผ๋ก ํจํค์งํฉ๋๋ค. ์์ ์ ์คํํ๋ ๋ฐ ํ์ํ ์ข ์ ํญ๋ชฉ์ด ํฌํจ๋ ๋ฒ๋ค JAR ํ์ผ์ ์ ๊ณตํ์ธ์.
๋ค์ ์์์์๋ Dataproc ํด๋ฌ์คํฐ์ ๋ง์คํฐ ๋ ธ๋์์ ์๋ฐ Beam ์์ ์ ์คํํฉ๋๋ค.
Flink ๊ตฌ์ฑ์์๋ฅผ ์ฌ์ฉ ์ค์ ํ์ฌ Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ญ๋๋ค.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
--optional-components
: Flink.--image-version
: ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๋ฒ์ ์ ๊ฒฐ์ ํ๋ ํด๋ฌ์คํฐ์ ์ด๋ฏธ์ง ๋ฒ์ . ์๋ฅผ ๋ค์ด ์ต์ ๋ฐ ์ด์ 2.0.x ์ด๋ฏธ์ง ์ถ์ ๋ฒ์ 4๊ฐ์ ๋ํ์ฌ ๋์ด๋ Apache Flink ๊ตฌ์ฑ์์ ๋ฒ์ ์ ์ฐธ์กฐํ์ธ์.--region
: ์ง์๋๋ Dataproc ๋ฆฌ์ --enable-component-gateway
: Flink Job Manager UI์ ๋ํ ์ก์ธ์ค ์ฌ์ฉ ์ค์ --scopes
: ํด๋ฌ์คํฐ๋ณ๋ก Google Cloud API์ ๋ํ ์ก์ธ์ค๋ฅผ ์ฌ์ฉ ์ค์ ํฉ๋๋ค(๋ฒ์ ๊ถ์ฅ์ฌํญ ์ฐธ๊ณ ). Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.1 ์ด์์ ์ฌ์ฉํ๋ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋cloud-platform
๋ฒ์๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ์ค์ ๋ฉ๋๋ค(์ด ํ๋๊ทธ ์ค์ ์ ํฌํจํ์ง ์์๋ ๋จ).
SSH ์ ํธ๋ฆฌํฐ๋ฅผ ์ฌ์ฉํ์ฌ Flink ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์์ ํฐ๋ฏธ๋ ์ฐฝ์ ์ฝ๋๋ค.
Dataproc ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์์ Flink YARN ์ธ์ ์ ์์ํฉ๋๋ค.
. /usr/bin/flink-yarn-daemon
Dataproc ํด๋ฌ์คํฐ์ Flink ๋ฒ์ ์ ๊ธฐ๋กํฉ๋๋ค.
flink --version
๋ก์ปฌ ๋จธ์ ์์ ์๋ฐ์ ํ์ค Beam ๋จ์ด ์ ์์๋ฅผ ์์ฑํฉ๋๋ค.
Dataproc ํด๋ฌ์คํฐ์ Flink ๋ฒ์ ๊ณผ ํธํ๋๋ Beam ๋ฒ์ ์ ์ ํํฉ๋๋ค. Beam-Flink ๋ฒ์ ํธํ์ฑ์ด ๋์ด๋ Flink ๋ฒ์ ํธํ์ฑ ํ๋ฅผ ์ฐธ์กฐํ์ธ์.
์์ฑ๋ POM ํ์ผ์ ์ฝ๋๋ค.
<flink.artifact.name>
ํ๊ทธ๋ก ์ง์ ๋ Beam Flink ์คํ๊ธฐ ๋ฒ์ ์ ํ์ธํฉ๋๋ค. Flink ์ํฐํฉํธ ์ด๋ฆ์ Beam Flink ์คํ๊ธฐ ๋ฒ์ ์ด ํด๋ฌ์คํฐ์ Flink ๋ฒ์ ๊ณผ ์ผ์นํ์ง ์์ผ๋ฉด ์ผ์นํ๋๋ก ๋ฒ์ ๋ฒํธ๋ฅผ ์ ๋ฐ์ดํธํฉ๋๋ค.mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=BEAM_VERSION \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
๋จ์ด ์ ์์๋ฅผ ํจํค์งํฉ๋๋ค.
mvn package -Pflink-runner
ํจํค์ง๋ uber JAR ํ์ผ
word-count-beam-bundled-0.1.jar
(135MB ์ดํ)๋ฅผ Dataproc ํด๋ฌ์คํฐ์ ๋ง์คํฐ ๋ ธ๋์ ์ ๋ก๋ํฉ๋๋ค.gcloud storage cp
๋ฅผ ์ฌ์ฉํ๋ฉด Cloud Storage์์ Dataproc ํด๋ฌ์คํฐ๋ก ํ์ผ์ ๋ ๋น ๋ฅด๊ฒ ์ ์กํ ์ ์์ต๋๋ค.๋ก์ปฌ ํฐ๋ฏธ๋์์ Cloud Storage ๋ฒํท์ ๋ง๋ค๊ณ uber JAR์ ์ ๋ก๋ํฉ๋๋ค.
gcloud storage buckets create BUCKET_NAME
gcloud storage cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
Dataproc์ ๋ง์คํฐ ๋ ธ๋์์ uber JAR์ ๋ค์ด๋ก๋ํฉ๋๋ค.
gcloud storage cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
Dataproc ํด๋ฌ์คํฐ์ ๋ง์คํฐ ๋ ธ๋์์ ์๋ฐ Beam ์์ ์ ์คํํฉ๋๋ค.
flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \ --runner=FlinkRunner \ --output=gs://BUCKET_NAME/java-wordcount-out
๊ฒฐ๊ณผ๊ฐ Cloud Storage ๋ฒํท์ ๊ธฐ๋ก๋์๋์ง ํ์ธํฉ๋๋ค.
gcloud storage cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
Flink YARN ์ธ์ ์ ์ค์งํฉ๋๋ค.
yarn application -list
yarn application -kill YARN_APPLICATION_ID
ํฌํฐ๋ธ Beam ์์
Python, Go, ๊ธฐํ ์ง์๋๋ ์ธ์ด๋ก Beam ์์
์ ์คํํ๋ ค๋ฉด Beam์ Flink Runner์ ์ค๋ช
๋ ๋๋ก FlinkRunner
๋ฐ PortableRunner
๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฉ๋๋ค. ์ด๋์ฑ ํ๋ ์์ํฌ ๋ก๋๋งต๋ ์ฐธ์กฐํ์ธ์.
๋ค์ ์์์์๋ Dataproc ํด๋ฌ์คํฐ์ ๋ง์คํฐ ๋ ธ๋์์ ์ด๋ ๊ฐ๋ฅํ Beam ์์ ์ Python์ผ๋ก ์คํํฉ๋๋ค.
Flink ๋ฐ Docker ๊ตฌ์ฑ์์๊ฐ ๋ชจ๋ ์ฌ์ฉ ์ค์ ๋ Dataproc ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ญ๋๋ค.
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=FLINK,DOCKER \ --image-version=DATAPROC_IMAGE_VERSION \ --region=REGION \ --enable-component-gateway \ --scopes=https://www.googleapis.com/auth/cloud-platform
์ฐธ๊ณ :
--optional-components
: Flink ๋ฐ Docker--image-version
: ํด๋ฌ์คํฐ์ ์ค์น๋ Flink ๋ฒ์ ์ ๊ฒฐ์ ํ๋ ํด๋ฌ์คํฐ์ ์ด๋ฏธ์ง ๋ฒ์ . ์๋ฅผ ๋ค์ด ์ต์ ๋ฐ ์ด์ 2.0.x ์ด๋ฏธ์ง ์ถ์ ๋ฒ์ 4๊ฐ์ ๋ํ์ฌ ๋์ด๋ Apache Flink ๊ตฌ์ฑ์์ ๋ฒ์ ์ ์ฐธ์กฐํ์ธ์.--region
: ์ฌ์ฉ ๊ฐ๋ฅํ Dataproc ๋ฆฌ์ --enable-component-gateway
: Flink Job Manager UI์ ๋ํ ์ก์ธ์ค ์ฌ์ฉ ์ค์ --scopes
: ํด๋ฌ์คํฐ๋ณ๋ก Google Cloud API์ ๋ํ ์ก์ธ์ค๋ฅผ ์ฌ์ฉ ์ค์ ํฉ๋๋ค(๋ฒ์ ๊ถ์ฅ์ฌํญ ์ฐธ๊ณ ). Dataproc ์ด๋ฏธ์ง ๋ฒ์ 2.1 ์ด์์ ์ฌ์ฉํ๋ ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ค ๋cloud-platform
๋ฒ์๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ์ฌ์ฉ ์ค์ ๋ฉ๋๋ค(์ด ํ๋๊ทธ ์ค์ ์ ํฌํจํ์ง ์์๋ ๋จ).
๋ก์ปฌ ๋๋ Cloud Shell์์ gcloud CLI๋ฅผ ์ฌ์ฉํ์ฌ Cloud Storage ๋ฒํท์ ๋ง๋ญ๋๋ค. ์ํ WordCount ํ๋ก๊ทธ๋จ์ ์คํํ ๋ BUCKET_NAME์ ์ง์ ํฉ๋๋ค.
gcloud storage buckets create BUCKET_NAME
ํด๋ฌ์คํฐ VM์ ํฐ๋ฏธ๋ ์ฐฝ์์ Flink YARN ์ธ์ ์ ์์ํฉ๋๋ค. ์์ ์ด ์คํ๋๋ Flink ๋ง์คํฐ์ ์ฃผ์์ธ Flink ๋ง์คํฐ URL์ ํ์ธํฉ๋๋ค. ์ํ WordCount ํ๋ก๊ทธ๋จ์ ์คํํ ๋ FLINK_MASTER_URL์ ์ง์ ํฉ๋๋ค.
. /usr/bin/flink-yarn-daemon
Dataproc ํด๋ฌ์คํฐ๋ฅผ ์คํํ๋ Flink ๋ฒ์ ์ ํ์ํฉ๋๋ค. ์ํ WordCount ํ๋ก๊ทธ๋จ์ ์คํํ ๋ FLINK_VERSION์ ์ง์ ํฉ๋๋ค.
flink --version
์์ ์ ํ์ํ Python ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์ ์ค์นํฉ๋๋ค.
ํด๋ฌ์คํฐ์ Flink ๋ฒ์ ๊ณผ ํธํ๋๋ Beam ๋ฒ์ ์ ์ค์นํฉ๋๋ค.
python -m pip install apache-beam[gcp]==BEAM_VERSION
ํด๋ฌ์คํฐ ๋ง์คํฐ ๋ ธ๋์์ ์๋ ์ ์์๋ฅผ ์คํํฉ๋๋ค.
python -m apache_beam.examples.wordcount \ --runner=FlinkRunner \ --flink_version=FLINK_VERSION \ --flink_master=FLINK_MASTER_URL --flink_submit_uber_jar \ --output=gs://BUCKET_NAME/python-wordcount-out
์ฐธ๊ณ :
--runner
:FlinkRunner
.--flink_version
: FLINK_VERSION, ์์ ์ธ๊ธ๋จ--flink_master
: FLINK_MASTER_URL, ์์ ์ธ๊ธ๋จ--flink_submit_uber_jar
: uber JAR์ ์ฌ์ฉํ์ฌ Beam ์์ ์คํ--output
: BUCKET_NAME, ์์ ์์ฑ๋จ
๋ฒํท์ ๊ฒฐ๊ณผ๊ฐ ๊ธฐ๋ก๋์๋์ง ํ์ธํฉ๋๋ค.
gcloud storage cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
Flink YARN ์ธ์ ์ ์ค์งํฉ๋๋ค.
- ์ ํ๋ฆฌ์ผ์ด์ ID๋ฅผ ๊ฐ์ ธ์ต๋๋ค.
yarn application -list
1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.yarn application -kill
Kerberized ํด๋ฌ์คํฐ์์ Flink ์คํ
Dataproc Flink ๊ตฌ์ฑ์์๋ Kerberized ํด๋ฌ์คํฐ๋ฅผ ์ง์ํฉ๋๋ค. Flink ์์ ์ ์ ์ถํ๊ณ ์ ์งํ๊ฑฐ๋ Flink ํด๋ฌ์คํฐ๋ฅผ ์์ํ๋ ค๋ฉด ์ ํจํ Kerberos ํฐ์ผ์ด ํ์ํฉ๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก Kerberos ํฐ์ผ์ 7์ผ ๋์ ์ ํจํฉ๋๋ค.
Flink ์์ ๊ด๋ฆฌ์ UI ์ก์ธ์ค
Flink ์์ ๋๋ Flink ์ธ์ ํด๋ฌ์คํฐ๋ฅผ ์คํํ๋ ๋์ Flink ์์ ๊ด๋ฆฌ์ ์น ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์น ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด ๋ค์ ๋จ๊ณ๋ฅผ ๋ฐ๋ฅด์ธ์.
- Dataproc Flink ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ญ๋๋ค.
- ํด๋ฌ์คํฐ๋ฅผ ๋ง๋ ํ Google Cloud ์ฝ์์ ํด๋ฌ์คํฐ ์ธ๋ถ์ ๋ณด ํ์ด์ง์ ์๋ ์น ์ธํฐํ์ด์ค ํญ์์ ๊ตฌ์ฑ์์ ๊ฒ์ดํธ์จ์ด YARN ResourceManager ๋งํฌ๋ฅผ ํด๋ฆญํฉ๋๋ค.
- YARN Resource Manager UI์์ Flink ํด๋ฌ์คํฐ ์ ํ๋ฆฌ์ผ์ด์
ํญ๋ชฉ์ ์๋ณํฉ๋๋ค. ์์
์ ์๋ฃ ์ํ์ ๋ฐ๋ผ ApplicationMaster ๋๋ ๊ธฐ๋ก ๋งํฌ๊ฐ ๋์ด๋ฉ๋๋ค.
- ์ฅ๊ธฐ ์คํ ์คํธ๋ฆฌ๋ฐ ์์
์ ๊ฒฝ์ฐ ApplicationManager ๋งํฌ๋ฅผ ํด๋ฆญํ์ฌ Flink ๋์๋ณด๋๋ฅผ ์ฝ๋๋ค. ์๋ฃ๋ ์์
์ ๊ฒฝ์ฐ ๊ธฐ๋ก ๋งํฌ๋ฅผ ํด๋ฆญํ์ฌ ์์
์ธ๋ถ์ ๋ณด๋ฅผ ํ์ธํฉ๋๋ค.