Skip to content

Commit 955e949

Browse files
authored
Remove the leading underscore in some attrs in AIP-52 (#31383)
* Remove the leading underscore in some attrs in AIP-52 These attrs are used in many places but they are private. Since we can use :meta private: to hide attributes from documentation we can safely document that these attrs are private and remove the leading underscores * fixup! Remove the leading underscore in some attrs in AIP-52
1 parent 33709f0 commit 955e949

File tree

14 files changed

+92
-79
lines changed

14 files changed

+92
-79
lines changed

β€Žairflow/decorators/base.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,9 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, FReturn, OperatorSubcla
302302
decorator_name: str = attr.ib(repr=False, default="task")
303303

304304
_airflow_is_task_decorator: ClassVar[bool] = True
305-
_is_setup: ClassVar[bool] = False
306-
_is_teardown: ClassVar[bool] = False
307-
_on_failure_fail_dagrun: ClassVar[bool] = False
305+
is_setup: ClassVar[bool] = False
306+
is_teardown: ClassVar[bool] = False
307+
on_failure_fail_dagrun: ClassVar[bool] = False
308308

309309
@multiple_outputs.default
310310
def _infer_multiple_outputs(self):
@@ -338,7 +338,7 @@ def __attrs_post_init__(self):
338338
self.kwargs.setdefault("task_id", self.function.__name__)
339339

340340
def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
341-
if self._is_teardown:
341+
if self.is_teardown:
342342
if "trigger_rule" in self.kwargs:
343343
raise ValueError("Trigger rule not configurable for teardown tasks.")
344344
self.kwargs.update(trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
@@ -349,9 +349,9 @@ def __call__(self, *args: FParams.args, **kwargs: FParams.kwargs) -> XComArg:
349349
multiple_outputs=self.multiple_outputs,
350350
**self.kwargs,
351351
)
352-
op._is_setup = self._is_setup
353-
op._is_teardown = self._is_teardown
354-
op._on_failure_fail_dagrun = self._on_failure_fail_dagrun
352+
op.is_setup = self.is_setup
353+
op.is_teardown = self.is_teardown
354+
op.on_failure_fail_dagrun = self.on_failure_fail_dagrun
355355
op_doc_attrs = [op.doc, op.doc_json, op.doc_md, op.doc_rst, op.doc_yaml]
356356
# Set the task's doc_md to the function's docstring if it exists and no other doc* args are set.
357357
if self.function.__doc__ and not any(op_doc_attrs):
@@ -485,9 +485,9 @@ def partial(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSub
485485

486486
def override(self, **kwargs: Any) -> _TaskDecorator[FParams, FReturn, OperatorSubclass]:
487487
result = attr.evolve(self, kwargs={**self.kwargs, **kwargs})
488-
setattr(result, "_is_setup", self._is_setup)
489-
setattr(result, "_is_teardown", self._is_teardown)
490-
setattr(result, "_on_failure_fail_dagrun", self._on_failure_fail_dagrun)
488+
setattr(result, "is_setup", self.is_setup)
489+
setattr(result, "is_teardown", self.is_teardown)
490+
setattr(result, "on_failure_fail_dagrun", self.on_failure_fail_dagrun)
491491
return result
492492

493493

β€Žairflow/decorators/setup_teardown.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def setup_task(func: Callable) -> Callable:
3030
func = python_task(func)
3131
if isinstance(func, _TaskGroupFactory):
3232
raise AirflowException("Task groups cannot be marked as setup or teardown.")
33-
func._is_setup = True # type: ignore[attr-defined]
33+
func.is_setup = True # type: ignore[attr-defined]
3434
return func
3535

3636

@@ -41,8 +41,8 @@ def teardown(func: Callable) -> Callable:
4141
func = python_task(func)
4242
if isinstance(func, _TaskGroupFactory):
4343
raise AirflowException("Task groups cannot be marked as setup or teardown.")
44-
func._is_teardown = True # type: ignore[attr-defined]
45-
func._on_failure_fail_dagrun = on_failure_fail_dagrun # type: ignore[attr-defined]
44+
func.is_teardown = True # type: ignore[attr-defined]
45+
func.on_failure_fail_dagrun = on_failure_fail_dagrun # type: ignore[attr-defined]
4646
return func
4747

4848
if _func is None:

β€Žairflow/models/baseoperator.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -720,9 +720,24 @@ class derived from this one results in the creation of a task object,
720720
# Set to True for an operator instantiated by a mapped operator.
721721
__from_mapped = False
722722

723-
_is_setup = False
724-
_is_teardown = False
725-
_on_failure_fail_dagrun = False
723+
is_setup = False
724+
"""
725+
Whether the operator is a setup task
726+
727+
:meta private:
728+
"""
729+
is_teardown = False
730+
"""
731+
Whether the operator is a teardown task
732+
733+
:meta private:
734+
"""
735+
on_failure_fail_dagrun = False
736+
"""
737+
Whether the operator should fail the dagrun on failure
738+
739+
:meta private:
740+
"""
726741

727742
def __init__(
728743
self,
@@ -963,7 +978,7 @@ def __init__(
963978
@classmethod
964979
def as_setup(cls, *args, **kwargs):
965980
op = cls(*args, **kwargs)
966-
op._is_setup = True
981+
op.is_setup = True
967982
return op
968983

969984
@classmethod
@@ -972,12 +987,12 @@ def as_teardown(cls, *args, **kwargs):
972987
if "trigger_rule" in kwargs:
973988
raise ValueError("Cannot set trigger rule for teardown tasks.")
974989
op = cls(*args, **kwargs, trigger_rule=TriggerRule.ALL_DONE_SETUP_SUCCESS)
975-
op._is_teardown = True
976-
op._on_failure_fail_dagrun = on_failure_fail_dagrun
990+
op.is_teardown = True
991+
op.on_failure_fail_dagrun = on_failure_fail_dagrun
977992
return op
978993

979994
def __enter__(self):
980-
if not self._is_setup and not self._is_teardown:
995+
if not self.is_setup and not self.is_teardown:
981996
raise AirflowException("Only setup/teardown tasks can be used as context managers.")
982997
SetupTeardownContext.push_setup_teardown_task(self)
983998
return self
@@ -1534,9 +1549,9 @@ def get_serialized_fields(cls):
15341549
"template_fields",
15351550
"template_fields_renderers",
15361551
"params",
1537-
"_is_setup",
1538-
"_is_teardown",
1539-
"_on_failure_fail_dagrun",
1552+
"is_setup",
1553+
"is_teardown",
1554+
"on_failure_fail_dagrun",
15401555
}
15411556
)
15421557
DagContext.pop_context_managed_dag()

β€Žairflow/models/dag.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,14 +1218,12 @@ def task_ids(self) -> list[str]:
12181218

12191219
@property
12201220
def teardowns(self) -> list[Operator]:
1221-
return [task for task in self.tasks if getattr(task, "_is_teardown", None)]
1221+
return [task for task in self.tasks if getattr(task, "is_teardown", None)]
12221222

12231223
@property
12241224
def tasks_upstream_of_teardowns(self) -> list[Operator]:
12251225
upstream_tasks = [t.upstream_list for t in self.teardowns]
1226-
return [
1227-
val for sublist in upstream_tasks for val in sublist if not getattr(val, "_is_teardown", None)
1228-
]
1226+
return [val for sublist in upstream_tasks for val in sublist if not getattr(val, "is_teardown", None)]
12291227

12301228
@property
12311229
def task_group(self) -> TaskGroup:

β€Žairflow/models/dagrun.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ def recalculate(self) -> _UnfinishedStates:
603603
teardown_task_ids = [t.task_id for t in dag.teardowns]
604604
upstream_of_teardowns = [t.task_id for t in dag.tasks_upstream_of_teardowns]
605605
teardown_tis = [ti for ti in tis if ti.task_id in teardown_task_ids]
606-
on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "_on_failure_fail_dagrun")]
606+
on_failure_fail_tis = [ti for ti in teardown_tis if getattr(ti.task, "on_failure_fail_dagrun")]
607607
tis_upstream_of_teardowns = [ti for ti in tis if ti.task_id in upstream_of_teardowns]
608608
leaf_tis = list(set(leaf_tis) - set(teardown_tis))
609609
leaf_tis.extend(on_failure_fail_tis)

β€Žairflow/models/xcom_arg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
206206
raise NotImplementedError()
207207

208208
def __enter__(self):
209-
if not self.operator._is_setup and not self.operator._is_teardown:
209+
if not self.operator.is_setup and not self.operator.is_teardown:
210210
raise AirflowException("Only setup/teardown tasks can be used as context managers.")
211211
SetupTeardownContext.push_setup_teardown_task(self.operator)
212212
return self

β€Žairflow/ti_deps/deps/trigger_rule_dep.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def calculate(cls, finished_upstreams: Iterator[TaskInstance]) -> _UpstreamTISta
6969
curr_state = {ti.state: 1}
7070
counter.update(curr_state)
7171
# setup task cannot be mapped
72-
if not isinstance(ti.task, MappedOperator) and ti.task._is_setup:
72+
if not isinstance(ti.task, MappedOperator) and ti.task.is_setup:
7373
setup_counter.update(curr_state)
7474
return _UpstreamTIStates(
7575
success=counter.get(TaskInstanceState.SUCCESS, 0),
@@ -231,7 +231,7 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]:
231231
if not any(needs_expansion(t) for t in upstream_tasks.values()):
232232
upstream = len(upstream_tasks)
233233
upstream_setup = len(
234-
[x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x._is_setup]
234+
[x for x in upstream_tasks.values() if not isinstance(x, MappedOperator) and x.is_setup]
235235
)
236236
else:
237237
upstream = (

β€Žairflow/utils/setup_teardown.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ def get_context_managed_teardown_task(cls) -> Operator | None:
9393

9494
@classmethod
9595
def push_setup_teardown_task(cls, operator):
96-
if operator._is_teardown:
96+
if operator.is_teardown:
9797
SetupTeardownContext.push_context_managed_teardown_task(operator)
98-
upstream_setup = [task for task in operator.upstream_list if task._is_setup]
98+
upstream_setup = [task for task in operator.upstream_list if task.is_setup]
9999
if upstream_setup:
100100
SetupTeardownContext.push_context_managed_setup_task(upstream_setup[-1])
101-
elif operator._is_setup:
101+
elif operator.is_setup:
102102
SetupTeardownContext.push_context_managed_setup_task(operator)
103-
downstream_teardown = [task for task in operator.downstream_list if task._is_teardown]
103+
downstream_teardown = [task for task in operator.downstream_list if task.is_teardown]
104104
if downstream_teardown:
105105
SetupTeardownContext.push_context_managed_teardown_task(downstream_teardown[0])
106106
SetupTeardownContext.active = True

β€Žtests/decorators/test_external_python.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def f():
137137

138138
assert len(dag.task_group.children) == 1
139139
setup_task = dag.task_group.children["f"]
140-
assert setup_task._is_setup
140+
assert setup_task.is_setup
141141
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
142142

143143
def test_marking_external_python_task_as_teardown(self, dag_maker, venv_python):
@@ -151,7 +151,7 @@ def f():
151151

152152
assert len(dag.task_group.children) == 1
153153
teardown_task = dag.task_group.children["f"]
154-
assert teardown_task._is_teardown
154+
assert teardown_task.is_teardown
155155
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
156156

157157
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
@@ -168,6 +168,6 @@ def f():
168168

169169
assert len(dag.task_group.children) == 1
170170
teardown_task = dag.task_group.children["f"]
171-
assert teardown_task._is_teardown
172-
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
171+
assert teardown_task.is_teardown
172+
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
173173
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

β€Žtests/decorators/test_python_virtualenv.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def f():
188188

189189
assert len(dag.task_group.children) == 1
190190
setup_task = dag.task_group.children["f"]
191-
assert setup_task._is_setup
191+
assert setup_task.is_setup
192192
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
193193

194194
def test_marking_virtualenv_python_task_as_teardown(self, dag_maker):
@@ -202,7 +202,7 @@ def f():
202202

203203
assert len(dag.task_group.children) == 1
204204
teardown_task = dag.task_group.children["f"]
205-
assert teardown_task._is_teardown
205+
assert teardown_task.is_teardown
206206
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
207207

208208
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
@@ -219,6 +219,6 @@ def f():
219219

220220
assert len(dag.task_group.children) == 1
221221
teardown_task = dag.task_group.children["f"]
222-
assert teardown_task._is_teardown
223-
assert teardown_task._on_failure_fail_dagrun is on_failure_fail_dagrun
222+
assert teardown_task.is_teardown
223+
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
224224
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

0 commit comments

Comments
 (0)