Skip to content

Commit e1ebfa6

Browse files
author
Tobiasz Kędzierski
authored
Add DataflowJobMessagesSensor and DataflowAutoscalingEventsSensor (#12249)
1 parent c084393 commit e1ebfa6

File tree

5 files changed

+626
-4
lines changed

5 files changed

+626
-4
lines changed

airflow/providers/google/cloud/example_dags/example_dataflow.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@
3232
DataflowCreatePythonJobOperator,
3333
DataflowTemplatedJobStartOperator,
3434
)
35-
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobMetricsSensor, DataflowJobStatusSensor
35+
from airflow.providers.google.cloud.sensors.dataflow import (
36+
DataflowJobAutoScalingEventsSensor,
37+
DataflowJobMessagesSensor,
38+
DataflowJobMetricsSensor,
39+
DataflowJobStatusSensor,
40+
)
3641
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
3742
from airflow.utils.dates import days_ago
3843

@@ -183,8 +188,38 @@ def callback(metrics: List[Dict]) -> bool:
183188
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
184189
)
185190

191+
def check_message(messages: List[dict]) -> bool:
    """
    Check whether any job message announces the workflow start and stop steps.

    :param messages: JobMessage dictionaries; only the ``messageText`` key is read.
    :type messages: List[dict]
    :return: True if at least one message text contains
        ``"Adding workflow start and stop steps."``; False otherwise, including
        when ``messages`` is empty.
    :rtype: bool
    """
    expected_text = "Adding workflow start and stop steps."
    # ``or ""`` also covers a ``messageText`` key that is present but set to None,
    # which would otherwise make the ``in`` test raise TypeError.
    return any(expected_text in (message.get("messageText") or "") for message in messages)
197+
198+
# Sensor task that is given ``check_message`` as its callback. The job ID is a
# template resolved from the XCom value pushed by the 'start-python-job-async' task.
wait_for_python_job_async_message = DataflowJobMessagesSensor(
199+
task_id="wait-for-python-job-async-message",
200+
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
201+
location='europe-west3',
202+
callback=check_message,
203+
)
204+
205+
def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
    """
    Check whether any autoscaling event reports that the worker pool started.

    :param autoscaling_events: AutoscalingEvent dictionaries; only
        ``description.messageText`` is read.
    :type autoscaling_events: List[dict]
    :return: True if at least one event description text contains
        ``"Worker pool started."``; False otherwise, including when
        ``autoscaling_events`` is empty.
    :rtype: bool
    """
    expected_text = "Worker pool started."
    # The ``or`` fallbacks cover keys that are missing as well as keys that are
    # present but set to None, so a sparse event cannot raise here.
    return any(
        expected_text in ((event.get("description") or {}).get("messageText") or "")
        for event in autoscaling_events
    )
211+
212+
# Sensor task that is given ``check_autoscaling_event`` as its callback. The job ID is a
# template resolved from the XCom value pushed by the 'start-python-job-async' task.
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
213+
task_id="wait-for-python-job-async-autoscaling-event",
214+
job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
215+
location='europe-west3',
216+
callback=check_autoscaling_event,
217+
)
218+
186219
# Each wait_for_* sensor is chained after start_python_job_async with ``>>``;
# the four sensors are not chained to one another.
start_python_job_async >> wait_for_python_job_async_done
187220
start_python_job_async >> wait_for_python_job_async_metric
221+
start_python_job_async >> wait_for_python_job_async_message
222+
start_python_job_async >> wait_for_python_job_async_autoscaling_event
188223

189224

190225
with models.DAG(

airflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import warnings
2929
from copy import deepcopy
3030
from tempfile import TemporaryDirectory
31-
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, TypeVar, Union, cast
31+
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Set, TypeVar, Union, cast
3232

3333
from googleapiclient.discovery import build
3434

@@ -264,6 +264,66 @@ def fetch_job_metrics_by_id(self, job_id: str) -> dict:
264264
self.log.debug("fetch_job_metrics_by_id %s:\n%s", job_id, result)
265265
return result
266266

267+
def _fetch_list_job_messages_responses(self, job_id: str) -> Generator[dict, None, None]:
    """
    Helper method to fetch ListJobMessagesResponse with the specified Job ID.

    This is a generator: each page is requested only when the consumer asks for
    the next item, and iteration stops once no follow-up request is available.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: yields the ListJobMessagesResponse. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse
    :rtype: Generator[dict, None, None]
    """
    # The messages resource is the same for every page, so build it once instead of
    # re-walking projects().locations().jobs().messages() on each iteration.
    messages_resource = self._dataflow.projects().locations().jobs().messages()
    request = messages_resource.list(
        projectId=self._project_number, location=self._job_location, jobId=job_id
    )

    while request is not None:
        response = request.execute(num_retries=self._num_retries)
        yield response

        # A None result from list_next is what terminates the loop.
        request = messages_resource.list_next(previous_request=request, previous_response=response)
296+
297+
def fetch_job_messages_by_id(self, job_id: str) -> List[dict]:
    """
    Collect the job messages of the given job from every response page.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: the list of JobMessages, in page order; pages without a
        ``jobMessages`` key contribute nothing. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
    :rtype: List[dict]
    """
    return [
        job_message
        for page in self._fetch_list_job_messages_responses(job_id=job_id)
        for job_message in page.get("jobMessages", [])
    ]
311+
312+
def fetch_job_autoscaling_events_by_id(self, job_id: str) -> List[dict]:
    """
    Collect the autoscaling events of the given job from every response page.

    :param job_id: Job ID to get.
    :type job_id: str
    :return: the list of AutoscalingEvents, in page order; pages without an
        ``autoscalingEvents`` key contribute nothing. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
    :rtype: List[dict]
    """
    from itertools import chain

    response_pages = self._fetch_list_job_messages_responses(job_id=job_id)
    events_per_page = (page.get("autoscalingEvents", []) for page in response_pages)
    return list(chain.from_iterable(events_per_page))
326+
267327
def _fetch_all_jobs(self) -> List[dict]:
268328
request = (
269329
self._dataflow.projects()
@@ -1150,3 +1210,59 @@ def fetch_job_metrics_by_id(
11501210
location=location,
11511211
)
11521212
return jobs_controller.fetch_job_metrics_by_id(job_id)
1213+
1214+
@GoogleBaseHook.fallback_to_default_project_id
def fetch_job_messages_by_id(
    self,
    job_id: str,
    project_id: str,
    location: str = DEFAULT_DATAFLOW_LOCATION,
) -> List[dict]:
    """
    Gets the job messages with the specified Job ID.

    The lookup is delegated to a ``_DataflowJobsController`` built from this
    hook's connection, the given project and the given location.

    :param job_id: Job ID to get.
    :type job_id: str
    :param project_id: Optional, the Google Cloud project ID of the job.
        If set to None or missing, the default project_id from the Google Cloud connection is used.
    :type project_id: str
    :param location: Job location.
    :type location: str
    :return: the list of JobMessages. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage
    :rtype: List[dict]
    """
    connection = self.get_conn()
    controller = _DataflowJobsController(dataflow=connection, project_number=project_id, location=location)
    return controller.fetch_job_messages_by_id(job_id)
1241+
1242+
@GoogleBaseHook.fallback_to_default_project_id
def fetch_job_autoscaling_events_by_id(
    self,
    job_id: str,
    project_id: str,
    location: str = DEFAULT_DATAFLOW_LOCATION,
) -> List[dict]:
    """
    Gets the job autoscaling events with the specified Job ID.

    The lookup is delegated to a ``_DataflowJobsController`` built from this
    hook's connection, the given project and the given location.

    :param job_id: Job ID to get.
    :type job_id: str
    :param project_id: Optional, the Google Cloud project ID of the job.
        If set to None or missing, the default project_id from the Google Cloud connection is used.
    :type project_id: str
    :param location: Job location.
    :type location: str
    :return: the list of AutoscalingEvents. See:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent
    :rtype: List[dict]
    """
    connection = self.get_conn()
    controller = _DataflowJobsController(dataflow=connection, project_number=project_id, location=location)
    return controller.fetch_job_autoscaling_events_by_id(job_id)

0 commit comments

Comments
 (0)