Commit a43e98d
Fix DataprocJobBaseOperator not being compatible with dotted names (#23439). (#23791)
* The job_name parameter is now sanitized: dots are replaced with underscores.
1 parent 509b277
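For illustration, a minimal sketch (not part of the commit) of the new sanitization logic. The helper name is hypothetical; the expression itself is taken from the diff below. The format spec `!s:.8` converts the UUID to a string and truncates it to 8 characters, equivalent to the old `str(uuid.uuid4())[:8]`; the `replace` call is the actual fix.

import uuid

# Hypothetical standalone helper mirroring the commit's sanitization:
# dots in the name become underscores, then an 8-character UUID suffix
# is appended. "{uuid.uuid4()!s:.8}" == str(uuid.uuid4())[:8].
def sanitize_job_name(name: str) -> str:
    return f"{name.replace('.', '_')}_{uuid.uuid4()!s:.8}"

print(sanitize_job_name("group.task"))  # e.g. "group_task_1a2b3c4d"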

File tree: 3 files changed (+52, -22 lines)


airflow/providers/google/cloud/hooks/dataproc.py
4 additions & 3 deletions

@@ -58,7 +58,7 @@ def __init__(
         job_type: str,
         properties: Optional[Dict[str, str]] = None,
     ) -> None:
-        name = task_id + "_" + str(uuid.uuid4())[:8]
+        name = f"{task_id.replace('.', '_')}_{uuid.uuid4()!s:.8}"
         self.job_type = job_type
         self.job = {
             "job": {

@@ -175,11 +175,12 @@ def set_python_main(self, main: str) -> None:

     def set_job_name(self, name: str) -> None:
         """
-        Set Dataproc job name.
+        Set Dataproc job name. Job name is sanitized, replacing dots by underscores.

         :param name: Job name.
         """
-        self.job["job"]["reference"]["job_id"] = name + "_" + str(uuid.uuid4())[:8]
+        sanitized_name = f"{name.replace('.', '_')}_{uuid.uuid4()!s:.8}"
+        self.job["job"]["reference"]["job_id"] = sanitized_name

     def build(self) -> Dict:
         """

tests/providers/google/cloud/hooks/test_dataproc.py
21 additions & 11 deletions

@@ -23,6 +23,7 @@
 import pytest
 from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud.dataproc_v1 import JobStatus
+from parameterized import parameterized

 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder

@@ -472,27 +473,28 @@ def setUp(self) -> None:
             properties={"test": "test"},
         )

+    @parameterized.expand([TASK_ID, f"group.{TASK_ID}"])
     @mock.patch(DATAPROC_STRING.format("uuid.uuid4"))
-    def test_init(self, mock_uuid):
+    def test_init(self, job_name, mock_uuid):
         mock_uuid.return_value = "uuid"
         properties = {"test": "test"}
-        job = {
+        expected_job_id = f"{job_name}_{mock_uuid.return_value}".replace(".", "_")
+        expected_job = {
             "job": {
                 "labels": {"airflow-version": AIRFLOW_VERSION},
                 "placement": {"cluster_name": CLUSTER_NAME},
-                "reference": {"job_id": TASK_ID + "_uuid", "project_id": GCP_PROJECT},
+                "reference": {"job_id": expected_job_id, "project_id": GCP_PROJECT},
                 "test": {"properties": properties},
             }
         }
         builder = DataProcJobBuilder(
             project_id=GCP_PROJECT,
-            task_id=TASK_ID,
+            task_id=job_name,
             cluster_name=CLUSTER_NAME,
             job_type="test",
             properties=properties,
         )
-
-        assert job == builder.job
+        assert expected_job == builder.job

     def test_add_labels(self):
         labels = {"key": "value"}

@@ -559,14 +561,22 @@ def test_set_python_main(self):
         self.builder.set_python_main(main)
         assert main == self.builder.job["job"][self.job_type]["main_python_file_uri"]

+    @parameterized.expand(
+        [
+            ("simple", "name"),
+            ("name with underscores", "name_with_dash"),
+            ("name with dot", "group.name"),
+            ("name with dot and underscores", "group.name_with_dash"),
+        ]
+    )
     @mock.patch(DATAPROC_STRING.format("uuid.uuid4"))
-    def test_set_job_name(self, mock_uuid):
+    def test_set_job_name(self, name, job_name, mock_uuid):
         uuid = "test_uuid"
+        expected_job_name = f"{job_name}_{uuid[:8]}".replace(".", "_")
         mock_uuid.return_value = uuid
-        name = "name"
-        self.builder.set_job_name(name)
-        name += "_" + uuid[:8]
-        assert name == self.builder.job["job"]["reference"]["job_id"]
+        self.builder.set_job_name(job_name)
+        assert expected_job_name == self.builder.job["job"]["reference"]["job_id"]
+        assert len(self.builder.job["job"]["reference"]["job_id"]) == len(job_name) + 9

     def test_build(self):
         assert self.builder.job == self.builder.build()
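The final length assertion in test_set_job_name holds because `replace('.', '_')` preserves the name's length, so the job ID always adds exactly one underscore plus the first 8 UUID characters. A quick standalone check (values taken from the parameterized cases above):

job_name = "group.name_with_dash"
sanitized = f"{job_name.replace('.', '_')}_{'test_uuid'[:8]}"
assert len(sanitized) == len(job_name) + 9  # "_" plus 8 uuid chars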

tests/providers/google/cloud/operators/test_dataproc.py
27 additions & 8 deletions

@@ -1204,8 +1204,9 @@ class TestDataProcHiveOperator(unittest.TestCase):
     query = "define sin HiveUDF('sin');"
     variables = {"key": "value"}
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "hive_job": {"query_list": {"queries": [query]}, "script_variables": variables},

@@ -1226,6 +1227,7 @@ def test_execute(self, mock_hook, mock_uuid):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id

         op = DataprocSubmitHiveJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1249,6 +1251,7 @@ def test_builder(self, mock_hook, mock_uuid):
         mock_uuid.return_value = self.job_id

         op = DataprocSubmitHiveJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1263,8 +1266,9 @@ class TestDataProcPigOperator(unittest.TestCase):
     query = "define sin HiveUDF('sin');"
     variables = {"key": "value"}
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "pig_job": {"query_list": {"queries": [query]}, "script_variables": variables},

@@ -1285,6 +1289,7 @@ def test_execute(self, mock_hook, mock_uuid):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id

         op = DataprocSubmitPigJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1308,6 +1313,7 @@ def test_builder(self, mock_hook, mock_uuid):
         mock_uuid.return_value = self.job_id

         op = DataprocSubmitPigJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1321,15 +1327,16 @@ def test_builder(self, mock_hook, mock_uuid):
 class TestDataProcSparkSqlOperator(unittest.TestCase):
     query = "SHOW DATABASES;"
     variables = {"key": "value"}
+    job_name = "simple"
     job_id = "uuid_id"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "spark_sql_job": {"query_list": {"queries": [query]}, "script_variables": variables},
     }
     other_project_job = {
-        "reference": {"project_id": "other-project", "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": "other-project", "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "spark_sql_job": {"query_list": {"queries": [query]}, "script_variables": variables},

@@ -1350,6 +1357,7 @@ def test_execute(self, mock_hook, mock_uuid):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id

         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1375,6 +1383,7 @@ def test_execute_override_project_id(self, mock_hook, mock_uuid):
         mock_hook.return_value.submit_job.return_value.reference.job_id = self.job_id

         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             project_id="other-project",
             task_id=TASK_ID,
             region=GCP_LOCATION,

@@ -1399,6 +1408,7 @@ def test_builder(self, mock_hook, mock_uuid):
         mock_uuid.return_value = self.job_id

         op = DataprocSubmitSparkSqlJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1412,10 +1422,11 @@ def test_builder(self, mock_hook, mock_uuid):
 class TestDataProcSparkOperator(DataprocJobTestBase):
     main_class = "org.apache.spark.examples.SparkPi"
     jars = ["file:///usr/lib/spark/examples/jars/spark-examples.jar"]
+    job_name = "simple"
     job = {
         "reference": {
             "project_id": GCP_PROJECT,
-            "job_id": "{{task.task_id}}_{{ds_nodash}}_" + TEST_JOB_ID,
+            "job_id": f"{job_name}_{TEST_JOB_ID}",
         },
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},

@@ -1440,6 +1451,7 @@ def test_execute(self, mock_hook, mock_uuid):
         self.extra_links_manager_mock.attach_mock(mock_hook, 'hook')

         op = DataprocSubmitSparkJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1505,9 +1517,10 @@ def test_submit_spark_job_operator_extra_links(mock_hook, dag_maker, create_task
 class TestDataProcHadoopOperator(unittest.TestCase):
     args = ["wordcount", "gs://pub/shakespeare/rose.txt"]
     jar = "file:///usr/lib/spark/examples/jars/spark-examples.jar"
+    job_name = "simple"
     job_id = "uuid_id"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "hadoop_job": {"main_jar_file_uri": jar, "args": args},

@@ -1529,6 +1542,7 @@ def test_execute(self, mock_hook, mock_uuid):
         mock_uuid.return_value = self.job_id

         op = DataprocSubmitHadoopJobOperator(
+            job_name=self.job_name,
             task_id=TASK_ID,
             region=GCP_LOCATION,
             gcp_conn_id=GCP_CONN_ID,

@@ -1542,8 +1556,9 @@ def test_execute(self, mock_hook, mock_uuid):
 class TestDataProcPySparkOperator(unittest.TestCase):
     uri = "gs://{}/{}"
     job_id = "uuid_id"
+    job_name = "simple"
     job = {
-        "reference": {"project_id": GCP_PROJECT, "job_id": "{{task.task_id}}_{{ds_nodash}}_" + job_id},
+        "reference": {"project_id": GCP_PROJECT, "job_id": f"{job_name}_{job_id}"},
         "placement": {"cluster_name": "cluster-1"},
         "labels": {"airflow-version": AIRFLOW_VERSION},
         "pyspark_job": {"main_python_file_uri": uri},

@@ -1562,7 +1577,11 @@ def test_execute(self, mock_hook, mock_uuid):
         mock_uuid.return_value = self.job_id

         op = DataprocSubmitPySparkJobOperator(
-            task_id=TASK_ID, region=GCP_LOCATION, gcp_conn_id=GCP_CONN_ID, main=self.uri
+            job_name=self.job_name,
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            gcp_conn_id=GCP_CONN_ID,
+            main=self.uri,
         )
         job = op.generate_job()
         assert self.job == job
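For context, a hypothetical DAG sketch (not from the commit) of the kind of setup that produces dotted names: inside a TaskGroup the effective task_id becomes dotted, e.g. "etl.wordcount", so the job ID derived from it previously contained a dot. All names and paths below are illustrative.

from datetime import datetime

from airflow.models import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitPySparkJobOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="dataproc_dotted", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    with TaskGroup(group_id="etl"):
        # Effective task_id is "etl.wordcount"; the default job name derived
        # from it is now sanitized to "etl_wordcount_<uuid8>".
        DataprocSubmitPySparkJobOperator(
            task_id="wordcount",
            region="us-central1",
            main="gs://my-bucket/wordcount.py",
        )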
