Skip to content

Commit 6629933

Browse files
add a return when the event is yielded in a loop to stop the execution (#31985)
Signed-off-by: Hussein Awala <hussein@awala.fr>
1 parent 6724eeb commit 6629933

File tree

9 files changed

+49
-7
lines changed

9 files changed

+49
-7
lines changed

β€Žairflow/providers/cncf/kubernetes/triggers/pod.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
141141
"message": "All containers inside pod have started successfully.",
142142
}
143143
)
144+
return
144145
elif self.should_wait(pod_phase=pod_status, container_state=container_state):
145146
self.log.info("Container is not completed and still working.")
146147

@@ -159,6 +160,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
159160
"message": message,
160161
}
161162
)
163+
return
162164

163165
self.log.info("Sleeping for %s seconds.", self.poll_interval)
164166
await asyncio.sleep(self.poll_interval)
@@ -171,6 +173,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
171173
"message": pod.status.message,
172174
}
173175
)
176+
return
174177
except CancelledError:
175178
# That means that task was marked as failed
176179
if self.get_logs:
@@ -193,6 +196,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
193196
"message": "Pod execution was cancelled",
194197
}
195198
)
199+
return
196200
except Exception as e:
197201
self.log.exception("Exception occurred while checking pod phase:")
198202
yield TriggerEvent(
@@ -203,6 +207,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
203207
"message": str(e),
204208
}
205209
)
210+
return
206211

207212
def _get_async_hook(self) -> AsyncKubernetesHook:
208213
if self._hook is None:

β€Žairflow/providers/databricks/triggers/databricks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ async def run(self):
8989
"run_state": run_state.to_json(),
9090
}
9191
)
92+
return
9293
else:
9394
self.log.info(
9495
"run-id %s in run state %s. sleeping for %s seconds",

β€Žairflow/providers/google/cloud/triggers/bigquery.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
8888
"message": "Job completed",
8989
}
9090
)
91+
return
9192
elif response_from_hook == "pending":
9293
self.log.info("Query is still running...")
9394
self.log.info("Sleeping for %s seconds.", self.poll_interval)
9495
await asyncio.sleep(self.poll_interval)
9596
else:
9697
yield TriggerEvent({"status": "error", "message": response_from_hook})
98+
return
9799

98100
except Exception as e:
99101
self.log.exception("Exception occurred while checking for query completion")
100102
yield TriggerEvent({"status": "error", "message": str(e)})
103+
return
101104

102105
def _get_async_hook(self) -> BigQueryAsyncHook:
103106
return BigQueryAsyncHook(gcp_conn_id=self.conn_id)
@@ -140,6 +143,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
140143
"records": None,
141144
}
142145
)
146+
return
143147
else:
144148
# Extract only first record from the query results
145149
first_record = records.pop(0)
@@ -149,16 +153,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
149153
"records": first_record,
150154
}
151155
)
156+
return
152157

153158
elif response_from_hook == "pending":
154159
self.log.info("Query is still running...")
155160
self.log.info("Sleeping for %s seconds.", self.poll_interval)
156161
await asyncio.sleep(self.poll_interval)
157162
else:
158163
yield TriggerEvent({"status": "error", "message": response_from_hook})
164+
return
159165
except Exception as e:
160166
self.log.exception("Exception occurred while checking for query completion")
161167
yield TriggerEvent({"status": "error", "message": str(e)})
168+
return
162169

163170

164171
class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
@@ -206,15 +213,18 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
206213
"records": records,
207214
}
208215
)
216+
return
209217
elif response_from_hook == "pending":
210218
self.log.info("Query is still running...")
211219
self.log.info("Sleeping for %s seconds.", self.poll_interval)
212220
await asyncio.sleep(self.poll_interval)
213221
else:
214222
yield TriggerEvent({"status": "error", "message": response_from_hook})
223+
return
215224
except Exception as e:
216225
self.log.exception("Exception occurred while checking for query completion")
217226
yield TriggerEvent({"status": "error", "message": str(e)})
227+
return
218228

219229

220230
class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
@@ -345,6 +355,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
345355
"second_row_data": second_job_row,
346356
}
347357
)
358+
return
348359
elif first_job_response_from_hook == "pending" or second_job_response_from_hook == "pending":
349360
self.log.info("Query is still running...")
350361
self.log.info("Sleeping for %s seconds.", self.poll_interval)
@@ -353,10 +364,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
353364
yield TriggerEvent(
354365
{"status": "error", "message": second_job_response_from_hook, "data": None}
355366
)
367+
return
356368

357369
except Exception as e:
358370
self.log.exception("Exception occurred while checking for query completion")
359371
yield TriggerEvent({"status": "error", "message": str(e)})
372+
return
360373

361374

362375
class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
@@ -428,16 +441,18 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
428441
records = records.pop(0) if records else None
429442
hook.value_check(self.sql, self.pass_value, records, self.tolerance)
430443
yield TriggerEvent({"status": "success", "message": "Job completed", "records": records})
444+
return
431445
elif response_from_hook == "pending":
432446
self.log.info("Query is still running...")
433447
self.log.info("Sleeping for %s seconds.", self.poll_interval)
434448
await asyncio.sleep(self.poll_interval)
435449
else:
436450
yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
437-
451+
return
438452
except Exception as e:
439453
self.log.exception("Exception occurred while checking for query completion")
440454
yield TriggerEvent({"status": "error", "message": str(e)})
455+
return
441456

442457

443458
class BigQueryTableExistenceTrigger(BaseTrigger):
@@ -495,10 +510,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
495510
)
496511
if response:
497512
yield TriggerEvent({"status": "success", "message": "success"})
513+
return
498514
await asyncio.sleep(self.poll_interval)
499515
except Exception as e:
500516
self.log.exception("Exception occurred while checking for Table existence")
501517
yield TriggerEvent({"status": "error", "message": str(e)})
518+
return
502519

503520
async def _table_exists(
504521
self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, project_id: str
@@ -577,9 +594,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
577594
"message": f"Partition: {self.partition_id} in table: {self.table_id}",
578595
}
579596
)
597+
return
580598
job_id = None
581599
elif status == "error":
582600
yield TriggerEvent({"status": "error", "message": status})
601+
return
583602
self.log.info("Sleeping for %s seconds.", self.poll_interval)
584603
await asyncio.sleep(self.poll_interval)
585604

β€Žairflow/providers/google/cloud/triggers/bigquery_dts.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
105105
"config_id": self.config_id,
106106
}
107107
)
108-
108+
return
109109
elif state == TransferState.FAILED:
110110
self.log.info("Job has failed")
111111
yield TriggerEvent(
@@ -115,7 +115,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
115115
"message": "Job has failed",
116116
}
117117
)
118-
118+
return
119119
if state == TransferState.CANCELLED:
120120
self.log.info("Job has been cancelled.")
121121
yield TriggerEvent(
@@ -125,19 +125,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
125125
"message": "Job was cancelled",
126126
}
127127
)
128-
128+
return
129129
else:
130130
self.log.info("Job is still working...")
131131
self.log.info("Waiting for %s seconds", self.poll_interval)
132132
await asyncio.sleep(self.poll_interval)
133-
134133
except Exception as e:
135134
yield TriggerEvent(
136135
{
137136
"status": "failed",
138137
"message": f"Trigger failed with exception: {str(e)}",
139138
}
140139
)
140+
return
141141

142142
def _get_async_hook(self) -> AsyncBiqQueryDataTransferServiceHook:
143143
return AsyncBiqQueryDataTransferServiceHook(

β€Žairflow/providers/google/cloud/triggers/dataflow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,23 @@ async def run(self):
107107
"message": "Job completed",
108108
}
109109
)
110+
return
110111
elif status == JobState.JOB_STATE_FAILED:
111112
yield TriggerEvent(
112113
{
113114
"status": "error",
114115
"message": f"Dataflow job with id {self.job_id} has failed its execution",
115116
}
116117
)
118+
return
117119
elif status == JobState.JOB_STATE_STOPPED:
118120
yield TriggerEvent(
119121
{
120122
"status": "stopped",
121123
"message": f"Dataflow job with id {self.job_id} was stopped",
122124
}
123125
)
126+
return
124127
else:
125128
self.log.info("Job is still running...")
126129
self.log.info("Current job status is: %s", status)
@@ -129,6 +132,7 @@ async def run(self):
129132
except Exception as e:
130133
self.log.exception("Exception occurred while checking for job completion.")
131134
yield TriggerEvent({"status": "error", "message": str(e)})
135+
return
132136

133137
def _get_async_hook(self) -> AsyncDataflowHook:
134138
return AsyncDataflowHook(

β€Žairflow/providers/google/cloud/triggers/datafusion.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,18 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
101101
"message": "Pipeline is running",
102102
}
103103
)
104+
return
104105
elif response_from_hook == "pending":
105106
self.log.info("Pipeline is not still in running state...")
106107
self.log.info("Sleeping for %s seconds.", self.poll_interval)
107108
await asyncio.sleep(self.poll_interval)
108109
else:
109110
yield TriggerEvent({"status": "error", "message": response_from_hook})
110-
111+
return
111112
except Exception as e:
112113
self.log.exception("Exception occurred while checking for pipeline state")
113114
yield TriggerEvent({"status": "error", "message": str(e)})
115+
return
114116

115117
def _get_async_hook(self) -> DataFusionAsyncHook:
116118
return DataFusionAsyncHook(

β€Žairflow/providers/google/cloud/triggers/dataproc.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,10 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
279279
await asyncio.sleep(self.polling_interval_seconds)
280280
except NotFound:
281281
yield TriggerEvent({"status": "success", "message": ""})
282+
return
282283
except Exception as e:
283284
yield TriggerEvent({"status": "error", "message": str(e)})
285+
return
284286
yield TriggerEvent({"status": "error", "message": "Timeout"})
285287

286288

@@ -322,6 +324,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
322324
"message": operation.error.message,
323325
}
324326
)
327+
return
325328
yield TriggerEvent(
326329
{
327330
"operation_name": operation.name,
@@ -330,6 +333,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
330333
"message": "Operation is successfully ended.",
331334
}
332335
)
336+
return
333337
else:
334338
self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
335339
await asyncio.sleep(self.polling_interval_seconds)
@@ -341,3 +345,4 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
341345
"message": str(e),
342346
}
343347
)
348+
return

β€Žairflow/providers/google/cloud/triggers/gcs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
7979
)
8080
if res == "success":
8181
yield TriggerEvent({"status": "success", "message": res})
82+
return
8283
await asyncio.sleep(self.poke_interval)
8384
except Exception as e:
8485
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -159,6 +160,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
159160
)
160161
if status:
161162
yield TriggerEvent(res)
163+
return
162164
await asyncio.sleep(self.poke_interval)
163165
except Exception as e:
164166
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -262,6 +264,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
262264
yield TriggerEvent(
263265
{"status": "success", "message": "Successfully completed", "matches": res}
264266
)
267+
return
265268
await asyncio.sleep(self.poke_interval)
266269
except Exception as e:
267270
yield TriggerEvent({"status": "error", "message": str(e)})
@@ -364,6 +367,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
364367
res = self._is_bucket_updated(set(list_blobs))
365368
if res["status"] in ("success", "error"):
366369
yield TriggerEvent(res)
370+
return
367371
await asyncio.sleep(self.poke_interval)
368372
except Exception as e:
369373
yield TriggerEvent({"status": "error", "message": str(e)})

β€Žairflow/providers/google/cloud/triggers/kubernetes_engine.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
176176
"operation_name": operation.name,
177177
}
178178
)
179-
179+
return
180180
elif status == Operation.Status.RUNNING or status == Operation.Status.PENDING:
181181
self.log.info("Operation is still running.")
182182
self.log.info("Sleeping for %ss...", self.poll_interval)
@@ -189,6 +189,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
189189
"message": f"Operation has failed with status: {operation.status}",
190190
}
191191
)
192+
return
192193
except Exception as e:
193194
self.log.exception("Exception occurred while checking operation status")
194195
yield TriggerEvent(
@@ -197,6 +198,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
197198
"message": str(e),
198199
}
199200
)
201+
return
200202

201203
def _get_hook(self) -> GKEAsyncHook:
202204
if self._hook is None:

0 commit comments

Comments
 (0)