Skip to content

Commit 9150330

Browse files
tanjinPTobiasz KΔ™dzierskimik-laj
authored
Add Google Cloud Memorystore Memcached Operators (#10121)
Co-authored-by: Tobiasz KΔ™dzierski <tobiasz.kedzierski@polidea.com> Co-authored-by: Kamil BreguΕ‚a <mik-laj@users.noreply.github.com>
1 parent b9d677c commit 9150330

File tree

9 files changed

+1687
-21
lines changed

9 files changed

+1687
-21
lines changed

β€Žairflow/providers/google/cloud/example_dags/example_cloud_memorystore.py

Lines changed: 115 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from urllib.parse import urlparse
2323

2424
from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
25+
from google.cloud.memcache_v1beta2.types import cloud_memcache
2526

2627
from airflow import models
2728
from airflow.operators.bash import BashOperator
@@ -37,15 +38,29 @@
3738
CloudMemorystoreListInstancesOperator,
3839
CloudMemorystoreScaleInstanceOperator,
3940
CloudMemorystoreUpdateInstanceOperator,
41+
CloudMemorystoreMemcachedApplyParametersOperator,
42+
CloudMemorystoreMemcachedCreateInstanceOperator,
43+
CloudMemorystoreMemcachedDeleteInstanceOperator,
44+
CloudMemorystoreMemcachedGetInstanceOperator,
45+
CloudMemorystoreMemcachedListInstancesOperator,
46+
CloudMemorystoreMemcachedUpdateInstanceOperator,
47+
CloudMemorystoreMemcachedUpdateParametersOperator,
4048
)
4149
from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
4250
from airflow.utils import dates
4351

4452
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
4553

46-
INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore")
47-
INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2")
48-
INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3")
54+
MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystoreredis-")
55+
MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
56+
"GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2"
57+
)
58+
MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
59+
"GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3"
60+
)
61+
MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
62+
"GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1"
63+
)
4964

5065
EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb")
5166
EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL)
@@ -57,9 +72,13 @@
5772

5873
SECOND_INSTANCE = {"tier": Instance.Tier.STANDARD_HA, "memory_size_gb": 3}
5974

75+
# [START howto_operator_memcached_instance]
76+
MEMCACHED_INSTANCE = {"name": "", "node_count": 1, "node_config": {"cpu_count": 1, "memory_size_mb": 1024}}
77+
# [END howto_operator_memcached_instance]
78+
6079

6180
with models.DAG(
62-
"gcp_cloud_memorystore",
81+
"gcp_cloud_memorystore_redis",
6382
schedule_interval=None, # Override to match your needs
6483
start_date=dates.days_ago(1),
6584
tags=['example'],
@@ -68,7 +87,7 @@
6887
create_instance = CloudMemorystoreCreateInstanceOperator(
6988
task_id="create-instance",
7089
location="europe-north1",
71-
instance_id=INSTANCE_NAME,
90+
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
7291
instance=FIRST_INSTANCE,
7392
project_id=GCP_PROJECT_ID,
7493
)
@@ -84,7 +103,7 @@
84103
create_instance_2 = CloudMemorystoreCreateInstanceOperator(
85104
task_id="create-instance-2",
86105
location="europe-north1",
87-
instance_id=INSTANCE_NAME_2,
106+
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
88107
instance=SECOND_INSTANCE,
89108
project_id=GCP_PROJECT_ID,
90109
)
@@ -93,7 +112,7 @@
93112
get_instance = CloudMemorystoreGetInstanceOperator(
94113
task_id="get-instance",
95114
location="europe-north1",
96-
instance=INSTANCE_NAME,
115+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
97116
project_id=GCP_PROJECT_ID,
98117
do_xcom_push=True,
99118
)
@@ -109,7 +128,7 @@
109128
failover_instance = CloudMemorystoreFailoverInstanceOperator(
110129
task_id="failover-instance",
111130
location="europe-north1",
112-
instance=INSTANCE_NAME_2,
131+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
113132
data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS,
114133
project_id=GCP_PROJECT_ID,
115134
)
@@ -131,7 +150,7 @@
131150
update_instance = CloudMemorystoreUpdateInstanceOperator(
132151
task_id="update-instance",
133152
location="europe-north1",
134-
instance_id=INSTANCE_NAME,
153+
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
135154
project_id=GCP_PROJECT_ID,
136155
update_mask={"paths": ["memory_size_gb"]},
137156
instance={"memory_size_gb": 2},
@@ -152,7 +171,7 @@
152171
export_instance = CloudMemorystoreExportInstanceOperator(
153172
task_id="export-instance",
154173
location="europe-north1",
155-
instance=INSTANCE_NAME,
174+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
156175
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
157176
project_id=GCP_PROJECT_ID,
158177
)
@@ -162,30 +181,33 @@
162181
import_instance = CloudMemorystoreImportOperator(
163182
task_id="import-instance",
164183
location="europe-north1",
165-
instance=INSTANCE_NAME_2,
184+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
166185
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
167186
project_id=GCP_PROJECT_ID,
168187
)
169188
# [END howto_operator_import_instance]
170189

171190
# [START howto_operator_delete_instance]
172191
delete_instance = CloudMemorystoreDeleteInstanceOperator(
173-
task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID
192+
task_id="delete-instance",
193+
location="europe-north1",
194+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
195+
project_id=GCP_PROJECT_ID,
174196
)
175197
# [END howto_operator_delete_instance]
176198

177199
delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
178200
task_id="delete-instance-2",
179201
location="europe-north1",
180-
instance=INSTANCE_NAME_2,
202+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
181203
project_id=GCP_PROJECT_ID,
182204
)
183205

184206
# [END howto_operator_create_instance_and_import]
185207
create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
186208
task_id="create-instance-and-import",
187209
location="europe-north1",
188-
instance_id=INSTANCE_NAME_3,
210+
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
189211
instance=FIRST_INSTANCE,
190212
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
191213
project_id=GCP_PROJECT_ID,
@@ -196,7 +218,7 @@
196218
scale_instance = CloudMemorystoreScaleInstanceOperator(
197219
task_id="scale-instance",
198220
location="europe-north1",
199-
instance_id=INSTANCE_NAME_3,
221+
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
200222
project_id=GCP_PROJECT_ID,
201223
memory_size_gb=3,
202224
)
@@ -206,7 +228,7 @@
206228
export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
207229
task_id="export-and-delete-instance",
208230
location="europe-north1",
209-
instance=INSTANCE_NAME_3,
231+
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
210232
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
211233
project_id=GCP_PROJECT_ID,
212234
)
@@ -229,3 +251,80 @@
229251
failover_instance >> delete_instance_2
230252

231253
export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance
254+
255+
with models.DAG(
256+
"gcp_cloud_memorystore_memcached",
257+
schedule_interval=None, # Override to match your needs
258+
start_date=dates.days_ago(1),
259+
tags=['example'],
260+
) as dag_memcache:
261+
# [START howto_operator_create_instance_memcached]
262+
create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
263+
task_id="create-instance",
264+
location="europe-north1",
265+
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
266+
instance=MEMCACHED_INSTANCE,
267+
project_id=GCP_PROJECT_ID,
268+
)
269+
# [END howto_operator_create_instance_memcached]
270+
271+
# [START howto_operator_delete_instance_memcached]
272+
delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
273+
task_id="delete-instance",
274+
location="europe-north1",
275+
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
276+
project_id=GCP_PROJECT_ID,
277+
)
278+
# [END howto_operator_delete_instance_memcached]
279+
280+
# [START howto_operator_get_instance_memcached]
281+
get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
282+
task_id="get-instance",
283+
location="europe-north1",
284+
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
285+
project_id=GCP_PROJECT_ID,
286+
)
287+
# [END howto_operator_get_instance_memcached]
288+
289+
# [START howto_operator_list_instances_memcached]
290+
list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
291+
task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
292+
)
293+
# [END howto_operator_list_instances_memcached]
294+
295+
# # [START howto_operator_update_instance_memcached]
296+
update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
297+
task_id="update-instance",
298+
location="europe-north1",
299+
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
300+
project_id=GCP_PROJECT_ID,
301+
update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
302+
instance={"node_count": 2},
303+
)
304+
# [END howto_operator_update_instance_memcached]
305+
306+
# [START howto_operator_update_and_apply_parameters_memcached]
307+
update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
308+
task_id="update-parameters",
309+
location="europe-north1",
310+
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
311+
project_id=GCP_PROJECT_ID,
312+
update_mask={"paths": ["params"]},
313+
parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
314+
)
315+
316+
apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
317+
task_id="apply-parameters",
318+
location="europe-north1",
319+
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
320+
project_id=GCP_PROJECT_ID,
321+
node_ids=["node-a-1"],
322+
apply_all=False,
323+
)
324+
325+
# update_parameters >> apply_parameters
326+
# [END howto_operator_update_and_apply_parameters_memcached]
327+
328+
create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
329+
create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
330+
update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance

0 commit comments

Comments
 (0)