Skip to content

Commit 499493c

Browse files
Jacob Ferrieroturbaszekmik-laj
authored
[AIRFLOW-6586] Improvements to gcs sensor (#7197)
* [AIRFLOW-6586] Improvements to gcs sensor refactors GoogleCloudStorageUploadSessionCompleteSensor to use set instead of number of objects add poke mode only decorator assert that poke_mode_only applied to child of BaseSensorOperator refactor tests remove assert [AIRFLOW-6586] Improvements to gcs sensor refactors GoogleCloudStorageUploadSessionCompleteSensor to use set instead of number of objects add poke mode only decorator assert that poke_mode_only applied to child of BaseSensorOperator remove assert fix static checks add back inadvertently remove requirements pre-commit fix typo * gix gcs sensor unit test * move poke_mode_only to base_sensor_operator module * add sensor / poke_mode_only docs * fix ci check add sensor how-to docs * Update airflow/providers/google/cloud/sensors/gcs.py Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com> * Update airflow/sensors/base_sensor_operator.py Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com> * Update airflow/sensors/base_sensor_operator.py Co-authored-by: Kamil BreguΕ‚a <mik-laj@users.noreply.github.com> * simplify class decorator * remove type hint * add note to UPDATING.md * remove unecessary declaration of class member * Fix to kwargs in UPDATING.md Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com> Co-authored-by: Kamil BreguΕ‚a <mik-laj@users.noreply.github.com>
1 parent bae5cc2 commit 499493c

File tree

6 files changed

+206
-44
lines changed

6 files changed

+206
-44
lines changed

β€ŽUPDATING.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,29 @@ plugins =
172172
- Added optional project_id argument to DataflowCreatePythonJobOperator
173173
constructor.
174174

175+
### GCSUploadSessionCompleteSensor signature change
176+
177+
To provide more precise control in handling of changes to objects in
178+
underlying GCS Bucket the constructor of this sensor now has changed.
179+
180+
- Old Behavior: This constructor used to optionally take ``previous_num_objects: int``.
181+
- New replacement constructor kwarg: ``previous_objects: Optional[Set[str]]``.
182+
183+
Most users would not specify this argument because the bucket begins empty
184+
and the user wants to treat any files as new.
185+
186+
Example of Updating usage of this sensor:
187+
Users who used to call:
188+
189+
``GCSUploadSessionCompleteSensor(bucket='my_bucket', prefix='my_prefix', previous_num_objects=1)``
190+
191+
Will now call:
192+
193+
``GCSUploadSessionCompleteSensor(bucket='my_bucket', prefix='my_prefix', previous_num_objects={'.keep'})``
194+
195+
Where '.keep' is a single file at your prefix that the sensor should not consider new.
196+
197+
175198
### Rename pool statsd metrics
176199

177200
Used slot has been renamed to running slot to make the name self-explanatory

β€Žairflow/providers/google/cloud/sensors/gcs.py

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121

2222
import os
2323
from datetime import datetime
24-
from typing import Callable, List, Optional
24+
from typing import Callable, List, Optional, Set
2525

2626
from airflow.exceptions import AirflowException
2727
from airflow.providers.google.cloud.hooks.gcs import GCSHook
28-
from airflow.sensors.base_sensor_operator import BaseSensorOperator
28+
from airflow.sensors.base_sensor_operator import BaseSensorOperator, poke_mode_only
2929
from airflow.utils.decorators import apply_defaults
3030

3131

@@ -189,12 +189,14 @@ def get_time():
189189
return datetime.now()
190190

191191

192+
@poke_mode_only
192193
class GCSUploadSessionCompleteSensor(BaseSensorOperator):
193194
"""
194195
Checks for changes in the number of objects at prefix in Google Cloud Storage
195196
bucket and returns True if the inactivity period has passed with no
196-
increase in the number of objects. Note, it is recommended to use reschedule
197-
mode if you expect this sensor to run for hours.
197+
increase in the number of objects. Note, this sensor will no behave correctly
198+
in reschedule mode, as the state of the listed objects in the GCS bucket will
199+
be lost between rescheduled invocations.
198200
199201
:param bucket: The Google Cloud Storage bucket where the objects are.
200202
expected.
@@ -209,8 +211,8 @@ class GCSUploadSessionCompleteSensor(BaseSensorOperator):
209211
:param min_objects: The minimum number of objects needed for upload session
210212
to be considered valid.
211213
:type min_objects: int
212-
:param previous_num_objects: The number of objects found during the last poke.
213-
:type previous_num_objects: int
214+
:param previous_objects: The set of object ids found during the last poke.
215+
:type previous_objects: set[str]
214216
:param allow_delete: Should this sensor consider objects being deleted
215217
between pokes valid behavior. If true a warning message will be logged
216218
when this happens. If false an error will be raised.
@@ -233,7 +235,7 @@ def __init__(self,
233235
prefix: str,
234236
inactivity_period: float = 60 * 60,
235237
min_objects: int = 1,
236-
previous_num_objects: int = 0,
238+
previous_objects: Optional[Set[str]] = None,
237239
allow_delete: bool = True,
238240
google_cloud_conn_id: str = 'google_cloud_default',
239241
delegate_to: Optional[str] = None,
@@ -243,45 +245,56 @@ def __init__(self,
243245

244246
self.bucket = bucket
245247
self.prefix = prefix
248+
if inactivity_period < 0:
249+
raise ValueError("inactivity_period must be non-negative")
246250
self.inactivity_period = inactivity_period
247251
self.min_objects = min_objects
248-
self.previous_num_objects = previous_num_objects
252+
self.previous_objects = previous_objects if previous_objects else set()
249253
self.inactivity_seconds = 0
250254
self.allow_delete = allow_delete
251255
self.google_cloud_conn_id = google_cloud_conn_id
252256
self.delegate_to = delegate_to
253257
self.last_activity_time = None
258+
self.hook = None
254259

255-
def is_bucket_updated(self, current_num_objects: int) -> bool:
260+
def _get_gcs_hook(self):
261+
if not self.hook:
262+
self.hook = GCSHook()
263+
return self.hook
264+
265+
def is_bucket_updated(self, current_objects: Set[str]) -> bool:
256266
"""
257267
Checks whether new objects have been uploaded and the inactivity_period
258268
has passed and updates the state of the sensor accordingly.
259269
260-
:param current_num_objects: number of objects in bucket during last poke.
261-
:type current_num_objects: int
270+
:param current_objects: set of object ids in bucket during last poke.
271+
:type current_objects: set[str]
262272
"""
263-
264-
if current_num_objects > self.previous_num_objects:
273+
current_num_objects = len(current_objects)
274+
if current_objects > self.previous_objects:
265275
# When new objects arrived, reset the inactivity_seconds
266-
# previous_num_objects for the next poke.
276+
# and update previous_objects for the next poke.
267277
self.log.info("New objects found at %s resetting last_activity_time.",
268278
os.path.join(self.bucket, self.prefix))
279+
self.log.debug("New objects: %s",
280+
"\n".join(current_objects - self.previous_objects))
269281
self.last_activity_time = get_time()
270282
self.inactivity_seconds = 0
271-
self.previous_num_objects = current_num_objects
283+
self.previous_objects = current_objects
272284
return False
273285

274-
if current_num_objects < self.previous_num_objects:
286+
if self.previous_objects - current_objects:
275287
# During the last poke interval objects were deleted.
276288
if self.allow_delete:
277-
self.previous_num_objects = current_num_objects
289+
self.previous_objects = current_objects
278290
self.last_activity_time = get_time()
279291
self.log.warning(
280292
"""
281293
Objects were deleted during the last
282294
poke interval. Updating the file counter and
283295
resetting last_activity_time.
284-
"""
296+
%s
297+
""", self.previous_objects - current_objects
285298
)
286299
return False
287300

@@ -314,5 +327,4 @@ def is_bucket_updated(self, current_num_objects: int) -> bool:
314327
return False
315328

316329
def poke(self, context):
317-
hook = GCSHook()
318-
return self.is_bucket_updated(len(hook.list(self.bucket, prefix=self.prefix)))
330+
return self.is_bucket_updated(set(self._get_gcs_hook().list(self.bucket, prefix=self.prefix)))

β€Žairflow/sensors/base_sensor_operator.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818

1919
import hashlib
20+
import os
2021
from datetime import timedelta
2122
from time import sleep
2223
from typing import Any, Dict, Iterable
@@ -176,3 +177,41 @@ def deps(self):
176177
if self.reschedule:
177178
return BaseOperator.deps.fget(self) | {ReadyToRescheduleDep()}
178179
return BaseOperator.deps.fget(self)
180+
181+
182+
def poke_mode_only(cls):
183+
"""
184+
Class Decorator for child classes of BaseSensorOperator to indicate
185+
that instances of this class are only safe to use poke mode.
186+
187+
Will decorate all methods in the class to assert they did not change
188+
the mode from 'poke'.
189+
190+
:param cls: BaseSensor class to enforce methods only use 'poke' mode.
191+
:type cls: type
192+
"""
193+
def decorate(cls_type):
194+
def mode_getter(_):
195+
return 'poke'
196+
197+
def mode_setter(_, value):
198+
if value != 'poke':
199+
raise ValueError(
200+
f"cannot set mode to 'poke'.")
201+
202+
if not issubclass(cls_type, BaseSensorOperator):
203+
raise ValueError(f"poke_mode_only decorator should only be "
204+
f"applied to subclasses of BaseSensorOperator,"
205+
f" got:{cls_type}.")
206+
207+
cls_type.mode = property(mode_getter, mode_setter)
208+
209+
return cls_type
210+
211+
return decorate(cls)
212+
213+
214+
if 'BUILDING_AIRFLOW_DOCS' in os.environ:
215+
# flake8: noqa: F811
216+
# Monkey patch hook to get good function headers while building docs
217+
apply_defaults = lambda x: x

β€Ždocs/howto/custom-operator.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,27 @@ Define an operator extra link
206206
For your operator, you can :doc:`Define an extra link <define_extra_link>` that can
207207
redirect users to external systems. For example, you can add a link that redirects
208208
the user to the operator's manual.
209+
210+
Sensors
211+
^^^^^^^^
212+
Airflow provides a primitive for a special kind of operator, whose purpose is to
213+
poll some state (e.g. presence of a file) on a regular interval until a
214+
success criteria is met.
215+
216+
You can create any sensor your want by extending the :class:`airflow.sensors.base_sensor_operator.BaseSensorOperator`
217+
defining a ``poke`` method to poll your external state and evaluate the success criteria.
218+
219+
Sensors have a powerful feature called ``'reschedule'`` mode which allows the sensor to
220+
task to be rescheduled, rather than blocking a worker slot between pokes.
221+
This is useful when you can tolerate a longer poll interval and expect to be
222+
polling for a long time.
223+
224+
Reschedule mode comes with a caveat that your sensor cannot maintain internal state
225+
between rescheduled executions. In this case you should decorate your sensor with
226+
:meth:`airflow.sensors.base_sensor_operator.poke_mode_only`. This will let users know
227+
that your sensor is not suitable for use with reschedule mode.
228+
229+
An example of a sensor that keeps internal state and cannot be used with reschedule mode
230+
is :class:`airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor`.
231+
It polls the number of objects at a prefix (this number is the internal state of the sensor)
232+
and succeeds when there a certain amount of time has passed without the number of objects changing.

β€Žtests/providers/google/cloud/sensors/test_gcs.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
DEFAULT_DATE = datetime(2015, 1, 1)
4343

4444
MOCK_DATE_ARRAY = [datetime(2019, 2, 24, 12, 0, 0) - i * timedelta(seconds=10)
45-
for i in range(20)]
45+
for i in range(25)]
4646

4747

4848
def next_time_side_effect():
@@ -212,17 +212,16 @@ def setUp(self):
212212
poke_interval=10,
213213
min_objects=1,
214214
allow_delete=False,
215-
previous_num_objects=0,
216215
dag=self.dag
217216
)
218217

219218
self.last_mocked_date = datetime(2019, 4, 24, 0, 0, 0)
220219

221220
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
222221
def test_files_deleted_between_pokes_throw_error(self):
223-
self.sensor.is_bucket_updated(2)
222+
self.sensor.is_bucket_updated({'a', 'b'})
224223
with self.assertRaises(AirflowException):
225-
self.sensor.is_bucket_updated(1)
224+
self.sensor.is_bucket_updated({'a'})
226225

227226
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
228227
def test_files_deleted_between_pokes_allow_delete(self):
@@ -234,48 +233,49 @@ def test_files_deleted_between_pokes_allow_delete(self):
234233
poke_interval=10,
235234
min_objects=1,
236235
allow_delete=True,
237-
previous_num_objects=0,
238236
dag=self.dag
239237
)
240-
self.sensor.is_bucket_updated(2)
238+
self.sensor.is_bucket_updated({'a', 'b'})
241239
self.assertEqual(self.sensor.inactivity_seconds, 0)
242-
self.sensor.is_bucket_updated(1)
243-
self.assertEqual(self.sensor.previous_num_objects, 1)
240+
self.sensor.is_bucket_updated({'a'})
241+
self.assertEqual(len(self.sensor.previous_objects), 1)
244242
self.assertEqual(self.sensor.inactivity_seconds, 0)
245-
self.sensor.is_bucket_updated(2)
243+
self.sensor.is_bucket_updated({'a', 'c'})
246244
self.assertEqual(self.sensor.inactivity_seconds, 0)
247-
self.sensor.is_bucket_updated(2)
245+
self.sensor.is_bucket_updated({'a', 'd'})
246+
self.assertEqual(self.sensor.inactivity_seconds, 0)
247+
self.sensor.is_bucket_updated({'a', 'd'})
248248
self.assertEqual(self.sensor.inactivity_seconds, 10)
249-
self.assertTrue(self.sensor.is_bucket_updated(2))
249+
self.assertTrue(self.sensor.is_bucket_updated({'a', 'd'}))
250250

251251
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
252252
def test_incoming_data(self):
253-
self.sensor.is_bucket_updated(2)
253+
self.sensor.is_bucket_updated({'a'})
254254
self.assertEqual(self.sensor.inactivity_seconds, 0)
255-
self.sensor.is_bucket_updated(3)
255+
self.sensor.is_bucket_updated({'a', 'b'})
256256
self.assertEqual(self.sensor.inactivity_seconds, 0)
257-
self.sensor.is_bucket_updated(4)
257+
self.sensor.is_bucket_updated({'a', 'b', 'c'})
258258
self.assertEqual(self.sensor.inactivity_seconds, 0)
259259

260260
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
261261
def test_no_new_data(self):
262-
self.sensor.is_bucket_updated(2)
262+
self.sensor.is_bucket_updated({'a'})
263263
self.assertEqual(self.sensor.inactivity_seconds, 0)
264-
self.sensor.is_bucket_updated(2)
264+
self.sensor.is_bucket_updated({'a'})
265265
self.assertEqual(self.sensor.inactivity_seconds, 10)
266266

267267
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
268268
def test_no_new_data_success_criteria(self):
269-
self.sensor.is_bucket_updated(2)
269+
self.sensor.is_bucket_updated({'a'})
270270
self.assertEqual(self.sensor.inactivity_seconds, 0)
271-
self.sensor.is_bucket_updated(2)
271+
self.sensor.is_bucket_updated({'a'})
272272
self.assertEqual(self.sensor.inactivity_seconds, 10)
273-
self.assertTrue(self.sensor.is_bucket_updated(2))
273+
self.assertTrue(self.sensor.is_bucket_updated({'a'}))
274274

275275
@mock.patch('airflow.providers.google.cloud.sensors.gcs.get_time', mock_time)
276276
def test_not_enough_objects(self):
277-
self.sensor.is_bucket_updated(0)
277+
self.sensor.is_bucket_updated(set())
278278
self.assertEqual(self.sensor.inactivity_seconds, 0)
279-
self.sensor.is_bucket_updated(0)
279+
self.sensor.is_bucket_updated(set())
280280
self.assertEqual(self.sensor.inactivity_seconds, 10)
281-
self.assertFalse(self.sensor.is_bucket_updated(0))
281+
self.assertFalse(self.sensor.is_bucket_updated(set()))

0 commit comments

Comments
 (0)