
Commit 2d569fd

Create links for Bigtable operators (#23164)
1 parent 434ab5a commit 2d569fd

5 files changed: +125 -7 lines changed
airflow/providers/google/cloud/links/bigtable.py

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING

from airflow.providers.google.cloud.links.base import BaseGoogleLink

if TYPE_CHECKING:
    from airflow.utils.context import Context

BASE_LINK = "https://console.cloud.google.com"
BIGTABLE_BASE_LINK = BASE_LINK + "/bigtable"
BIGTABLE_INSTANCE_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/overview?project={project_id}"
BIGTABLE_CLUSTER_LINK = (
    BIGTABLE_BASE_LINK + "/instances/{instance_id}/clusters/{cluster_id}?project={project_id}"
)
BIGTABLE_TABLES_LINK = BIGTABLE_BASE_LINK + "/instances/{instance_id}/tables?project={project_id}"


class BigtableInstanceLink(BaseGoogleLink):
    """Helper class for constructing Bigtable Instance link"""

    name = "Bigtable Instance"
    key = "instance_key"
    format_str = BIGTABLE_INSTANCE_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance,
    ):
        task_instance.xcom_push(
            context=context,
            key=BigtableInstanceLink.key,
            value={
                "instance_id": task_instance.instance_id,
                "project_id": task_instance.project_id,
            },
        )


class BigtableClusterLink(BaseGoogleLink):
    """Helper class for constructing Bigtable Cluster link"""

    name = "Bigtable Cluster"
    key = "cluster_key"
    format_str = BIGTABLE_CLUSTER_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance,
    ):
        task_instance.xcom_push(
            context=context,
            key=BigtableClusterLink.key,
            value={
                "instance_id": task_instance.instance_id,
                "cluster_id": task_instance.cluster_id,
                "project_id": task_instance.project_id,
            },
        )


class BigtableTablesLink(BaseGoogleLink):
    """Helper class for constructing Bigtable Tables link"""

    name = "Bigtable Tables"
    key = "tables_key"
    format_str = BIGTABLE_TABLES_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance,
    ):
        task_instance.xcom_push(
            context=context,
            key=BigtableTablesLink.key,
            value={
                "instance_id": task_instance.instance_id,
                "project_id": task_instance.project_id,
            },
        )
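
For illustration only (not part of the commit): what BigtableInstanceLink.persist() actually pushes to XCom, exercised with a mocked task instance and hypothetical ids. The same pattern holds for the cluster and tables links.

from unittest import mock

from airflow.providers.google.cloud.links.bigtable import BigtableInstanceLink

# Stand-in for an operator exposing the attributes persist() reads.
task_instance = mock.MagicMock(instance_id="my-instance", project_id="my-project")

BigtableInstanceLink.persist(context={}, task_instance=task_instance)

# persist() delegates straight to xcom_push with the link's key and the id mapping.
task_instance.xcom_push.assert_called_once_with(
    context={},
    key="instance_key",
    value={"instance_id": "my-instance", "project_id": "my-project"},
)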

β€Žairflow/providers/google/cloud/operators/bigtable.py

Lines changed: 14 additions & 0 deletions
@@ -26,6 +26,11 @@
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import (
+    BigtableClusterLink,
+    BigtableInstanceLink,
+    BigtableTablesLink,
+)

 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -93,6 +98,7 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
         'main_cluster_zone',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableInstanceLink(),)

     def __init__(
         self,
@@ -141,6 +147,7 @@ def execute(self, context: 'Context') -> None:
                 "The instance '%s' already exists in this project. Consider it as created",
                 self.instance_id,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
             return
         try:
             hook.create_instance(
@@ -156,6 +163,7 @@ def execute(self, context: 'Context') -> None:
                 cluster_storage_type=self.cluster_storage_type,
                 timeout=self.timeout,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.GoogleAPICallError as e:
             self.log.error('An error occurred. Exiting.')
             raise e
@@ -198,6 +206,7 @@ class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
         'instance_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableInstanceLink(),)

     def __init__(
         self,
@@ -241,6 +250,7 @@ def execute(self, context: 'Context') -> None:
                 instance_labels=self.instance_labels,
                 timeout=self.timeout,
             )
+            BigtableInstanceLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.GoogleAPICallError as e:
             self.log.error('An error occurred. Exiting.')
             raise e
@@ -351,6 +361,7 @@ class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
         'table_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableTablesLink(),)

     def __init__(
         self,
@@ -412,6 +423,7 @@ def execute(self, context: 'Context') -> None:
                 initial_split_keys=self.initial_split_keys,
                 column_families=self.column_families,
             )
+            BigtableTablesLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.AlreadyExists:
             if not self._compare_column_families(hook, instance):
                 raise AirflowException(
@@ -533,6 +545,7 @@ class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
         'nodes',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableClusterLink(),)

     def __init__(
         self,
@@ -565,6 +578,7 @@ def execute(self, context: 'Context') -> None:

         try:
             hook.update_cluster(instance=instance, cluster_id=self.cluster_id, nodes=self.nodes)
+            BigtableClusterLink.persist(context=context, task_instance=self)
         except google.api_core.exceptions.NotFound:
             raise AirflowException(
                 f"Dependency: cluster '{self.cluster_id}' does not exist for instance '{self.instance_id}'."

β€Žairflow/providers/google/cloud/sensors/bigtable.py

Lines changed: 3 additions & 0 deletions
@@ -23,6 +23,7 @@
 from google.cloud.bigtable_admin_v2 import enums

 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
+from airflow.providers.google.cloud.links.bigtable import BigtableTablesLink
 from airflow.providers.google.cloud.operators.bigtable import BigtableValidationMixin
 from airflow.sensors.base import BaseSensorOperator

@@ -62,6 +63,7 @@ class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValida
         'table_id',
         'impersonation_chain',
     )
+    operator_extra_links = (BigtableTablesLink(),)

     def __init__(
         self,
@@ -111,4 +113,5 @@ def poke(self, context: 'Context') -> bool:
             return False

         self.log.info("Table '%s' is replicated.", self.table_id)
+        BigtableTablesLink.persist(context=context, task_instance=self)
         return True
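
For completeness, a hedged usage sketch (not part of the commit; the argument values are hypothetical) of the sensor whose successful poke now records the tables link:

from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor

wait_for_replication = BigtableTableReplicationCompletedSensor(
    task_id="wait_for_table_replication",
    project_id="my-project",
    instance_id="my-instance",
    table_id="my-table",
    poke_interval=60,
)
# Once poke() returns True, the task's "Bigtable Tables" extra link points at the
# instance's tables page in the Cloud Console.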

β€Žairflow/providers/google/provider.yaml

Lines changed: 3 additions & 0 deletions
@@ -906,6 +906,9 @@ extra-links:
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreImportExportLink
   - airflow.providers.google.cloud.links.datastore.CloudDatastoreEntitiesLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableInstanceLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableClusterLink
+  - airflow.providers.google.cloud.links.bigtable.BigtableTablesLink
   - airflow.providers.google.common.links.storage.StorageLink

 additional-extras:
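
Registering the classes under extra-links is what makes the provider advertise them to Airflow core. A quick way to check the registration, assuming ProvidersManager exposes the collected names via its extra_links_class_names property (true for recent Airflow 2.x releases; treat it as an assumption here):

from airflow.providers_manager import ProvidersManager

# Names collected from every installed provider's provider.yaml.
registered = ProvidersManager().extra_links_class_names
print([name for name in registered if ".links.bigtable." in name])
# Expected to include BigtableInstanceLink, BigtableClusterLink and BigtableTablesLink.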

β€Žtests/providers/google/cloud/operators/test_bigtable.py

Lines changed: 7 additions & 7 deletions
@@ -100,7 +100,7 @@ def test_create_instance_that_exists(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})

         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -120,7 +120,7 @@ def test_create_instance_that_exists_empty_project_id(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})

         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -178,7 +178,7 @@ def test_create_instance_that_doesnt_exists(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -210,7 +210,7 @@ def test_create_instance_with_replicas_that_doesnt_exists(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -243,7 +243,7 @@ def test_delete_execute(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -268,7 +268,7 @@ def test_update_execute_empty_project_id(self, mock_hook):
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
@@ -803,7 +803,7 @@ def test_create_execute(self, mock_hook):
             impersonation_chain=IMPERSONATION_CHAIN,
         )
         instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
-        op.execute(None)
+        op.execute(context={'ti': mock.MagicMock()})
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
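
The tests switch from op.execute(None) to a context carrying a mocked 'ti' because the new link persist() calls route through the operator's xcom_push, which dereferences context['ti'] at runtime. A self-contained sketch (not part of the commit; the ids and the patched hook path mirror the existing test setup but are assumptions here) of asserting the persisted link payload:

from unittest import mock

from airflow.providers.google.cloud.operators.bigtable import BigtableCreateInstanceOperator

op = BigtableCreateInstanceOperator(
    project_id="my-project",
    instance_id="my-instance",
    main_cluster_id="my-cluster",
    main_cluster_zone="us-central1-a",
    task_id="create_instance",
)

ti = mock.MagicMock()
# Patch the hook so execute() skips real GCP calls and only exercises the link logic.
with mock.patch("airflow.providers.google.cloud.operators.bigtable.BigtableHook") as mock_hook:
    mock_hook.return_value.get_instance.return_value = None  # force the "create" branch
    op.execute(context={"ti": ti})

# The instance link coordinates end up in XCom under the link's key.
pushed = {c.kwargs.get("key"): c.kwargs.get("value") for c in ti.xcom_push.call_args_list}
assert pushed.get("instance_key") == {"instance_id": "my-instance", "project_id": "my-project"}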
