Commit 623163f
Extend dataproc example dag (#21091)
1 parent aac5a1d commit 623163f

4 files changed: +78 −1 lines changed

airflow/providers/google/cloud/example_dags/example_dataproc.py
Lines changed: 48 additions & 1 deletion

@@ -22,6 +22,7 @@

 import os
 from datetime import datetime
+from uuid import uuid4

 from airflow import models
 from airflow.providers.google.cloud.operators.dataproc import (
@@ -178,13 +179,38 @@
     },
     "jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
 }
-BATCH_ID = "test-batch-id"
+BATCH_ID = f"test-batch-id-{str(uuid4())}"
 BATCH_CONFIG = {
     "spark_batch": {
         "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
         "main_class": "org.apache.spark.examples.SparkPi",
     },
 }
+CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
+    project_id=PROJECT_ID,
+    region=REGION,
+    master_machine_type="n1-standard-4",
+    worker_machine_type="n1-standard-4",
+    num_workers=0,
+    properties={
+        "spark:spark.history.fs.logDirectory": f"gs://{BUCKET}/logging",
+    },
+    enable_component_gateway=True,
+).make()
+CLUSTER_NAME_FOR_PHS = "phs-cluster-name"
+BATCH_CONFIG_WITH_PHS = {
+    "spark_batch": {
+        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
+        "main_class": "org.apache.spark.examples.SparkPi",
+    },
+    "environment_config": {
+        "peripherals_config": {
+            "spark_history_server_config": {
+                "dataproc_cluster": f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{CLUSTER_NAME_FOR_PHS}"
+            }
+        }
+    },
+}


 with models.DAG(
@@ -310,6 +336,26 @@
     )
     # [END how_to_cloud_dataproc_create_batch_operator]

+    # [START how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
+    create_cluster_for_phs = DataprocCreateClusterOperator(
+        task_id="create_cluster_for_phs",
+        project_id=PROJECT_ID,
+        cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
+        region=REGION,
+        cluster_name=CLUSTER_NAME_FOR_PHS,
+    )
+    # [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
+
+    # [START how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
+    create_batch_with_phs = DataprocCreateBatchOperator(
+        task_id="create_batch_with_phs",
+        project_id=PROJECT_ID,
+        region=REGION,
+        batch=BATCH_CONFIG_WITH_PHS,
+        batch_id=BATCH_ID,
+    )
+    # [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
+
     # [START how_to_cloud_dataproc_get_batch_operator]
     get_batch = DataprocGetBatchOperator(
         task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
@@ -331,3 +377,4 @@
     # [END how_to_cloud_dataproc_delete_batch_operator]

     create_batch >> get_batch >> list_batches >> delete_batch
+    create_cluster_for_phs >> create_batch_with_phs
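For readers who want to try only the Persistent History Server flow outside the full example DAG, below is a condensed, hedged sketch that mirrors the additions above. The project, region, bucket, and DAG id values are placeholders for illustration and are not part of this commit; the operator arguments follow the diff.

# Hedged sketch of the Persistent History Server flow (placeholder values, not from the commit).
from datetime import datetime
from uuid import uuid4

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateBatchOperator,
    DataprocCreateClusterOperator,
)

PROJECT_ID = "my-project"    # placeholder
REGION = "europe-west1"      # placeholder
BUCKET = "my-bucket"         # placeholder
CLUSTER_NAME_FOR_PHS = "phs-cluster-name"
BATCH_ID = f"test-batch-id-{uuid4()}"  # unique id so reruns do not collide with an existing batch

# Single-node cluster acting as the history server, with Component Gateway enabled.
CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=0,
    properties={"spark:spark.history.fs.logDirectory": f"gs://{BUCKET}/logging"},
    enable_component_gateway=True,
).make()

with models.DAG("dataproc_phs_sketch", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    create_cluster_for_phs = DataprocCreateClusterOperator(
        task_id="create_cluster_for_phs",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
        region=REGION,
        cluster_name=CLUSTER_NAME_FOR_PHS,
    )
    create_batch_with_phs = DataprocCreateBatchOperator(
        task_id="create_batch_with_phs",
        project_id=PROJECT_ID,
        region=REGION,
        batch={
            "spark_batch": {
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "main_class": "org.apache.spark.examples.SparkPi",
            },
            # The batch points at the history-server cluster via its full resource name.
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{CLUSTER_NAME_FOR_PHS}"
                    }
                }
            },
        },
        batch_id=BATCH_ID,
    )
    # The history-server cluster must exist before the batch that references it is created.
    create_cluster_for_phs >> create_batch_with_phs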

airflow/providers/google/cloud/operators/dataproc.py
Lines changed: 7 additions & 0 deletions

@@ -161,6 +161,8 @@ class ClusterGenerator:
         A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
     :param customer_managed_key: The customer-managed key used for disk encryption
         ``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa
+    :param enable_component_gateway: Provides access to the web interfaces of default and selected optional
+        components on the cluster.
     """

     def __init__(
@@ -197,6 +199,7 @@ def __init__(
         auto_delete_time: Optional[datetime] = None,
         auto_delete_ttl: Optional[int] = None,
         customer_managed_key: Optional[str] = None,
+        enable_component_gateway: Optional[bool] = False,
         **kwargs,
     ) -> None:

@@ -232,6 +235,7 @@ def __init__(
         self.auto_delete_time = auto_delete_time
         self.auto_delete_ttl = auto_delete_ttl
         self.customer_managed_key = customer_managed_key
+        self.enable_component_gateway = enable_component_gateway
         self.single_node = num_workers == 0

         if self.custom_image and self.image_version:
@@ -339,6 +343,7 @@ def _build_cluster_data(self):
             'lifecycle_config': {},
             'encryption_config': {},
             'autoscaling_config': {},
+            'endpoint_config': {},
         }
         if self.num_preemptible_workers > 0:
             cluster_data['secondary_worker_config'] = {
@@ -401,6 +406,8 @@ def _build_cluster_data(self):
             cluster_data['encryption_config'] = {'gce_pd_kms_key_name': self.customer_managed_key}
         if self.autoscaling_policy:
             cluster_data['autoscaling_config'] = {'policy_uri': self.autoscaling_policy}
+        if self.enable_component_gateway:
+            cluster_data['endpoint_config'] = {'enable_http_port_access': self.enable_component_gateway}

         return cluster_data
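A quick, hedged illustration of the new flag in isolation: with enable_component_gateway=True, the config dict produced by ClusterGenerator.make() should carry an endpoint_config enabling HTTP port access, which is what the test change further below asserts as well. The project and region values here are placeholders.

# Hedged sketch of the new parameter (placeholder project/region values).
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

cluster_config = ClusterGenerator(
    project_id="my-project",        # placeholder
    region="europe-west1",          # placeholder
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=2,
    enable_component_gateway=True,  # parameter added in this commit
).make()

# Per the change above, the generated config should now include the Component Gateway setting:
#   cluster_config["endpoint_config"] == {"enable_http_port_access": True}
print(cluster_config["endpoint_config"])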

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
Lines changed: 18 additions & 0 deletions

@@ -226,6 +226,24 @@ A batch can be created using:
     :start-after: [START how_to_cloud_dataproc_create_batch_operator]
     :end-before: [END how_to_cloud_dataproc_create_batch_operator]

+To create a batch with a Persistent History Server, you should first create a Dataproc cluster
+with specific parameters. Documentation on how to create such a cluster can be found here:
+https://cloud.google.com/dataproc/docs/concepts/jobs/history-server#setting_up_a_persistent_history_server
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server]
+
+After the cluster has been created, you should add it to the batch configuration.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
+    :end-before: [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
+
 Get a Batch
 -----------

tests/providers/google/cloud/operators/test_dataproc.py
Lines changed: 5 additions & 0 deletions

@@ -109,6 +109,7 @@
     "initialization_actions": [
         {"executable_file": "init_actions_uris", "execution_timeout": {'seconds': 600}}
     ],
+    "endpoint_config": {},
 }

 CONFIG_WITH_CUSTOM_IMAGE_FAMILY = {
@@ -153,6 +154,9 @@
     "initialization_actions": [
         {"executable_file": "init_actions_uris", "execution_timeout": {'seconds': 600}}
     ],
+    "endpoint_config": {
+        "enable_http_port_access": True,
+    },
 }

 LABELS = {"labels": "data", "airflow-version": AIRFLOW_VERSION}
@@ -367,6 +371,7 @@ def test_build_with_custom_image_family(self):
             auto_delete_time=datetime(2019, 9, 12),
             auto_delete_ttl=250,
             customer_managed_key="customer_managed_key",
+            enable_component_gateway=True,
         )
         cluster = generator.make()
         assert CONFIG_WITH_CUSTOM_IMAGE_FAMILY == cluster
