Skip to content

Commit 4f20b0f

Browse files
Move the try outside the loop when this is possible in Google provider (#33976)
* Move the try outside the loop when this is possible in Google provider --------- Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
1 parent 8918b43 commit 4f20b0f

File tree

10 files changed

+103
-118
lines changed

10 files changed

+103
-118
lines changed

β€Žairflow/providers/google/cloud/triggers/bigquery.py

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
7575
"""Gets current job execution status and yields a TriggerEvent."""
7676
"""Gets current job execution status and yields a TriggerEvent."""
7777
hook = self._get_async_hook()
78-
while True:
79-
try:
78+
try:
79+
while True:
8080
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
8181
if job_status == "success":
8282
yield TriggerEvent(
@@ -95,10 +95,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
9595
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
9696
)
9797
await asyncio.sleep(self.poll_interval)
98-
except Exception as e:
99-
self.log.exception("Exception occurred while checking for query completion")
100-
yield TriggerEvent({"status": "error", "message": str(e)})
101-
return
98+
except Exception as e:
99+
self.log.exception("Exception occurred while checking for query completion")
100+
yield TriggerEvent({"status": "error", "message": str(e)})
102101

103102
def _get_async_hook(self) -> BigQueryAsyncHook:
104103
return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -124,8 +123,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
124123
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
125124
"""Gets current job execution status and yields a TriggerEvent."""
126125
hook = self._get_async_hook()
127-
while True:
128-
try:
126+
try:
127+
while True:
129128
# Poll for job execution status
130129
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
131130
if job_status == "success":
@@ -160,10 +159,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
160159
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
161160
)
162161
await asyncio.sleep(self.poll_interval)
163-
except Exception as e:
164-
self.log.exception("Exception occurred while checking for query completion")
165-
yield TriggerEvent({"status": "error", "message": str(e)})
166-
return
162+
except Exception as e:
163+
self.log.exception("Exception occurred while checking for query completion")
164+
yield TriggerEvent({"status": "error", "message": str(e)})
167165

168166

169167
class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -196,8 +194,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
196194
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
197195
"""Gets current job execution status and yields a TriggerEvent with response data."""
198196
hook = self._get_async_hook()
199-
while True:
200-
try:
197+
try:
198+
while True:
201199
# Poll for job execution status
202200
job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
203201
if job_status == "success":
@@ -220,10 +218,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
220218
"Bigquery job status is %s. Sleeping for %s seconds.", job_status, self.poll_interval
221219
)
222220
await asyncio.sleep(self.poll_interval)
223-
except Exception as e:
224-
self.log.exception("Exception occurred while checking for query completion")
225-
yield TriggerEvent({"status": "error", "message": str(e)})
226-
return
221+
except Exception as e:
222+
self.log.exception("Exception occurred while checking for query completion")
223+
yield TriggerEvent({"status": "error", "message": str(e)})
227224

228225

229226
class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -302,8 +299,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
302299
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
303300
"""Gets current job execution status and yields a TriggerEvent."""
304301
hook = self._get_async_hook()
305-
while True:
306-
try:
302+
try:
303+
while True:
307304
first_job_response_from_hook = await hook.get_job_status(
308305
job_id=self.first_job_id, project_id=self.project_id
309306
)
@@ -365,10 +362,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
365362
)
366363
return
367364

368-
except Exception as e:
369-
self.log.exception("Exception occurred while checking for query completion")
370-
yield TriggerEvent({"status": "error", "message": str(e)})
371-
return
365+
except Exception as e:
366+
self.log.exception("Exception occurred while checking for query completion")
367+
yield TriggerEvent({"status": "error", "message": str(e)})
372368

373369

374370
class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -430,8 +426,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
430426
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
431427
"""Gets current job execution status and yields a TriggerEvent."""
432428
hook = self._get_async_hook()
433-
while True:
434-
try:
429+
try:
430+
while True:
435431
# Poll for job execution status
436432
response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
437433
if response_from_hook == "success":
@@ -448,10 +444,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
448444
else:
449445
yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
450446
return
451-
except Exception as e:
452-
self.log.exception("Exception occurred while checking for query completion")
453-
yield TriggerEvent({"status": "error", "message": str(e)})
454-
return
447+
except Exception as e:
448+
self.log.exception("Exception occurred while checking for query completion")
449+
yield TriggerEvent({"status": "error", "message": str(e)})
455450

456451

457452
class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -501,8 +496,8 @@ def _get_async_hook(self) -> BigQueryTableAsyncHook:
501496

502497
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
503498
"""Will run until the table exists in the Google Big Query."""
504-
while True:
505-
try:
499+
try:
500+
while True:
506501
hook = self._get_async_hook()
507502
response = await self._table_exists(
508503
hook=hook, dataset=self.dataset_id, table_id=self.table_id, project_id=self.project_id
@@ -511,10 +506,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
511506
yield TriggerEvent({"status": "success", "message": "success"})
512507
return
513508
await asyncio.sleep(self.poll_interval)
514-
except Exception as e:
515-
self.log.exception("Exception occurred while checking for Table existence")
516-
yield TriggerEvent({"status": "error", "message": str(e)})
517-
return
509+
except Exception as e:
510+
self.log.exception("Exception occurred while checking for Table existence")
511+
yield TriggerEvent({"status": "error", "message": str(e)})
518512

519513
async def _table_exists(
520514
self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, project_id: str

β€Žairflow/providers/google/cloud/triggers/bigquery_dts.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
8383
async def run(self) -> AsyncIterator[TriggerEvent]:
8484
"""If the Transfer Run is in a terminal state, then yield TriggerEvent object."""
8585
hook = self._get_async_hook()
86-
while True:
87-
try:
86+
try:
87+
while True:
8888
transfer_run: TransferRun = await hook.get_transfer_run(
8989
project_id=self.project_id,
9090
config_id=self.config_id,
@@ -129,14 +129,13 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
129129
self.log.info("Job is still working...")
130130
self.log.info("Waiting for %s seconds", self.poll_interval)
131131
await asyncio.sleep(self.poll_interval)
132-
except Exception as e:
133-
yield TriggerEvent(
134-
{
135-
"status": "failed",
136-
"message": f"Trigger failed with exception: {e}",
137-
}
138-
)
139-
return
132+
except Exception as e:
133+
yield TriggerEvent(
134+
{
135+
"status": "failed",
136+
"message": f"Trigger failed with exception: {e}",
137+
}
138+
)
140139

141140
def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
142141
return AsyncBiqQueryDataTransferServiceHook(

β€Žairflow/providers/google/cloud/triggers/cloud_batch.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
9292
"""
9393
timeout = self.timeout
9494
hook = self._get_async_hook()
95-
while timeout is None or timeout > 0:
96-
97-
try:
95+
try:
96+
while timeout is None or timeout > 0:
9897
job: Job = await hook.get_batch_job(job_name=self.job_name)
9998

10099
status: JobStatus.State = job.status.state
@@ -134,10 +133,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
134133
if timeout is None or timeout > 0:
135134
await asyncio.sleep(self.polling_period_seconds)
136135

137-
except Exception as e:
138-
self.log.exception("Exception occurred while checking for job completion.")
139-
yield TriggerEvent({"status": "error", "message": str(e)})
140-
return
136+
except Exception as e:
137+
self.log.exception("Exception occurred while checking for job completion.")
138+
yield TriggerEvent({"status": "error", "message": str(e)})
139+
return
141140

142141
self.log.exception(f"Job with name [{self.job_name}] timed out")
143142
yield TriggerEvent(
@@ -147,7 +146,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
147146
"message": f"Batch job with name {self.job_name} timed out",
148147
}
149148
)
150-
return
151149

152150
def _get_async_hook(self) -> CloudBatchAsyncHook:
153151
return CloudBatchAsyncHook(

β€Žairflow/providers/google/cloud/triggers/cloud_build.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
7878
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
7979
"""Gets current build execution status and yields a TriggerEvent."""
8080
hook = self._get_async_hook()
81-
while True:
82-
try:
81+
try:
82+
while True:
8383
# Poll for job execution status
8484
cloud_build_instance = await hook.get_cloud_build(
8585
id_=self.id_,
@@ -119,10 +119,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
119119
)
120120
return
121121

122-
except Exception as e:
123-
self.log.exception("Exception occurred while checking for Cloud Build completion")
124-
yield TriggerEvent({"status": "error", "message": str(e)})
125-
return
122+
except Exception as e:
123+
self.log.exception("Exception occurred while checking for Cloud Build completion")
124+
yield TriggerEvent({"status": "error", "message": str(e)})
126125

127126
def _get_async_hook(self) -> CloudBuildAsyncHook:
128127
return CloudBuildAsyncHook(gcp_conn_id=self.gcp_conn_id)

β€Žairflow/providers/google/cloud/triggers/cloud_sql.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ def serialize(self):
6464
)
6565

6666
async def run(self):
67-
while True:
68-
try:
67+
try:
68+
while True:
6969
operation = await self.hook.get_operation(
7070
project_id=self.project_id, operation_name=self.operation_name
7171
)
@@ -93,11 +93,11 @@ async def run(self):
9393
self.poke_interval,
9494
)
9595
await asyncio.sleep(self.poke_interval)
96-
except Exception as e:
97-
self.log.exception("Exception occurred while checking operation status.")
98-
yield TriggerEvent(
99-
{
100-
"status": "failed",
101-
"message": str(e),
102-
}
103-
)
96+
except Exception as e:
97+
self.log.exception("Exception occurred while checking operation status.")
98+
yield TriggerEvent(
99+
{
100+
"status": "failed",
101+
"message": str(e),
102+
}
103+
)

β€Žairflow/providers/google/cloud/triggers/dataflow.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ async def run(self):
9292
amount of time stored in self.poll_sleep variable.
9393
"""
9494
hook = self._get_async_hook()
95-
while True:
96-
try:
95+
try:
96+
while True:
9797
status = await hook.get_job_status(
9898
project_id=self.project_id,
9999
job_id=self.job_id,
@@ -129,10 +129,9 @@ async def run(self):
129129
self.log.info("Current job status is: %s", status)
130130
self.log.info("Sleeping for %s seconds.", self.poll_sleep)
131131
await asyncio.sleep(self.poll_sleep)
132-
except Exception as e:
133-
self.log.exception("Exception occurred while checking for job completion.")
134-
yield TriggerEvent({"status": "error", "message": str(e)})
135-
return
132+
except Exception as e:
133+
self.log.exception("Exception occurred while checking for job completion.")
134+
yield TriggerEvent({"status": "error", "message": str(e)})
136135

137136
def _get_async_hook(self) -> AsyncDataflowHook:
138137
return AsyncDataflowHook(

β€Žairflow/providers/google/cloud/triggers/datafusion.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
8383
async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
8484
"""Gets current pipeline status and yields a TriggerEvent."""
8585
hook = self._get_async_hook()
86-
while True:
87-
try:
86+
try:
87+
while True:
8888
# Poll for job execution status
8989
response_from_hook = await hook.get_pipeline_status(
9090
success_states=self.success_states,
@@ -109,10 +109,9 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
109109
else:
110110
yield TriggerEvent({"status": "error", "message": response_from_hook})
111111
return
112-
except Exception as e:
113-
self.log.exception("Exception occurred while checking for pipeline state")
114-
yield TriggerEvent({"status": "error", "message": str(e)})
115-
return
112+
except Exception as e:
113+
self.log.exception("Exception occurred while checking for pipeline state")
114+
yield TriggerEvent({"status": "error", "message": str(e)})
116115

117116
def _get_async_hook(self) -> DataFusionAsyncHook:
118117
return DataFusionAsyncHook(

β€Žairflow/providers/google/cloud/triggers/dataproc.py

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
263263

264264
async def run(self) -> AsyncIterator[TriggerEvent]:
265265
"""Wait until cluster is deleted completely."""
266-
while self.end_time > time.time():
267-
try:
266+
try:
267+
while self.end_time > time.time():
268268
cluster = await self.get_async_hook().get_cluster(
269269
region=self.region, # type: ignore[arg-type]
270270
cluster_name=self.cluster_name,
@@ -277,13 +277,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
277277
self.polling_interval_seconds,
278278
)
279279
await asyncio.sleep(self.polling_interval_seconds)
280-
except NotFound:
281-
yield TriggerEvent({"status": "success", "message": ""})
282-
return
283-
except Exception as e:
284-
yield TriggerEvent({"status": "error", "message": str(e)})
285-
return
286-
yield TriggerEvent({"status": "error", "message": "Timeout"})
280+
except NotFound:
281+
yield TriggerEvent({"status": "success", "message": ""})
282+
except Exception as e:
283+
yield TriggerEvent({"status": "error", "message": str(e)})
284+
else:
285+
yield TriggerEvent({"status": "error", "message": "Timeout"})
287286

288287

289288
class DataprocWorkflowTrigger(DataprocBaseTrigger):
@@ -312,8 +311,8 @@ def serialize(self):
312311

313312
async def run(self) -> AsyncIterator[TriggerEvent]:
314313
hook = self.get_async_hook()
315-
while True:
316-
try:
314+
try:
315+
while True:
317316
operation = await hook.get_operation(region=self.region, operation_name=self.name)
318317
if operation.done:
319318
if operation.error.message:
@@ -338,12 +337,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
338337
else:
339338
self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
340339
await asyncio.sleep(self.polling_interval_seconds)
341-
except Exception as e:
342-
self.log.exception("Exception occurred while checking operation status.")
343-
yield TriggerEvent(
344-
{
345-
"status": "failed",
346-
"message": str(e),
347-
}
348-
)
349-
return
340+
except Exception as e:
341+
self.log.exception("Exception occurred while checking operation status.")
342+
yield TriggerEvent(
343+
{
344+
"status": "failed",
345+
"message": str(e),
346+
}
347+
)

0 commit comments

Comments
 (0)