Skip to content

Commit b0598b5

Browse files
authored
Add support for creating multiple replicated clusters in Bigtable hook and operator (#10475)
* Add support for creating multiple Bigtable replicas * Flake8 fix
1 parent 3a53039 commit b0598b5

File tree

4 files changed

+191
-4
lines changed

4 files changed

+191
-4
lines changed

airflow/providers/google/cloud/hooks/bigtable.py

Lines changed: 20 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
This module contains a Google Cloud Bigtable Hook.
2020
"""
2121
import enum
22+
import warnings
2223
from typing import Dict, List, Optional, Sequence, Union
2324

2425
from google.cloud.bigtable import Client
@@ -109,6 +110,7 @@ def create_instance(
109110
main_cluster_id: str,
110111
main_cluster_zone: str,
111112
project_id: str,
113+
replica_clusters: Optional[List[Dict[str, str]]] = None,
112114
replica_cluster_id: Optional[str] = None,
113115
replica_cluster_zone: Optional[str] = None,
114116
instance_display_name: Optional[str] = None,
@@ -132,11 +134,15 @@ def create_instance(
132134
:param project_id: Optional, Google Cloud Platform project ID where the
133135
BigTable exists. If set to None or missing,
134136
the default project_id from the GCP connection is used.
137+
:type replica_clusters: List[Dict[str, str]]
138+
:param replica_clusters: (optional) A list of replica clusters for the new
139+
instance. Each cluster dictionary contains an id and a zone.
140+
Example: [{"id": "replica-1", "zone": "us-west1-a"}]
135141
:type replica_cluster_id: str
136-
:param replica_cluster_id: (optional) The ID for replica cluster for the new
142+
:param replica_cluster_id: (deprecated) The ID for replica cluster for the new
137143
instance.
138144
:type replica_cluster_zone: str
139-
:param replica_cluster_zone: (optional) The zone for replica cluster.
145+
:param replica_cluster_zone: (deprecated) The zone for replica cluster.
140146
:type instance_type: enums.Instance.Type
141147
:param instance_type: (optional) The type of the instance.
142148
:type instance_display_name: str
@@ -173,12 +179,24 @@ def create_instance(
173179
)
174180
]
175181
if replica_cluster_id and replica_cluster_zone:
182+
warnings.warn(
183+
"The replica_cluster_id and replica_cluster_zone parameter have been deprecated."
184+
"You should pass the replica_clusters parameter.", DeprecationWarning, stacklevel=2)
176185
clusters.append(instance.cluster(
177186
replica_cluster_id,
178187
replica_cluster_zone,
179188
cluster_nodes,
180189
cluster_storage_type
181190
))
191+
if replica_clusters:
192+
for replica_cluster in replica_clusters:
193+
if "id" in replica_cluster and "zone" in replica_cluster:
194+
clusters.append(instance.cluster(
195+
replica_cluster["id"],
196+
replica_cluster["zone"],
197+
cluster_nodes,
198+
cluster_storage_type
199+
))
182200
operation = instance.create(
183201
clusters=clusters
184202
)

airflow/providers/google/cloud/operators/bigtable.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,15 @@ class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
6868
:type project_id: str
6969
:param project_id: Optional, the ID of the GCP project. If set to None or missing,
7070
the default project_id from the GCP connection is used.
71+
:type replica_clusters: List[Dict[str, str]]
72+
:param replica_clusters: (optional) A list of replica clusters for the new
73+
instance. Each cluster dictionary contains an id and a zone.
74+
Example: [{"id": "replica-1", "zone": "us-west1-a"}]
7175
:type replica_cluster_id: str
72-
:param replica_cluster_id: (optional) The ID for replica cluster for the new instance.
76+
:param replica_cluster_id: (deprecated) The ID for replica cluster for the new
77+
instance.
7378
:type replica_cluster_zone: str
74-
:param replica_cluster_zone: (optional) The zone for replica cluster.
79+
:param replica_cluster_zone: (deprecated) The zone for replica cluster.
7580
:type instance_type: enum.IntEnum
7681
:param instance_type: (optional) The type of the instance.
7782
:type instance_display_name: str
@@ -100,6 +105,7 @@ def __init__(self, *, # pylint: disable=too-many-arguments
100105
main_cluster_id: str,
101106
main_cluster_zone: str,
102107
project_id: Optional[str] = None,
108+
replica_clusters: Optional[List[Dict[str, str]]] = None,
103109
replica_cluster_id: Optional[str] = None,
104110
replica_cluster_zone: Optional[str] = None,
105111
instance_display_name: Optional[str] = None,
@@ -114,6 +120,7 @@ def __init__(self, *, # pylint: disable=too-many-arguments
114120
self.instance_id = instance_id
115121
self.main_cluster_id = main_cluster_id
116122
self.main_cluster_zone = main_cluster_zone
123+
self.replica_clusters = replica_clusters
117124
self.replica_cluster_id = replica_cluster_id
118125
self.replica_cluster_zone = replica_cluster_zone
119126
self.instance_display_name = instance_display_name
@@ -145,6 +152,7 @@ def execute(self, context):
145152
instance_id=self.instance_id,
146153
main_cluster_id=self.main_cluster_id,
147154
main_cluster_zone=self.main_cluster_zone,
155+
replica_clusters=self.replica_clusters,
148156
replica_cluster_id=self.replica_cluster_id,
149157
replica_cluster_zone=self.replica_cluster_zone,
150158
instance_display_name=self.instance_display_name,

tests/providers/google/cloud/hooks/test_bigtable.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@
3838
CBT_CLUSTER = 'cluster'
3939
CBT_ZONE = 'zone'
4040
CBT_TABLE = 'table'
41+
CBT_REPLICA_CLUSTER_ID = 'replica-cluster'
42+
CBT_REPLICA_CLUSTER_ZONE = 'us-west1-b'
43+
CBT_REPLICATE_CLUSTERS = [
44+
{'id': 'replica-1', 'zone': 'us-west1-a'},
45+
{'id': 'replica-2', 'zone': 'us-central1-f'},
46+
{'id': 'replica-3', 'zone': 'us-east1-d'},
47+
]
4148

4249

4350
class TestBigtableHookNoDefaultProjectId(unittest.TestCase):
@@ -289,6 +296,95 @@ def test_create_instance(self, get_client, instance_create, mock_project_id):
289296
instance_create.assert_called_once_with(clusters=mock.ANY)
290297
self.assertEqual(res.instance_id, 'instance')
291298

299+
@mock.patch(
300+
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
301+
new_callable=PropertyMock,
302+
return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
303+
)
304+
@mock.patch('google.cloud.bigtable.instance.Instance.cluster')
305+
@mock.patch('google.cloud.bigtable.instance.Instance.create')
306+
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
307+
def test_create_instance_with_one_replica_cluster(
308+
self, get_client, instance_create, cluster, mock_project_id
309+
):
310+
operation = mock.Mock()
311+
operation.result_return_value = Instance(
312+
instance_id=CBT_INSTANCE, client=get_client
313+
)
314+
instance_create.return_value = operation
315+
316+
res = self.bigtable_hook_default_project_id.create_instance(
317+
instance_id=CBT_INSTANCE,
318+
main_cluster_id=CBT_CLUSTER,
319+
main_cluster_zone=CBT_ZONE,
320+
replica_cluster_id=CBT_REPLICA_CLUSTER_ID,
321+
replica_cluster_zone=CBT_REPLICA_CLUSTER_ZONE,
322+
cluster_nodes=1,
323+
cluster_storage_type=enums.StorageType.SSD,
324+
project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
325+
)
326+
cluster.assert_has_calls(
327+
[
328+
unittest.mock.call(
329+
CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD
330+
),
331+
unittest.mock.call(
332+
CBT_REPLICA_CLUSTER_ID, CBT_REPLICA_CLUSTER_ZONE, 1, enums.StorageType.SSD
333+
),
334+
],
335+
any_order=True
336+
)
337+
get_client.assert_called_once_with(project_id='example-project')
338+
instance_create.assert_called_once_with(clusters=mock.ANY)
339+
self.assertEqual(res.instance_id, 'instance')
340+
341+
@mock.patch(
342+
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
343+
new_callable=PropertyMock,
344+
return_value=GCP_PROJECT_ID_HOOK_UNIT_TEST,
345+
)
346+
@mock.patch('google.cloud.bigtable.instance.Instance.cluster')
347+
@mock.patch('google.cloud.bigtable.instance.Instance.create')
348+
@mock.patch('airflow.providers.google.cloud.hooks.bigtable.BigtableHook._get_client')
349+
def test_create_instance_with_multiple_replica_clusters(
350+
self, get_client, instance_create, cluster, mock_project_id
351+
):
352+
operation = mock.Mock()
353+
operation.result_return_value = Instance(
354+
instance_id=CBT_INSTANCE, client=get_client
355+
)
356+
instance_create.return_value = operation
357+
358+
res = self.bigtable_hook_default_project_id.create_instance(
359+
instance_id=CBT_INSTANCE,
360+
main_cluster_id=CBT_CLUSTER,
361+
main_cluster_zone=CBT_ZONE,
362+
replica_clusters=CBT_REPLICATE_CLUSTERS,
363+
cluster_nodes=1,
364+
cluster_storage_type=enums.StorageType.SSD,
365+
project_id=GCP_PROJECT_ID_HOOK_UNIT_TEST,
366+
)
367+
cluster.assert_has_calls(
368+
[
369+
unittest.mock.call(
370+
CBT_CLUSTER, CBT_ZONE, 1, enums.StorageType.SSD
371+
),
372+
unittest.mock.call(
373+
'replica-1', 'us-west1-a', 1, enums.StorageType.SSD
374+
),
375+
unittest.mock.call(
376+
'replica-2', 'us-central1-f', 1, enums.StorageType.SSD
377+
),
378+
unittest.mock.call(
379+
'replica-3', 'us-east1-d', 1, enums.StorageType.SSD
380+
),
381+
],
382+
any_order=True
383+
)
384+
get_client.assert_called_once_with(project_id='example-project')
385+
instance_create.assert_called_once_with(clusters=mock.ANY)
386+
self.assertEqual(res.instance_id, 'instance')
387+
292388
@mock.patch(
293389
'airflow.providers.google.common.hooks.base_google.GoogleBaseHook.project_id',
294390
new_callable=PropertyMock,

tests/providers/google/cloud/operators/test_bigtable.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
INSTANCE_ID = 'test-instance-id'
3737
CLUSTER_ID = 'test-cluster-id'
3838
CLUSTER_ZONE = 'us-central1-f'
39+
REPLICATE_CLUSTERS = [
40+
{'id': 'replica-1', 'zone': 'us-west1-a'},
41+
{'id': 'replica-2', 'zone': 'us-central1-f'},
42+
{'id': 'replica-3', 'zone': 'us-east1-d'},
43+
]
3944
GCP_CONN_ID = 'test-gcp-conn-id'
4045
NODES = 5
4146
INSTANCE_DISPLAY_NAME = "test instance"
@@ -131,6 +136,66 @@ def test_different_error_reraised(self, mock_hook):
131136
main_cluster_id=CLUSTER_ID,
132137
main_cluster_zone=CLUSTER_ZONE,
133138
project_id=PROJECT_ID,
139+
replica_clusters=None,
140+
replica_cluster_id=None,
141+
replica_cluster_zone=None,
142+
timeout=None
143+
)
144+
145+
@mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
146+
def test_create_instance_that_doesnt_exists(self, mock_hook):
147+
mock_hook.return_value.get_instance.return_value = None
148+
op = BigtableCreateInstanceOperator(
149+
project_id=PROJECT_ID,
150+
instance_id=INSTANCE_ID,
151+
main_cluster_id=CLUSTER_ID,
152+
main_cluster_zone=CLUSTER_ZONE,
153+
task_id="id",
154+
gcp_conn_id=GCP_CONN_ID
155+
)
156+
op.execute(None)
157+
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
158+
mock_hook.return_value.create_instance.assert_called_once_with(
159+
cluster_nodes=None,
160+
cluster_storage_type=None,
161+
instance_display_name=None,
162+
instance_id=INSTANCE_ID,
163+
instance_labels=None,
164+
instance_type=None,
165+
main_cluster_id=CLUSTER_ID,
166+
main_cluster_zone=CLUSTER_ZONE,
167+
project_id=PROJECT_ID,
168+
replica_clusters=None,
169+
replica_cluster_id=None,
170+
replica_cluster_zone=None,
171+
timeout=None
172+
)
173+
174+
@mock.patch('airflow.providers.google.cloud.operators.bigtable.BigtableHook')
175+
def test_create_instance_with_replicas_that_doesnt_exists(self, mock_hook):
176+
mock_hook.return_value.get_instance.return_value = None
177+
op = BigtableCreateInstanceOperator(
178+
project_id=PROJECT_ID,
179+
instance_id=INSTANCE_ID,
180+
main_cluster_id=CLUSTER_ID,
181+
main_cluster_zone=CLUSTER_ZONE,
182+
replica_clusters=REPLICATE_CLUSTERS,
183+
task_id="id",
184+
gcp_conn_id=GCP_CONN_ID
185+
)
186+
op.execute(None)
187+
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
188+
mock_hook.return_value.create_instance.assert_called_once_with(
189+
cluster_nodes=None,
190+
cluster_storage_type=None,
191+
instance_display_name=None,
192+
instance_id=INSTANCE_ID,
193+
instance_labels=None,
194+
instance_type=None,
195+
main_cluster_id=CLUSTER_ID,
196+
main_cluster_zone=CLUSTER_ZONE,
197+
project_id=PROJECT_ID,
198+
replica_clusters=REPLICATE_CLUSTERS,
134199
replica_cluster_id=None,
135200
replica_cluster_zone=None,
136201
timeout=None

0 commit comments

Comments
 (0)