Skip to content

Commit 1dcd3e1

Browse files
authored
Add support for extra links coming from the providers (#12472)
Closes: #11431
1 parent 6150e26 commit 1dcd3e1

File tree

9 files changed

+130
-12
lines changed

9 files changed

+130
-12
lines changed

airflow/cli/cli_parser.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,6 +1169,12 @@ class GroupCommand(NamedTuple):
11691169
func=lazy_load_command('airflow.cli.commands.provider_command.provider_get'),
11701170
args=(ARG_OUTPUT, ARG_FULL, ARG_COLOR, ARG_PROVIDER_NAME),
11711171
),
1172+
ActionCommand(
1173+
name='links',
1174+
help='List extra links registered by the providers',
1175+
func=lazy_load_command('airflow.cli.commands.provider_command.extra_links_list'),
1176+
args=(ARG_OUTPUT,),
1177+
),
11721178
)
11731179

11741180
USERS_COMMANDS = (

airflow/cli/commands/provider_command.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,14 @@ def hooks_list(args):
7272
"conn_attribute_name": x[1][1],
7373
},
7474
)
75+
76+
77+
def extra_links_list(args):
    """Prints the extra link class names known to the ProvidersManager at the command line"""

    def _as_row(class_name):
        # One output row per class name, under a single column.
        return {"extra_link_class_name": class_name}

    console = AirflowConsole()
    link_class_names = ProvidersManager().extra_links_class_names
    # The output format is taken from the parsed CLI arguments.
    console.print_as(data=link_class_names, output=args.output, mapper=_as_row)

airflow/provider.yaml.schema.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,13 @@
180180
"items": {
181181
"type": "string"
182182
}
183+
},
184+
"extra-links": {
185+
"type": "array",
186+
"description": "Class names that provide extra link functionality",
187+
"items": {
188+
"type": "string"
189+
}
183190
}
184191
},
185192
"additionalProperties": false,

airflow/providers/google/provider.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,3 +636,8 @@ hook-class-names:
636636
- airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook
637637
- airflow.providers.google.cloud.hooks.compute_ssh.ComputeEngineSSHHook
638638
- airflow.providers.google.cloud.hooks.bigquery.BigQueryHook
639+
640+
extra-links:
641+
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
642+
- airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
643+
- airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink

airflow/providers/qubole/provider.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,6 @@ hooks:
4545
python-modules:
4646
- airflow.providers.qubole.hooks.qubole
4747
- airflow.providers.qubole.hooks.qubole_check
48+
49+
extra-links:
50+
- airflow.providers.qubole.operators.qubole.QDSLink

airflow/providers_manager.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import logging
2323
import os
2424
from collections import OrderedDict
25-
from typing import Dict, Tuple
25+
from typing import Dict, Set, Tuple
2626

2727
import jsonschema
2828
import yaml
@@ -68,6 +68,7 @@ def __init__(self):
6868
# Keeps dict of hooks keyed by connection type and value is
6969
# Tuple: connection class, connection_id_attribute_name
7070
self._hooks_dict: Dict[str, Tuple[str, str]] = {}
71+
self._extra_link_class_name_set: Set[str] = set()
7172
self._validator = _create_validator()
7273
# Local source folders are loaded first. They should take precedence over the package ones for
7374
# Development purpose. In production provider.yaml files are not present in the 'airflow" directory
@@ -78,6 +79,7 @@ def __init__(self):
7879
self._discover_hooks()
7980
self._provider_dict = OrderedDict(sorted(self.providers.items()))
8081
self._hooks_dict = OrderedDict(sorted(self.hooks.items()))
82+
self._discover_extra_links()
8183

8284
def _discover_all_providers_from_packages(self) -> None:
8385
"""
@@ -224,6 +226,32 @@ def _add_hook(self, hook_class_name, provider_package) -> None:
224226

225227
self._hooks_dict[conn_type] = (hook_class_name, connection_id_attribute_name)
226228

229+
def _discover_extra_links(self) -> None:
    """Registers every class name listed under "extra-links" by the known providers"""
    for package_name, (_, provider_info) in self._provider_dict.items():
        declared_links = provider_info.get("extra-links")
        if not declared_links:
            # Nothing declared for this provider (key missing or empty).
            continue
        for link_class_name in declared_links:
            self._add_extra_link(link_class_name, package_name)
235+
236+
def _add_extra_link(self, extra_link_class_name, provider_package) -> None:
    """
    Adds extra link class name to the set of registered extra link classes.

    For packages whose name starts with "apache-airflow", the class name must live under the
    module path derived from the package name (for example "apache-airflow-providers-google"
    maps to "airflow.providers.google"). If it does not, a warning is logged and the class
    name is not registered. Class names from other packages are registered without this check.

    :param extra_link_class_name: name of the class to add
    :param provider_package: provider package adding the link
    """
    if provider_package.startswith("apache-airflow"):
        provider_path = provider_package[len("apache-") :].replace("-", ".")
        # Require a module boundary after the prefix, so a class from a sibling module such as
        # "airflow.providers.google_other" is not accepted for the "google" package.
        if not extra_link_class_name.startswith(provider_path + "."):
            log.warning(
                "Sanity check failed when importing '%s' from '%s' package. It should start with '%s'",
                extra_link_class_name,
                provider_package,
                provider_path,
            )
            return
    self._extra_link_class_name_set.add(extra_link_class_name)
254+
227255
@property
228256
def providers(self):
229257
"""Returns information about available providers."""
@@ -233,3 +261,8 @@ def providers(self):
233261
def hooks(self):
234262
"""Returns dictionary of connection_type-to-hook mapping"""
235263
return self._hooks_dict
264+
265+
@property
def extra_links_class_names(self):
    """Returns sorted list of the registered extra link class names."""
    # sorted() accepts any iterable and always returns a new list, so no list() copy is needed.
    return sorted(self._extra_link_class_name_set)

airflow/serialization/serialized_objects.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,25 @@
2020
import enum
2121
import logging
2222
from inspect import Parameter, signature
23-
from typing import Any, Dict, Iterable, List, Optional, Set, Union
23+
from typing import Any, Dict, Iterable, Optional, Set, Union
2424

2525
import cattr
2626
import pendulum
2727
from dateutil import relativedelta
28+
29+
try:
30+
from functools import cache
31+
except ImportError:
32+
from functools import lru_cache
33+
34+
cache = lru_cache(maxsize=None)
2835
from pendulum.tz.timezone import Timezone
2936

3037
from airflow.exceptions import AirflowException
3138
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
3239
from airflow.models.connection import Connection
3340
from airflow.models.dag import DAG
41+
from airflow.providers_manager import ProvidersManager
3442
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
3543
from airflow.serialization.helpers import serialize_template_field
3644
from airflow.serialization.json_schema import Validator, load_dag_schema
@@ -53,14 +61,22 @@
5361
log = logging.getLogger(__name__)
5462
FAILED = 'serialization_failed'
5563

56-
BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
57-
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
58-
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
59-
"airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink",
60-
"airflow.providers.qubole.operators.qubole.QDSLink",
64+
_OPERATOR_EXTRA_LINKS: Set[str] = {
6165
"airflow.operators.dagrun_operator.TriggerDagRunLink",
6266
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
63-
]
67+
}
68+
69+
70+
@cache
def get_operator_extra_links():
    """
    Returns operator extra links: the built-in ones merged with the ones registered by
    the providers.

    The provider class names are added in place to the module-level set, and the function
    is wrapped with ``cache``, so repeated calls reuse the first result.

    :return: set of extra links
    """
    provider_link_names = ProvidersManager().extra_links_class_names
    _OPERATOR_EXTRA_LINKS.update(provider_link_names)
    return _OPERATOR_EXTRA_LINKS
6480

6581

6682
class BaseSerialization:
@@ -498,7 +514,7 @@ def _deserialize_operator_extra_links(cls, encoded_op_links: list) -> Dict[str,
498514
# )
499515

500516
_operator_link_class_path, data = list(_operator_links_source.items())[0]
501-
if _operator_link_class_path in BUILTIN_OPERATOR_EXTRA_LINKS:
517+
if _operator_link_class_path in get_operator_extra_links():
502518
single_op_link_class = import_string(_operator_link_class_path)
503519
elif _operator_link_class_path in plugins_manager.registered_operator_link_classes:
504520
single_op_link_class = plugins_manager.registered_operator_link_classes[

scripts/in_container/run_install_and_test_provider_packages.sh

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,17 @@ function discover_all_provider_packages() {
8181

8282
airflow providers list
8383

84-
local expected_number_of_providers=60
84+
local expected_number_of_providers=59
8585
local actual_number_of_providers
8686
actual_number_of_providers=$(airflow providers list --output table | grep -c apache-airflow-providers | xargs)
8787
if [[ ${actual_number_of_providers} != "${expected_number_of_providers}" ]]; then
8888
echo
8989
echo "${COLOR_RED_ERROR} Number of providers installed is wrong ${COLOR_RESET}"
9090
echo "Expected number was '${expected_number_of_providers}' and got '${actual_number_of_providers}'"
9191
echo
92-
echo "Either increase the number of providers if you added one or fix the problem with imports if you see one."
92+
echo "Either increase the number of providers if you added one or diagnose and fix the problem."
9393
echo
94+
exit 1
9495
fi
9596
}
9697

@@ -109,12 +110,36 @@ function discover_all_hooks() {
109110
echo "${COLOR_RED_ERROR} Number of hooks registered is wrong ${COLOR_RESET}"
110111
echo "Expected number was '${expected_number_of_hooks}' and got '${actual_number_of_hooks}'"
111112
echo
112-
echo "Either increase the number of hooks if you added one or fix problem with imports if you see one."
113+
echo "Either increase the number of hooks if you added one or diagnose and fix the problem."
113114
echo
115+
exit 1
114116
fi
115117
}
116118

119+
function discover_all_extra_links() {
    # Print the extra links reported by the CLI so they are visible in the job output.
    echo
    echo "Listing available extra links via airflow providers links"
    echo

    airflow providers links

    # Count the output lines matching ^airflow.providers and compare with the expected total.
    local expected_links=4
    local found_links
    found_links=$(airflow providers links --output table | grep -c ^airflow.providers | xargs)
    if [[ ${found_links} == "${expected_links}" ]]; then
        return 0
    fi

    # Mismatch: explain what was expected and abort the script.
    echo
    echo "${COLOR_RED_ERROR} Number of links registered is wrong ${COLOR_RESET}"
    echo "Expected number was '${expected_links}' and got '${found_links}'"
    echo
    echo "Either increase the number of links if you added one or diagnose and fix the problem."
    echo
    exit 1
}
139+
140+
117141
if [[ ${BACKPORT_PACKAGES} != "true" ]]; then
118142
discover_all_provider_packages
119143
discover_all_hooks
144+
discover_all_extra_links
120145
fi

tests/core/test_providers_manager.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,13 @@
119119
'wasb',
120120
]
121121

122+
EXTRA_LINKS = [
123+
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
124+
'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink',
125+
'airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink',
126+
'airflow.providers.qubole.operators.qubole.QDSLink',
127+
]
128+
122129

123130
class TestProviderManager(unittest.TestCase):
124131
def test_providers_are_loaded(self):
@@ -137,3 +144,8 @@ def test_hooks(self):
137144
provider_manager = ProvidersManager()
138145
connections_list = list(provider_manager.hooks.keys())
139146
self.assertEqual(CONNECTIONS_LIST, connections_list)
147+
148+
def test_extra_links(self):
    """ProvidersManager should report exactly the class names in EXTRA_LINKS, in that order."""
    discovered_links = list(ProvidersManager().extra_links_class_names)
    self.assertEqual(EXTRA_LINKS, discovered_links)

0 commit comments

Comments
 (0)