
Commit 382c101

Add Bigtable Update Instance Hook/Operator (#10340)
1 parent 6656464 commit 382c101

File tree

6 files changed: 347 additions, 3 deletions


β€Žairflow/providers/google/cloud/example_dags/example_bigtable.py

Lines changed: 17 additions & 1 deletion
@@ -51,16 +51,21 @@
 from airflow import models
 from airflow.providers.google.cloud.operators.bigtable import (
     BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
-    BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
+    BigtableDeleteTableOperator, BigtableUpdateClusterOperator, BigtableUpdateInstanceOperator,
 )
 from airflow.providers.google.cloud.sensors.bigtable import BigtableTableReplicationCompletedSensor
 from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
 CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
 CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
+CBT_INSTANCE_DISPLAY_NAME_UPDATED = getenv(
+    "CBT_INSTANCE_DISPLAY_NAME_UPDATED", "Human-readable name - updated"
+)
 CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
+CBT_INSTANCE_TYPE_PROD = getenv('CBT_INSTANCE_TYPE_PROD', '1')
 CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
+CBT_INSTANCE_LABELS_UPDATED = getenv('CBT_INSTANCE_LABELS_UPDATED', '{"env": "prod"}')
 CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
 CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
 CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
@@ -103,6 +108,16 @@
     create_instance_task >> create_instance_task2
     # [END howto_operator_gcp_bigtable_instance_create]
 
+    # [START howto_operator_gcp_bigtable_instance_update]
+    update_instance_task = BigtableUpdateInstanceOperator(
+        instance_id=CBT_INSTANCE_ID,
+        instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
+        instance_type=int(CBT_INSTANCE_TYPE_PROD),
+        instance_labels=json.loads(CBT_INSTANCE_LABELS_UPDATED),
+        task_id='update_instance_task',
+    )
+    # [END howto_operator_gcp_bigtable_instance_update]
+
     # [START howto_operator_gcp_bigtable_cluster_update]
     cluster_update_task = BigtableUpdateClusterOperator(
         project_id=GCP_PROJECT_ID,
@@ -186,6 +201,7 @@
     create_instance_task \
         >> create_table_task \
        >> cluster_update_task \
+        >> update_instance_task \
         >> delete_table_task
     create_instance_task2 \
         >> create_table_task2 \
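A note on the instance type values: CBT_INSTANCE_TYPE and CBT_INSTANCE_TYPE_PROD are read as strings and converted with int() before being handed to the operator. In the Bigtable admin API's enums.Instance.Type, PRODUCTION is 1 and DEVELOPMENT is 2, so the defaults above create a development instance and switch it to a production instance on update. A minimal sketch of that mapping, assuming the google-cloud-bigtable client used by this provider:

from google.cloud.bigtable_admin_v2 import enums

# Instance types in the Bigtable admin API:
#   TYPE_UNSPECIFIED = 0, PRODUCTION = 1, DEVELOPMENT = 2
assert enums.Instance.Type(int('1')) is enums.Instance.Type.PRODUCTION
assert enums.Instance.Type(int('2')) is enums.Instance.Type.DEVELOPMENT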

β€Žairflow/providers/google/cloud/hooks/bigtable.py

Lines changed: 46 additions & 0 deletions
@@ -18,6 +18,7 @@
 """
 This module contains a Google Cloud Bigtable Hook.
 """
+import enum
 from typing import Dict, List, Optional, Sequence, Union
 
 from google.cloud.bigtable import Client
@@ -184,6 +185,51 @@ def create_instance(
         operation.result(timeout)
         return instance
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_instance(
+        self,
+        instance_id: str,
+        project_id: str,
+        instance_display_name: Optional[str] = None,
+        instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+        instance_labels: Optional[Dict] = None,
+        timeout: Optional[float] = None
+    ) -> Instance:
+        """
+        Updates an existing instance.
+
+        :type instance_id: str
+        :param instance_id: The ID of the existing instance.
+        :type project_id: str
+        :param project_id: Optional, Google Cloud Platform project ID where the
+            Cloud Bigtable instance exists. If set to None or missing,
+            the default project_id from the GCP connection is used.
+        :type instance_display_name: str
+        :param instance_display_name: (optional) Human-readable name of the instance.
+        :type instance_type: enums.Instance.Type or enum.IntEnum
+        :param instance_type: (optional) The type of the instance.
+        :type instance_labels: dict
+        :param instance_labels: (optional) Dictionary of labels to associate with the
+            instance.
+        :type timeout: float
+        :param timeout: (optional) timeout (in seconds) for the instance update.
+            If None is specified, the hook waits indefinitely.
+        """
+        instance_type = enums.Instance.Type(instance_type)
+
+        instance = Instance(
+            instance_id=instance_id,
+            client=self._get_client(project_id=project_id),
+            display_name=instance_display_name,
+            instance_type=instance_type,
+            labels=instance_labels,
+        )
+
+        operation = instance.update()
+        operation.result(timeout)
+
+        return instance
+
     @staticmethod
     def create_table(
         instance: Instance,
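The new hook method mirrors create_instance: it builds an Instance object carrying the fields to change, calls Instance.update(), and blocks on the returned long-running operation for up to timeout seconds. A minimal sketch of calling it directly, assuming an Airflow GCP connection named 'google_cloud_default' and a hypothetical existing instance:

from google.cloud.bigtable_admin_v2 import enums

from airflow.providers.google.cloud.hooks.bigtable import BigtableHook

hook = BigtableHook(gcp_conn_id='google_cloud_default')

# Only display name, type and labels can be changed on an existing instance;
# instance_id and project_id identify which instance to update.
instance = hook.update_instance(
    instance_id='some-instance-id',   # hypothetical instance ID
    project_id='example-project',     # may be omitted to use the connection's default project
    instance_display_name='Human-readable name - updated',
    instance_type=enums.Instance.Type.PRODUCTION,
    instance_labels={'env': 'prod'},
    timeout=300,                      # seconds; None waits indefinitely
)
print(instance.display_name)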

β€Žairflow/providers/google/cloud/operators/bigtable.py

Lines changed: 78 additions & 1 deletion
@@ -18,7 +18,8 @@
 """
 This module contains Google Cloud Bigtable operators.
 """
-from typing import Dict, Iterable, List, Optional
+import enum
+from typing import Dict, Iterable, List, Optional, Union
 
 import google.api_core.exceptions
 from google.cloud.bigtable.column_family import GarbageCollectionRule
@@ -158,6 +159,82 @@ def execute(self, context):
             raise e
 
 
+class BigtableUpdateInstanceOperator(BaseOperator, BigtableValidationMixin):
+    """
+    Updates an existing Cloud Bigtable instance.
+
+    For more details about instance update have a look at the reference:
+    https://googleapis.dev/python/bigtable/latest/instance.html#google.cloud.bigtable.instance.Instance.update
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigtableUpdateInstanceOperator`
+
+    :type instance_id: str
+    :param instance_id: The ID of the Cloud Bigtable instance to update.
+    :type project_id: str
+    :param project_id: Optional, the ID of the GCP project. If set to None or missing,
+        the default project_id from the GCP connection is used.
+    :type instance_display_name: str
+    :param instance_display_name: (optional) Human-readable name of the instance.
+    :type instance_type: enums.Instance.Type or enum.IntEnum
+    :param instance_type: (optional) The type of the instance.
+    :type instance_labels: dict
+    :param instance_labels: (optional) Dictionary of labels to associate
+        with the instance.
+    :type timeout: float
+    :param timeout: (optional) timeout (in seconds) for the instance update.
+        If None is specified, the operator will wait indefinitely.
+    :param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    REQUIRED_ATTRIBUTES: Iterable[str] = ['instance_id']
+    template_fields: Iterable[str] = ['project_id', 'instance_id']
+
+    @apply_defaults
+    def __init__(self, *,
+                 instance_id: str,
+                 project_id: Optional[str] = None,
+                 instance_display_name: Optional[str] = None,
+                 instance_type: Optional[Union[enums.Instance.Type, enum.IntEnum]] = None,
+                 instance_labels: Optional[Dict] = None,
+                 timeout: Optional[float] = None,
+                 gcp_conn_id: str = 'google_cloud_default',
+                 **kwargs) -> None:
+        self.project_id = project_id
+        self.instance_id = instance_id
+        self.instance_display_name = instance_display_name
+        self.instance_type = instance_type
+        self.instance_labels = instance_labels
+        self.timeout = timeout
+        self._validate_inputs()
+        self.gcp_conn_id = gcp_conn_id
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        hook = BigtableHook(gcp_conn_id=self.gcp_conn_id)
+        instance = hook.get_instance(project_id=self.project_id,
+                                     instance_id=self.instance_id)
+        if not instance:
+            raise AirflowException(
+                f"Dependency: instance '{self.instance_id}' does not exist."
+            )
+
+        try:
+            hook.update_instance(
+                project_id=self.project_id,
+                instance_id=self.instance_id,
+                instance_display_name=self.instance_display_name,
+                instance_type=self.instance_type,
+                instance_labels=self.instance_labels,
+                timeout=self.timeout,
+            )
+        except google.api_core.exceptions.GoogleAPICallError as e:
+            self.log.error('An error occurred. Exiting.')
+            raise e
+
+
 class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
     """
     Deletes the Cloud Bigtable instance, including its clusters and all related tables.
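Since project_id and instance_id are listed in template_fields, the operator can also be used with project_id omitted (resolved from the GCP connection) and a Jinja-templated instance_id. A minimal sketch under those assumptions; the DAG id and the Airflow Variable name are hypothetical:

from airflow import models
from airflow.providers.google.cloud.operators.bigtable import BigtableUpdateInstanceOperator
from airflow.utils.dates import days_ago
from google.cloud.bigtable_admin_v2 import enums

with models.DAG('example_bigtable_update_labels', schedule_interval=None, start_date=days_ago(1)) as dag:
    # project_id is omitted, so it falls back to the 'google_cloud_default' connection;
    # instance_id is rendered by Jinja at runtime because it is a template field.
    update_prod_instance = BigtableUpdateInstanceOperator(
        task_id='update_prod_instance',
        instance_id="{{ var.value.cbt_instance_id }}",  # hypothetical Airflow Variable
        instance_display_name='Human-readable name - updated',
        instance_type=enums.Instance.Type.PRODUCTION,   # the enum member is accepted directly
        instance_labels={'env': 'prod'},
    )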

β€Ždocs/howto/operator/google/cloud/bigtable.rst

Lines changed: 23 additions & 0 deletions
@@ -53,6 +53,29 @@ it will be retrieved from the GCP connection used. Both variants are shown:
     :start-after: [START howto_operator_gcp_bigtable_instance_create]
     :end-before: [END howto_operator_gcp_bigtable_instance_create]
 
+.. _howto/operator:BigtableUpdateInstanceOperator:
+
+BigtableUpdateInstanceOperator
+------------------------------
+
+Use the :class:`~airflow.providers.google.cloud.operators.bigtable.BigtableUpdateInstanceOperator`
+to update an existing Google Cloud Bigtable instance.
+
+Only the following configuration can be updated for an existing instance:
+instance_display_name, instance_type and instance_labels.
+
+Using the operator
+""""""""""""""""""
+
+You can create the operator with or without project id. If project id is missing
+it will be retrieved from the GCP connection used. Both variants are shown:
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_bigtable.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_bigtable_instance_update]
+    :end-before: [END howto_operator_gcp_bigtable_instance_update]
+
 .. _howto/operator:BigtableDeleteInstanceOperator:
 
 BigtableDeleteInstanceOperator

β€Žtests/providers/google/cloud/hooks/test_bigtable.py

Lines changed: 43 additions & 0 deletions
@@ -22,6 +22,7 @@
 import mock
 from google.cloud.bigtable import Client
 from google.cloud.bigtable.instance import Instance
+from google.cloud.bigtable_admin_v2 import enums
 from mock import PropertyMock
 
 from airflow.providers.google.cloud.hooks.bigtable import BigtableHook
@@ -31,6 +32,9 @@
 )
 
 CBT_INSTANCE = 'instance'
+CBT_INSTANCE_DISPLAY_NAME = "test instance"
+CBT_INSTANCE_TYPE = enums.Instance.Type.PRODUCTION
+CBT_INSTANCE_LABELS = {"env": "sit"}
 CBT_CLUSTER = 'cluster'
 CBT_ZONE = 'zone'
 CBT_TABLE = 'table'
@@ -102,6 +106,23 @@ def test_create_instance_overridden_project_id(self, get_client, instance_create):
         instance_create.assert_called_once_with(clusters=mock.ANY)
         self.assertEqual(res.instance_id, 'instance')
 
+    @mock.patch('google.cloud.bigtable.instance.Instance.update')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+    def test_update_instance_overridden_project_id(self, get_client, instance_update):
+        operation = mock.Mock()
+        operation.result.return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+        instance_update.return_value = operation
+        res = self.bigtable_hook_no_default_project_id.update_instance(
+            project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+            instance_id=CBT_INSTANCE,
+            instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+            instance_type=CBT_INSTANCE_TYPE,
+            instance_labels=CBT_INSTANCE_LABELS
+        )
+        get_client.assert_called_once_with(project_id='example-project')
+        instance_update.assert_called_once_with()
+        self.assertEqual(res.instance_id, 'instance')
+
     @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
     def test_delete_table_overridden_project_id(self, get_client):
         instance_method = get_client.return_value.instance
@@ -268,6 +289,28 @@ def test_create_instance(self, get_client, instance_create, mock_project_id):
         instance_create.assert_called_once_with(clusters=mock.ANY)
         self.assertEqual(res.instance_id, 'instance')
 
+    @mock.patch(
+        'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
+        new_callable=PropertyMock,
+        return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST
+    )
+    @mock.patch('google.cloud.bigtable.instance.Instance.update')
+    @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
+    def test_update_instance(self, get_client, instance_update, mock_project_id):
+        operation = mock.Mock()
+        operation.result.return_value = Instance(instance_id=CBT_INSTANCE, client=get_client)
+        instance_update.return_value = operation
+        res = self.bigtable_hook_default_project_id.update_instance(
+            instance_id=CBT_INSTANCE,
+            instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
+            instance_type=CBT_INSTANCE_TYPE,
+            instance_labels=CBT_INSTANCE_LABELS,
+            project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
+        )
+        get_client.assert_called_once_with(project_id='example-project')
+        instance_update.assert_called_once_with()
+        self.assertEqual(res.instance_id, 'instance')
+
     @mock.patch('google.cloud.bigtable.instance.Instance.create')
     @mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
     def test_create_instance_overridden_project_id(self, get_client, instance_create):
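The diff adds hook-level tests only; an operator-level test would typically patch BigtableHook where the operators module imports it and assert that the arguments are forwarded. A minimal sketch, with a hypothetical test class that is not part of this commit, following the mock style used above:

import unittest

import mock

from airflow.providers.google.cloud.operators.bigtable import BigtableUpdateInstanceOperator


class TestBigtableUpdateInstanceOperator(unittest.TestCase):  # hypothetical, not part of this commit
    @mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
    def test_update_instance_forwards_arguments(self, mock_hook):
        op = BigtableUpdateInstanceOperator(
            task_id='update_instance',
            instance_id='instance',
            project_id='example-project',
            instance_display_name='test instance',
            instance_labels={'env': 'sit'},
        )
        # get_instance returns a truthy Mock, so execute() proceeds to update_instance.
        op.execute(None)
        mock_hook.return_value.update_instance.assert_called_once_with(
            project_id='example-project',
            instance_id='instance',
            instance_display_name='test instance',
            instance_type=None,
            instance_labels={'env': 'sit'},
            timeout=None,
        )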
