Skip to content

Commit 0e076dc

Browse files
Fix catching 409 error (#33173)
1 parent 8542cdd commit 0e076dc

File tree

2 files changed

+38
-11
lines changed

2 files changed

+38
-11
lines changed

β€Žairflow/providers/google/cloud/hooks/datafusion.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@
3939
Operation = Dict[str, Any]
4040

4141

42+
class ConflictException(AirflowException):
43+
"""Exception to catch 409 error."""
44+
45+
pass
46+
47+
4248
class PipelineStates:
4349
"""Data Fusion pipeline states."""
4450

@@ -163,6 +169,8 @@ def _cdap_request(
163169
def _check_response_status_and_data(response, message: str) -> None:
164170
if response.status == 404:
165171
raise AirflowNotFoundException(message)
172+
elif response.status == 409:
173+
raise ConflictException("Conflict: Resource is still in use.")
166174
elif response.status != 200:
167175
raise AirflowException(message)
168176
if response.data is None:
@@ -356,21 +364,18 @@ def delete_pipeline(
356364
if version_id:
357365
url = os.path.join(url, "versions", version_id)
358366

359-
response = self._cdap_request(url=url, method="DELETE", body=None)
360-
# Check for 409 error: the previous step for starting/stopping pipeline could still be in progress.
361-
# Waiting some time before retry.
362-
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
367+
for time_to_wait in exponential_sleep_generator(initial=1, maximum=10):
363368
try:
369+
response = self._cdap_request(url=url, method="DELETE", body=None)
364370
self._check_response_status_and_data(
365371
response, f"Deleting a pipeline failed with code {response.status}: {response.data}"
366372
)
367-
break
368-
except AirflowException as exc:
369-
if "409" in str(exc):
370-
sleep(time_to_wait)
371-
response = self._cdap_request(url=url, method="DELETE", body=None)
372-
else:
373-
raise
373+
if response.status == 200:
374+
break
375+
except ConflictException as exc:
376+
self.log.info(exc)
377+
sleep(time_to_wait)
378+
continue
374379

375380
def list_pipelines(
376381
self,

β€Žtests/providers/google/cloud/hooks/test_datafusion.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@
5252
)
5353

5454

55+
class MockResponse:
56+
def __init__(self, status, data=None):
57+
self.status = status
58+
self.data = data
59+
60+
5561
@pytest.fixture
5662
def hook():
5763
with mock.patch(
@@ -255,6 +261,22 @@ def test_delete_pipeline_should_fail_if_status_not_200(self, mock_request, hook)
255261
body=None,
256262
)
257263

264+
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
265+
def test_delete_pipeline_should_fail_if_status_409(self, mock_request, hook, caplog):
266+
mock_request.side_effect = [
267+
MockResponse(status=409, data="Conflict: Resource is still in use."),
268+
MockResponse(status=200, data="Success"),
269+
]
270+
hook.delete_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL)
271+
272+
assert mock_request.call_count == 2
273+
assert "Conflict: Resource is still in use." in caplog.text
274+
mock_request.assert_called_with(
275+
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}",
276+
method="DELETE",
277+
body=None,
278+
)
279+
258280
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
259281
def test_list_pipelines(self, mock_request, hook):
260282
data = {"data": "test"}

0 commit comments

Comments
 (0)