Commit c46d04e

MaksYermak authored and potiuk committed
Create system test for K8s and dataproc operators
1 parent 35cbc89

File tree: 8 files changed, +239 -47 lines

airflow/providers/google/cloud/hooks/dataproc.py

Lines changed: 1 addition & 1 deletion

@@ -294,7 +294,7 @@ def create_cluster(
        region: str,
        project_id: str,
        cluster_name: str,
-       cluster_config: Union[Dict, Cluster],
+       cluster_config: Union[Dict, Cluster, None],
        virtual_cluster_config: Optional[Dict] = None,
        run_in_gke_cluster: Optional[bool] = False,
        labels: Optional[Dict[str, str]] = None,
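Editorial aside: the widened annotation makes `None` a legitimate value for `cluster_config` when the cluster definition lives in `virtual_cluster_config` instead. A minimal sketch of the calling pattern this enables; the connection id, project, and config values are illustrative placeholders, not from this commit:

from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

hook = DataprocHook(gcp_conn_id="google_cloud_default")  # placeholder connection
operation = hook.create_cluster(
    region="us-central1",               # placeholder region
    project_id="my-project",            # placeholder project
    cluster_name="my-virtual-cluster",  # placeholder name
    cluster_config=None,                # now type-correct: the definition is virtual
    virtual_cluster_config={"staging_bucket": "my-bucket"},  # trimmed for brevity
    run_in_gke_cluster=True,
)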

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 2 additions & 2 deletions

@@ -459,7 +459,7 @@ def __init__(
        cluster_name: str,
        region: Optional[str] = None,
        project_id: Optional[str] = None,
-        cluster_config: Optional[Dict] = None,
+        cluster_config: Optional[Union[Dict, Cluster]] = None,
        virtual_cluster_config: Optional[Dict] = None,
        run_in_gke_cluster: bool = False,
        labels: Optional[Dict] = None,

@@ -482,7 +482,7 @@ def __init__(
            region = 'global'

        # TODO: remove one day
-        if cluster_config is None:
+        if cluster_config is None and not run_in_gke_cluster:
            warnings.warn(
                f"Passing cluster parameters by keywords to `{type(self).__name__}` will be deprecated. "
                "Please provide cluster_config object using `cluster_config` parameter. "

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst

Lines changed: 18 additions & 0 deletions

@@ -58,6 +58,24 @@ With this configuration we can create the cluster:
    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]

+To create a Dataproc cluster in Google Kubernetes Engine, enable the ``run_in_gke_cluster`` flag
+and use this cluster configuration:
+
+.. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_gke.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_create_cluster_in_gke_config]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_in_gke_config]
+
+With this configuration we can create the cluster with
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`:
+
+.. exampleinclude:: /../../tests/system/providers/google/dataproc/example_dataproc_gke.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_cluster_operator_in_gke]
+    :end-before: [END how_to_cloud_dataproc_create_cluster_operator_in_gke]
+
 Generating Cluster Config
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 You can also generate **CLUSTER_CONFIG** using functional API,
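Editorial aside: the functional API mentioned in that trailing context line is `ClusterGenerator`. A hedged sketch of generating a classic **CLUSTER_CONFIG** with it; the parameter values are illustrative placeholders, and the exact argument set should be checked against the operator module:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id="my-project",             # placeholder project
    zone="us-central1-a",                # placeholder zone
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=2,
    storage_bucket="my-staging-bucket",  # placeholder bucket
).make()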

docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst

Lines changed: 5 additions & 5 deletions

@@ -43,7 +43,7 @@ Create GKE cluster

 Here is an example of a cluster definition:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+.. exampleinclude:: /../../tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py
    :language: python
    :start-after: [START howto_operator_gcp_gke_create_cluster_definition]
    :end-before: [END howto_operator_gcp_gke_create_cluster_definition]

@@ -53,7 +53,7 @@ A dict object like this, or a
 definition, is required when creating a cluster with
 :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKECreateClusterOperator`.

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+.. exampleinclude:: /../../tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_create_cluster]

@@ -68,7 +68,7 @@ To delete a cluster, use
 :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEDeleteClusterOperator`.
 This would also delete all the nodes allocated to the cluster.

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+.. exampleinclude:: /../../tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_delete_cluster]

@@ -117,15 +117,15 @@
 ``return.json`` in the sidecar. The contents of this can then be used downstream in your DAG.
 Here is an example of it being used:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+.. exampleinclude:: /../../tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_start_pod_xcom]
    :end-before: [END howto_operator_gke_start_pod_xcom]

 And then use it in other operators:

-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+.. exampleinclude:: /../../tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_gke_xcom_result]
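Editorial aside: the XCom sidecar pattern those last two hunks document can be summed up in one task: the main container writes the payload to ``/airflow/xcom/return.json``, and the sidecar publishes it when ``do_xcom_push=True``. A hedged sketch; the image, project, and cluster names are illustrative, not from this commit:

from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id="my-project",      # placeholder project
    location="europe-north1-a",
    cluster_name="cluster-name",  # placeholder cluster
    namespace="default",
    image="alpine",
    # The main container writes the XCom payload; the sidecar picks it up
    # from /airflow/xcom/return.json when do_xcom_push=True.
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/ && echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    do_xcom_push=True,
)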

tests/providers/google/cloud/operators/test_dataproc.py

Lines changed: 70 additions & 0 deletions

@@ -113,6 +113,21 @@
    ],
    "endpoint_config": {},
}
+VIRTUAL_CLUSTER_CONFIG = {
+    "kubernetes_cluster_config": {
+        "gke_cluster_config": {
+            "gke_cluster_target": "projects/project_id/locations/region/clusters/gke_cluster_name",
+            "node_pool_target": [
+                {
+                    "node_pool": "projects/project_id/locations/region/clusters/gke_cluster_name/nodePools/dp",  # noqa
+                    "roles": ["DEFAULT"],
+                }
+            ],
+        },
+        "kubernetes_software_config": {"component_version": {"SPARK": b'3'}},
+    },
+    "staging_bucket": "test-staging-bucket",
+}

CONFIG_WITH_CUSTOM_IMAGE_FAMILY = {
    "gce_cluster_config": {

@@ -424,6 +439,8 @@ def test_execute(self, mock_hook, to_dict_mock):
            'metadata': METADATA,
            'cluster_config': CONFIG,
            'labels': LABELS,
+            'run_in_gke_cluster': False,
+            'virtual_cluster_config': None,
        }
        expected_calls = self.extra_links_expected_calls_base + [
            call.hook().create_cluster(**create_cluster_args),

@@ -457,6 +474,57 @@ def test_execute(self, mock_hook, to_dict_mock):
            execution_date=None,
        )

+    @mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_execute_in_gke(self, mock_hook, to_dict_mock):
+        self.extra_links_manager_mock.attach_mock(mock_hook, 'hook')
+        mock_hook.return_value.create_cluster.result.return_value = None
+        create_cluster_args = {
+            'region': GCP_LOCATION,
+            'project_id': GCP_PROJECT,
+            'cluster_name': CLUSTER_NAME,
+            'request_id': REQUEST_ID,
+            'retry': RETRY,
+            'timeout': TIMEOUT,
+            'metadata': METADATA,
+            'cluster_config': None,
+            'labels': LABELS,
+            'run_in_gke_cluster': True,
+            'virtual_cluster_config': VIRTUAL_CLUSTER_CONFIG,
+        }
+        expected_calls = self.extra_links_expected_calls_base + [
+            call.hook().create_cluster(**create_cluster_args),
+        ]
+
+        op = DataprocCreateClusterOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            labels=LABELS,
+            cluster_name=CLUSTER_NAME,
+            project_id=GCP_PROJECT,
+            run_in_gke_cluster=True,
+            virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
+            request_id=REQUEST_ID,
+            gcp_conn_id=GCP_CONN_ID,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=self.mock_context)
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.create_cluster.assert_called_once_with(**create_cluster_args)
+
+        # Test whether xcom push occurs before create cluster is called
+        self.extra_links_manager_mock.assert_has_calls(expected_calls, any_order=False)
+
+        to_dict_mock.assert_called_once_with(mock_hook().create_cluster().result())
+        self.mock_ti.xcom_push.assert_called_once_with(
+            key="conf",
+            value=DATAPROC_CLUSTER_CONF_EXPECTED,
+            execution_date=None,
+        )
+
    @mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
    def test_execute_if_cluster_exists(self, mock_hook, to_dict_mock):

@@ -488,6 +556,8 @@ def test_execute_if_cluster_exists(self, mock_hook, to_dict_mock):
            retry=RETRY,
            timeout=TIMEOUT,
            metadata=METADATA,
+            run_in_gke_cluster=False,
+            virtual_cluster_config=None,
        )
        mock_hook.return_value.get_cluster.assert_called_once_with(
            region=GCP_LOCATION,
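Editorial aside: these tests never touch GCP; `DataprocHook` is swapped out by `mock.patch`, so the assertions run against recorded calls. A self-contained illustration of that pattern; the patch target string is spelled out here as an assumption, the real tests build it via `DATAPROC_PATH.format(...)`:

from unittest import mock

with mock.patch("airflow.providers.google.cloud.operators.dataproc.DataprocHook") as mock_hook:
    # Stands in for what operator.execute() does internally.
    hook = mock_hook(gcp_conn_id="google_cloud_default")
    hook.create_cluster(cluster_config=None, virtual_cluster_config={}, run_in_gke_cluster=True)

    # Verify both the hook construction and the call that reached it.
    mock_hook.assert_called_once_with(gcp_conn_id="google_cloud_default")
    hook.create_cluster.assert_called_once_with(
        cluster_config=None, virtual_cluster_config={}, run_in_gke_cluster=True
    )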

tests/providers/google/cloud/operators/test_kubernetes_engine_system.py

Lines changed: 0 additions & 35 deletions
This file was deleted.

tests/system/providers/google/dataproc/example_dataproc_gke.py

Lines changed: 124 additions & 0 deletions

@@ -0,0 +1,124 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to create a Dataproc cluster in Google Kubernetes Engine.
"""

import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
)
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
)
from airflow.utils.trigger_rule import TriggerRule

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataproc-gke"
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")

REGION = "us-central1"
CLUSTER_NAME = f"cluster-test-build-in-gke{ENV_ID}"
GKE_CLUSTER_NAME = f"test-dataproc-gke-cluster-{ENV_ID}"
GKE_CLUSTER_CONFIG = {
    "name": GKE_CLUSTER_NAME,
    "workload_identity_config": {
        "workload_pool": f"{PROJECT_ID}.svc.id.goog",
    },
    "initial_node_count": 1,
}

# [START how_to_cloud_dataproc_create_cluster_in_gke_config]

VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",  # noqa
                    "roles": ["DEFAULT"],
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": b'3'}},
    },
    "staging_bucket": "test-staging-bucket",
}

# [END how_to_cloud_dataproc_create_cluster_in_gke_config]


with models.DAG(
    DAG_ID,
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    create_gke_cluster = GKECreateClusterOperator(
        task_id="create_gke_cluster",
        project_id=PROJECT_ID,
        location=REGION,
        body=GKE_CLUSTER_CONFIG,
    )

    # [START how_to_cloud_dataproc_create_cluster_operator_in_gke]
    create_cluster_in_gke = DataprocCreateClusterOperator(
        task_id="create_cluster_in_gke",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        run_in_gke_cluster=True,
        virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    )
    # [END how_to_cloud_dataproc_create_cluster_operator_in_gke]

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_gke_cluster = GKEDeleteClusterOperator(
        task_id="delete_gke_cluster",
        name=GKE_CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=REGION,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_gke_cluster >> create_cluster_in_gke >> [delete_dataproc_cluster, delete_gke_cluster]

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "teardown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
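Editorial aside: with `get_test_run(dag)` in place, this system test doubles as a pytest test. Assuming `SYSTEM_TESTS_ENV_ID` and `SYSTEM_TESTS_GCP_PROJECT` are exported and GCP credentials are configured, invoking pytest on `tests/system/providers/google/dataproc/example_dataproc_gke.py` should execute the whole DAG, with `watcher()` propagating any teardown failure to the test result (see tests/system/README.md#run_via_pytest for the authoritative procedure).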

airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py renamed to tests/system/providers/google/kubernetes_engine/example_kubernetes_engine.py

Lines changed: 19 additions & 4 deletions
@@ -30,16 +30,19 @@
    GKEStartPodOperator,
)

-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
-GCP_LOCATION = os.environ.get("GCP_GKE_LOCATION", "europe-north1-a")
-CLUSTER_NAME = os.environ.get("GCP_GKE_CLUSTER_NAME", "cluster-name")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "kubernetes_engine"
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+GCP_LOCATION = "europe-north1-a"
+CLUSTER_NAME = f"cluster-name-test-build-{ENV_ID}"

# [START howto_operator_gcp_gke_create_cluster_definition]
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
# [END howto_operator_gcp_gke_create_cluster_definition]

with models.DAG(
-    "example_gcp_gke",
+    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2021, 1, 1),
    catchup=False,

@@ -101,3 +104,15 @@
    create_cluster >> pod_task >> delete_cluster
    create_cluster >> pod_task_xcom >> delete_cluster
    pod_task_xcom >> pod_task_xcom_result
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
