Skip to content

Commit 583f407

Browse files
authored
Fixup docstring for deprecated DataprocSubmitSparkJobOperator and refactoring system tests (#32743)
1 parent f2e9331 commit 583f407

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,10 @@ def execute(self, context: Context):
13841384
class DataprocSubmitSparkJobOperator(DataprocJobBaseOperator):
13851385
"""Start a Spark Job on a Cloud DataProc cluster.
13861386
1387+
.. seealso::
1388+
This operator is deprecated, please use
1389+
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
1390+
13871391
:param main_jar: The HCFS URI of the jar file that contains the main class
13881392
(use this or the main_class, not both together).
13891393
:param main_class: Name of the job class. (use this or the main_jar, not both

tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@
3535
DAG_ID = "dataproc_spark"
3636
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
3737

38-
CLUSTER_NAME = f"cluster-dataproc-spark-{ENV_ID}"
38+
CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
3939
REGION = "europe-west1"
40-
ZONE = "europe-west1-b"
41-
4240

4341
# Cluster definition
4442
CLUSTER_CONFIG = {
@@ -54,8 +52,6 @@
5452
},
5553
}
5654

57-
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
58-
5955
# Jobs definitions
6056
# [START how_to_cloud_dataproc_spark_config]
6157
SPARK_JOB = {
@@ -74,7 +70,7 @@
7470
schedule="@once",
7571
start_date=datetime(2021, 1, 1),
7672
catchup=False,
77-
tags=["example", "dataproc"],
73+
tags=["example", "dataproc", "spark"],
7874
) as dag:
7975
create_cluster = DataprocCreateClusterOperator(
8076
task_id="create_cluster",
@@ -96,7 +92,14 @@
9692
trigger_rule=TriggerRule.ALL_DONE,
9793
)
9894

99-
create_cluster >> spark_task >> delete_cluster
95+
(
96+
# TEST SETUP
97+
create_cluster
98+
# TEST BODY
99+
>> spark_task
100+
# TEST TEARDOWN
101+
>> delete_cluster
102+
)
100103

101104
from tests.system.utils.watcher import watcher
102105

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@
3636
DAG_ID = "dataproc_spark_async"
3737
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
3838

39-
CLUSTER_NAME = f"dataproc-spark-async-{ENV_ID}"
39+
CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
4040
REGION = "europe-west1"
41-
ZONE = "europe-west1-b"
4241

4342
# Cluster definition
4443
CLUSTER_CONFIG = {
@@ -54,8 +53,6 @@
5453
},
5554
}
5655

57-
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
58-
5956
# Jobs definitions
6057
SPARK_JOB = {
6158
"reference": {"project_id": PROJECT_ID},
@@ -72,7 +69,7 @@
7269
schedule="@once",
7370
start_date=datetime(2021, 1, 1),
7471
catchup=False,
75-
tags=["example", "dataproc"],
72+
tags=["example", "dataproc", "spark", "async"],
7673
) as dag:
7774
create_cluster = DataprocCreateClusterOperator(
7875
task_id="create_cluster",
@@ -104,7 +101,15 @@
104101
trigger_rule=TriggerRule.ALL_DONE,
105102
)
106103

107-
create_cluster >> spark_task_async >> spark_task_async_sensor >> delete_cluster
104+
(
105+
# TEST SETUP
106+
create_cluster
107+
# TEST BODY
108+
>> spark_task_async
109+
>> spark_task_async_sensor
110+
# TEST TEARDOWN
111+
>> delete_cluster
112+
)
108113

109114
from tests.system.utils.watcher import watcher
110115

tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,8 @@
3636
DAG_ID = "dataproc_spark_deferrable"
3737
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
3838

39-
CLUSTER_NAME = f"cluster-dataproc-spark-{ENV_ID}"
39+
CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
4040
REGION = "europe-west1"
41-
ZONE = "europe-west1-b"
42-
4341

4442
# Cluster definition
4543
CLUSTER_CONFIG = {
@@ -55,8 +53,6 @@
5553
},
5654
}
5755

58-
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
59-
6056
# Jobs definitions
6157
# [START how_to_cloud_dataproc_spark_deferrable_config]
6258
SPARK_JOB = {
@@ -75,7 +71,7 @@
7571
schedule_interval="@once",
7672
start_date=datetime(2021, 1, 1),
7773
catchup=False,
78-
tags=["example", "dataproc"],
74+
tags=["example", "dataproc", "spark", "deferrable"],
7975
) as dag:
8076
create_cluster = DataprocCreateClusterOperator(
8177
task_id="create_cluster",
@@ -97,7 +93,14 @@
9793
trigger_rule=TriggerRule.ALL_DONE,
9894
)
9995

100-
create_cluster >> spark_task >> delete_cluster
96+
(
97+
# TEST SETUP
98+
create_cluster
99+
# TEST BODY
100+
>> spark_task
101+
# TEST TEARDOWN
102+
>> delete_cluster
103+
)
101104

102105
from tests.system.utils.watcher import watcher
103106

0 commit comments

Comments (0)