Skip to content

Commit a082123

Browse files
authored
Use typed Context EVERYWHERE (#20565)
Part of #19891
1 parent b5f2ae3 commit a082123

File tree

214 files changed

+1368
-751
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

214 files changed

+1368
-751
lines changed

β€Žairflow/example_dags/example_dag_decorator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from airflow.decorators import dag, task
2424
from airflow.models.baseoperator import BaseOperator
2525
from airflow.operators.email import EmailOperator
26+
from airflow.utils.context import Context
2627

2728

2829
class GetRequestOperator(BaseOperator):
@@ -32,7 +33,7 @@ def __init__(self, *, url: str, **kwargs):
3233
super().__init__(**kwargs)
3334
self.url = url
3435

35-
def execute(self, context):
36+
def execute(self, context: Context):
3637
return httpx.get(self.url).json()
3738

3839

β€Žairflow/example_dags/example_skip_dag.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from airflow import DAG
2424
from airflow.exceptions import AirflowSkipException
2525
from airflow.operators.dummy import DummyOperator
26+
from airflow.utils.context import Context
2627
from airflow.utils.trigger_rule import TriggerRule
2728

2829

@@ -32,7 +33,7 @@ class DummySkipOperator(DummyOperator):
3233

3334
ui_color = '#e8b7e4'
3435

35-
def execute(self, context):
36+
def execute(self, context: Context):
3637
raise AirflowSkipException
3738

3839

β€Žairflow/models/param.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,16 @@
1717
import copy
1818
import json
1919
import warnings
20-
from typing import TYPE_CHECKING, Any, Dict, ItemsView, MutableMapping, Optional, ValuesView
20+
from typing import Any, Dict, ItemsView, MutableMapping, Optional, ValuesView
2121

2222
import jsonschema
2323
from jsonschema import FormatChecker
2424
from jsonschema.exceptions import ValidationError
2525

2626
from airflow.exceptions import AirflowException
27+
from airflow.utils.context import Context
2728
from airflow.utils.types import NOTSET, ArgNotSet
2829

29-
if TYPE_CHECKING:
30-
from airflow.utils.context import Context
31-
3230

3331
class Param:
3432
"""
@@ -257,7 +255,7 @@ def __init__(self, current_dag, name: str, default: Optional[Any] = None):
257255
self._name = name
258256
self._default = default
259257

260-
def resolve(self, context: "Context") -> Any:
258+
def resolve(self, context: Context) -> Any:
261259
"""Pull DagParam value from DagRun context. This method is run during ``op.execute()``."""
262260
default = self._default
263261
if not self._default:

β€Žairflow/models/xcom_arg.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union
17+
from typing import Any, List, Optional, Sequence, Union
1818

1919
from airflow.exceptions import AirflowException
2020
from airflow.models.baseoperator import BaseOperator
2121
from airflow.models.taskmixin import DependencyMixin
2222
from airflow.models.xcom import XCOM_RETURN_KEY
23+
from airflow.utils.context import Context
2324
from airflow.utils.edgemodifier import EdgeModifier
2425

25-
if TYPE_CHECKING:
26-
from airflow.utils.context import Context
27-
2826

2927
class XComArg(DependencyMixin):
3028
"""
@@ -130,7 +128,7 @@ def set_downstream(
130128
"""Proxy to underlying operator set_downstream method. Required by TaskMixin."""
131129
self.operator.set_downstream(task_or_task_list, edge_modifier)
132130

133-
def resolve(self, context: "Context") -> Any:
131+
def resolve(self, context: Context) -> Any:
134132
"""
135133
Pull XCom value for the existing arg. This method is run during ``op.execute()``
136134
in respectable context.

β€Žairflow/operators/bash.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from airflow.exceptions import AirflowException, AirflowSkipException
2323
from airflow.hooks.subprocess import SubprocessHook
2424
from airflow.models import BaseOperator
25+
from airflow.utils.context import Context
2526
from airflow.utils.operator_helpers import context_to_airflow_vars
2627

2728

@@ -180,7 +181,7 @@ def get_env(self, context):
180181
env.update(airflow_context_vars)
181182
return env
182183

183-
def execute(self, context):
184+
def execute(self, context: Context):
184185
if self.cwd is not None:
185186
if not os.path.exists(self.cwd):
186187
raise AirflowException(f"Can not find the cwd: {self.cwd}")

β€Žairflow/operators/dummy.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18-
1918
from airflow.models import BaseOperator
19+
from airflow.utils.context import Context
2020

2121

2222
class DummyOperator(BaseOperator):
@@ -33,5 +33,5 @@ class DummyOperator(BaseOperator):
3333
def __init__(self, **kwargs) -> None:
3434
super().__init__(**kwargs)
3535

36-
def execute(self, context):
36+
def execute(self, context: Context):
3737
pass

β€Žairflow/operators/email.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from typing import Any, Dict, List, Optional, Union
1919

2020
from airflow.models import BaseOperator
21+
from airflow.utils.context import Context
2122
from airflow.utils.email import send_email
2223

2324

@@ -79,7 +80,7 @@ def __init__(
7980
self.conn_id = conn_id
8081
self.custom_headers = custom_headers
8182

82-
def execute(self, context):
83+
def execute(self, context: Context):
8384
send_email(
8485
self.to,
8586
self.subject,

β€Žairflow/operators/generic_transfer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from airflow.hooks.base import BaseHook
2121
from airflow.models import BaseOperator
22+
from airflow.utils.context import Context
2223

2324

2425
class GenericTransfer(BaseOperator):
@@ -72,7 +73,7 @@ def __init__(
7273
self.preoperator = preoperator
7374
self.insert_args = insert_args or {}
7475

75-
def execute(self, context):
76+
def execute(self, context: Context):
7677
source_hook = BaseHook.get_hook(self.source_conn_id)
7778

7879
self.log.info("Extracting data from %s", self.source_conn_id)

β€Žairflow/operators/subdag.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from airflow.models.pool import Pool
3535
from airflow.models.taskinstance import TaskInstance
3636
from airflow.sensors.base import BaseSensorOperator
37+
from airflow.utils.context import Context
3738
from airflow.utils.session import NEW_SESSION, create_session, provide_session
3839
from airflow.utils.state import State
3940
from airflow.utils.types import DagRunType
@@ -179,7 +180,7 @@ def pre_execute(self, context):
179180
if dag_run.state == State.FAILED:
180181
self._reset_dag_run_and_task_instances(dag_run, execution_date)
181182

182-
def poke(self, context):
183+
def poke(self, context: Context):
183184
execution_date = context['execution_date']
184185
dag_run = self._get_dagrun(execution_date=execution_date)
185186
return dag_run.state != State.RUNNING

β€Žairflow/providers/airbyte/operators/airbyte.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
# KIND, either express or implied. See the License for the
1616
# specific language governing permissions and limitations
1717
# under the License.
18-
from typing import Optional
18+
from typing import TYPE_CHECKING, Optional
1919

2020
from airflow.models import BaseOperator
2121
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
2222

23+
if TYPE_CHECKING:
24+
from airflow.utils.context import Context
25+
2326

2427
class AirbyteTriggerSyncOperator(BaseOperator):
2528
"""
@@ -68,7 +71,7 @@ def __init__(
6871
self.wait_seconds = wait_seconds
6972
self.asynchronous = asynchronous
7073

71-
def execute(self, context) -> None:
74+
def execute(self, context: 'Context') -> None:
7275
"""Create Airbyte Job and wait to finish"""
7376
hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
7477
job_object = hook.submit_sync_connection(connection_id=self.connection_id)

0 commit comments

Comments
 (0)