Skip to content

Commit 1da6972

Browse files
authored
Fixes to dataproc operators and hook (#14086)
Two quick fixes to Dataproc operators and hooks. Add more templated fields to the DataprocClusterDeleteOperator as per #13454. There were a few other fields which could easily be templated so I added them as well. Don't use the global-dataproc.googleapis.com:443 URL when creating dataproc clients. This was partially done in #12907 but the other two client creation methods were not updated. Using the global-dataproc URL results in 404s when trying to create clusters in the global region. We don't need to specify the default endpoint as it is used by default in the dataproc client library: https://github.com/googleapis/python-dataproc/blob/6f27109faf03dd13f25294e57960f0d9e1a9fa27/google/cloud/dataproc_v1beta2/services/cluster_controller/client.py#L117
1 parent 9036ce2 commit 1da6972

File tree

3 files changed

+34
-8
lines changed

3 files changed

+34
-8
lines changed

β€Žairflow/providers/google/cloud/hooks/dataproc.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ class DataprocHook(GoogleBaseHook):
209209

210210
def get_cluster_client(self, location: Optional[str] = None) -> ClusterControllerClient:
211211
"""Returns ClusterControllerClient."""
212-
client_options = {'api_endpoint': f'{location}-dataproc.googleapis.com:443'} if location else None
212+
client_options = None
213+
if location and location != 'global':
214+
client_options = {'api_endpoint': f'{location}-dataproc.googleapis.com:443'}
213215

214216
return ClusterControllerClient(
215217
credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
@@ -227,7 +229,9 @@ def get_template_client(self, location: Optional[str] = None) -> WorkflowTemplat
227229

228230
def get_job_client(self, location: Optional[str] = None) -> JobControllerClient:
229231
"""Returns JobControllerClient."""
230-
client_options = {'api_endpoint': f'{location}-dataproc.googleapis.com:443'} if location else None
232+
client_options = None
233+
if location and location != 'global':
234+
client_options = {'api_endpoint': f'{location}-dataproc.googleapis.com:443'}
231235

232236
return JobControllerClient(
233237
credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options

β€Žairflow/providers/google/cloud/operators/dataproc.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -767,11 +767,11 @@ class DataprocDeleteClusterOperator(BaseOperator):
767767
"""
768768
Deletes a cluster in a project.
769769
770-
:param project_id: Required. The ID of the Google Cloud project that the cluster belongs to.
770+
:param project_id: Required. The ID of the Google Cloud project that the cluster belongs to (templated).
771771
:type project_id: str
772-
:param region: Required. The Cloud Dataproc region in which to handle the request.
772+
:param region: Required. The Cloud Dataproc region in which to handle the request (templated).
773773
:type region: str
774-
:param cluster_name: Required. The cluster name.
774+
:param cluster_name: Required. The cluster name (templated).
775775
:type cluster_name: str
776776
:param cluster_uuid: Optional. Specifying the ``cluster_uuid`` means the RPC should fail
777777
if cluster with specified UUID does not exist.
@@ -801,7 +801,7 @@ class DataprocDeleteClusterOperator(BaseOperator):
801801
:type impersonation_chain: Union[str, Sequence[str]]
802802
"""
803803

804-
template_fields = ('impersonation_chain',)
804+
template_fields = ('project_id', 'region', 'cluster_name', 'impersonation_chain')
805805

806806
@apply_defaults
807807
def __init__(

β€Žtests/providers/google/cloud/hooks/test_dataproc.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,18 @@ def test_get_cluster_client(self, mock_client, mock_client_info, mock_get_creden
6464
mock_client.assert_called_once_with(
6565
credentials=mock_get_credentials.return_value,
6666
client_info=mock_client_info.return_value,
67-
client_options={"api_endpoint": f"{GCP_LOCATION}-dataproc.googleapis.com:443"},
67+
client_options=None,
68+
)
69+
70+
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
71+
@mock.patch(DATAPROC_STRING.format("DataprocHook.client_info"), new_callable=mock.PropertyMock)
72+
@mock.patch(DATAPROC_STRING.format("ClusterControllerClient"))
73+
def test_get_cluster_client_region(self, mock_client, mock_client_info, mock_get_credentials):
74+
self.hook.get_cluster_client(location='region1')
75+
mock_client.assert_called_once_with(
76+
credentials=mock_get_credentials.return_value,
77+
client_info=mock_client_info.return_value,
78+
client_options={'api_endpoint': 'region1-dataproc.googleapis.com:443'},
6879
)
6980

7081
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
@@ -97,7 +108,18 @@ def test_get_job_client(self, mock_client, mock_client_info, mock_get_credential
97108
mock_client.assert_called_once_with(
98109
credentials=mock_get_credentials.return_value,
99110
client_info=mock_client_info.return_value,
100-
client_options={"api_endpoint": f"{GCP_LOCATION}-dataproc.googleapis.com:443"},
111+
client_options=None,
112+
)
113+
114+
@mock.patch(DATAPROC_STRING.format("DataprocHook._get_credentials"))
115+
@mock.patch(DATAPROC_STRING.format("DataprocHook.client_info"), new_callable=mock.PropertyMock)
116+
@mock.patch(DATAPROC_STRING.format("JobControllerClient"))
117+
def test_get_job_client_region(self, mock_client, mock_client_info, mock_get_credentials):
118+
self.hook.get_job_client(location='region1')
119+
mock_client.assert_called_once_with(
120+
credentials=mock_get_credentials.return_value,
121+
client_info=mock_client_info.return_value,
122+
client_options={'api_endpoint': 'region1-dataproc.googleapis.com:443'},
101123
)
102124

103125
@mock.patch(DATAPROC_STRING.format("DataprocHook.get_cluster_client"))

0 commit comments

Comments
 (0)