Skip to content

Commit 10ce311

Browse files
authored
Deprecate using global as the default region in Google Dataproc operators and hooks (#10772)
The region parameter is required for some of Google Dataproc operators and it should be provided by users to avoid creating data-intensive tasks in any default location.
1 parent b746f33 commit 10ce311

File tree

4 files changed

+51
-7
lines changed

4 files changed

+51
-7
lines changed

β€Žairflow/providers/google/cloud/hooks/dataproc.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ def cancel_job(
851851
self,
852852
job_id: str,
853853
project_id: str,
854-
location: str = 'global',
854+
location: Optional[str] = None,
855855
retry: Optional[Retry] = None,
856856
timeout: Optional[float] = None,
857857
metadata: Optional[Sequence[Tuple[str, str]]] = None,
@@ -874,7 +874,15 @@ def cancel_job(
874874
:param metadata: Additional metadata that is provided to the method.
875875
:type metadata: Sequence[Tuple[str, str]]
876876
"""
877+
if location is None:
878+
warnings.warn(
879+
"Default location value `global` will be deprecated. Please, provide location value.",
880+
DeprecationWarning,
881+
stacklevel=2,
882+
)
883+
location = 'global'
877884
client = self.get_job_client(location=location)
885+
878886
job = client.cancel_job(
879887
project_id=project_id,
880888
region=location,

β€Žairflow/providers/google/cloud/operators/dataproc.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ class ClusterGenerator:
130130
:type internal_ip_only: bool
131131
:param tags: The GCE tags to add to all instances
132132
:type tags: list[str]
133-
:param region: leave as 'global', might become relevant in the future. (templated)
133+
:param region: The specified region where the dataproc cluster is created.
134134
:type region: str
135135
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
136136
:type gcp_conn_id: str
@@ -420,7 +420,7 @@ class DataprocCreateClusterOperator(BaseOperator):
420420
If a dict is provided, it must be of the same form as the protobuf message
421421
:class:`~google.cloud.dataproc_v1.types.ClusterConfig`
422422
:type cluster_config: Union[Dict, google.cloud.dataproc_v1.types.ClusterConfig]
423-
:param region: leave as 'global', might become relevant in the future. (templated)
423+
:param region: The specified region where the dataproc cluster is created.
424424
:type region: str
425425
:parm delete_on_error: If true the cluster will be deleted if created with ERROR state. Default
426426
value is true.
@@ -466,7 +466,7 @@ def __init__( # pylint: disable=too-many-arguments
466466
self,
467467
*,
468468
cluster_name: str,
469-
region: str = 'global',
469+
region: Optional[str] = None,
470470
project_id: Optional[str] = None,
471471
cluster_config: Optional[Dict] = None,
472472
labels: Optional[Dict] = None,
@@ -480,6 +480,14 @@ def __init__( # pylint: disable=too-many-arguments
480480
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
481481
**kwargs,
482482
) -> None:
483+
if region is None:
484+
warnings.warn(
485+
"Default region value `global` will be deprecated. Please, provide region value.",
486+
DeprecationWarning,
487+
stacklevel=2,
488+
)
489+
region = 'global'
490+
483491
# TODO: remove one day
484492
if cluster_config is None:
485493
warnings.warn(
@@ -916,7 +924,7 @@ def __init__(
916924
gcp_conn_id: str = 'google_cloud_default',
917925
delegate_to: Optional[str] = None,
918926
labels: Optional[Dict] = None,
919-
region: str = 'global',
927+
region: Optional[str] = None,
920928
job_error_states: Optional[Set[str]] = None,
921929
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
922930
asynchronous: bool = False,
@@ -930,7 +938,16 @@ def __init__(
930938
self.cluster_name = cluster_name
931939
self.dataproc_properties = dataproc_properties
932940
self.dataproc_jars = dataproc_jars
941+
942+
if region is None:
943+
warnings.warn(
944+
"Default region value `global` will be deprecated. Please, provide region value.",
945+
DeprecationWarning,
946+
stacklevel=2,
947+
)
948+
region = 'global'
933949
self.region = region
950+
934951
self.job_error_states = job_error_states if job_error_states is not None else {'ERROR'}
935952
self.impersonation_chain = impersonation_chain
936953

@@ -1549,7 +1566,7 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
15491566
:param project_id: The ID of the google cloud project in which
15501567
the template runs
15511568
:type project_id: str
1552-
:param region: leave as 'global', might become relevant in the future
1569+
:param region: The specified region where the dataproc cluster is created.
15531570
:type region: str
15541571
:param parameters: a map of parameters for Dataproc Template in key-value format:
15551572
map (key: string, value: string)
@@ -1651,7 +1668,7 @@ class DataprocInstantiateInlineWorkflowTemplateOperator(BaseOperator):
16511668
:param project_id: The ID of the google cloud project in which
16521669
the template runs
16531670
:type project_id: str
1654-
:param region: leave as 'global', might become relevant in the future
1671+
:param region: The specified region where the dataproc cluster is created.
16551672
:type region: str
16561673
:param parameters: a map of parameters for Dataproc Template in key-value format:
16571674
map (key: string, value: string)

β€Žtests/providers/google/cloud/hooks/test_dataproc.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,15 @@ def test_cancel_job(self, mock_client):
289289
metadata=None,
290290
)
291291

292+
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_job_client"))
293+
def test_cancel_job_deprecation_warning(self, mock_client):
294+
with self.assertWarns(DeprecationWarning):
295+
self.hook.cancel_job(job_id=JOB_ID, project_id=GCP_PROJECT)
296+
mock_client.assert_called_once_with(location='global')
297+
mock_client.return_value.cancel_job.assert_called_once_with(
298+
region='global', job_id=JOB_ID, project_id=GCP_PROJECT, retry=None, timeout=None, metadata=None,
299+
)
300+
292301

293302
class TestDataProcJobBuilder(unittest.TestCase):
294303
def setUp(self) -> None:

β€Žtests/providers/google/cloud/operators/test_dataproc.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,16 @@ def test_deprecation_warning(self):
194194
self.assertEqual(op.cluster_config['worker_config']['num_instances'], 2)
195195
self.assertIn("zones/zone", op.cluster_config['master_config']["machine_type_uri"])
196196

197+
with self.assertWarns(DeprecationWarning) as warning:
198+
op_default_region = DataprocCreateClusterOperator(
199+
task_id=TASK_ID,
200+
project_id=GCP_PROJECT,
201+
cluster_name="cluster_name",
202+
cluster_config=op.cluster_config,
203+
)
204+
assert_warning("Default region value", warning)
205+
self.assertEqual(op_default_region.region, 'global')
206+
197207
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
198208
def test_execute(self, mock_hook):
199209
op = DataprocCreateClusterOperator(

0 commit comments

Comments
 (0)