Skip to content

Commit f60fa98

Browse files
author
Wojciech Januszek
authored
Cloud SQL assets & system tests migration (AIP-47) (#23583)
1 parent df84c4a commit f60fa98

File tree

8 files changed

+234
-1011
lines changed

8 files changed

+234
-1011
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
"""This module contains Google Cloud SQL links."""
19+
from typing import TYPE_CHECKING, Optional
20+
21+
from airflow.models import BaseOperator
22+
from airflow.providers.google.cloud.links.base import BaseGoogleLink
23+
24+
if TYPE_CHECKING:
25+
from airflow.utils.context import Context
26+
27+
28+
CLOUD_SQL_BASE_LINK = "https://console.cloud.google.com/sql"
29+
CLOUD_SQL_INSTANCE_LINK = CLOUD_SQL_BASE_LINK + "/instances/{instance}/overview?project={project_id}"
30+
CLOUD_SQL_INSTANCE_DATABASE_LINK = (
31+
CLOUD_SQL_BASE_LINK + "/instances/{instance}/databases?project={project_id}"
32+
)
33+
34+
35+
class CloudSQLInstanceLink(BaseGoogleLink):
36+
"""Helper class for constructing Cloud SQL Instance Link"""
37+
38+
name = "Cloud SQL Instance"
39+
key = "cloud_sql_instance"
40+
format_str = CLOUD_SQL_INSTANCE_LINK
41+
42+
@staticmethod
43+
def persist(
44+
context: "Context",
45+
task_instance: BaseOperator,
46+
cloud_sql_instance: str,
47+
project_id: Optional[str],
48+
):
49+
task_instance.xcom_push(
50+
context,
51+
key=CloudSQLInstanceLink.key,
52+
value={"instance": cloud_sql_instance, "project_id": project_id},
53+
)
54+
55+
56+
class CloudSQLInstanceDatabaseLink(BaseGoogleLink):
57+
"""Helper class for constructing Cloud SQL Instance Database Link"""
58+
59+
name = "Cloud SQL Instance Database"
60+
key = "cloud_sql_instance_database"
61+
format_str = CLOUD_SQL_INSTANCE_DATABASE_LINK
62+
63+
@staticmethod
64+
def persist(
65+
context: "Context",
66+
task_instance: BaseOperator,
67+
cloud_sql_instance: str,
68+
project_id: Optional[str],
69+
):
70+
task_instance.xcom_push(
71+
context,
72+
key=CloudSQLInstanceDatabaseLink.key,
73+
value={"instance": cloud_sql_instance, "project_id": project_id},
74+
)

β€Žairflow/providers/google/cloud/operators/cloud_sql.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
from airflow.hooks.base import BaseHook
2525
from airflow.models import BaseOperator, Connection
2626
from airflow.providers.google.cloud.hooks.cloud_sql import CloudSQLDatabaseHook, CloudSQLHook
27+
from airflow.providers.google.cloud.links.cloud_sql import CloudSQLInstanceDatabaseLink, CloudSQLInstanceLink
2728
from airflow.providers.google.cloud.utils.field_validator import GcpBodyFieldValidator
29+
from airflow.providers.google.common.links.storage import FileDetailsLink
2830
from airflow.providers.mysql.hooks.mysql import MySqlHook
2931
from airflow.providers.postgres.hooks.postgres import PostgresHook
3032

@@ -316,6 +318,7 @@ class CloudSQLCreateInstanceOperator(CloudSQLBaseOperator):
316318
'impersonation_chain',
317319
)
318320
# [END gcp_sql_create_template_fields]
321+
operator_extra_links = (CloudSQLInstanceLink(),)
319322

320323
def __init__(
321324
self,
@@ -363,6 +366,13 @@ def execute(self, context: 'Context') -> None:
363366
else:
364367
self.log.info("Cloud SQL instance with ID %s already exists. Aborting create.", self.instance)
365368

369+
CloudSQLInstanceLink.persist(
370+
context=context,
371+
task_instance=self,
372+
cloud_sql_instance=self.instance,
373+
project_id=self.project_id or hook.project_id,
374+
)
375+
366376
instance_resource = hook.get_instance(project_id=self.project_id, instance=self.instance)
367377
service_account_email = instance_resource["serviceAccountEmailAddress"]
368378
task_instance = context['task_instance']
@@ -411,6 +421,7 @@ class CloudSQLInstancePatchOperator(CloudSQLBaseOperator):
411421
'impersonation_chain',
412422
)
413423
# [END gcp_sql_patch_template_fields]
424+
operator_extra_links = (CloudSQLInstanceLink(),)
414425

415426
def __init__(
416427
self,
@@ -450,6 +461,13 @@ def execute(self, context: 'Context'):
450461
'Please specify another instance to patch.'
451462
)
452463
else:
464+
CloudSQLInstanceLink.persist(
465+
context=context,
466+
task_instance=self,
467+
cloud_sql_instance=self.instance,
468+
project_id=self.project_id or hook.project_id,
469+
)
470+
453471
return hook.patch_instance(project_id=self.project_id, body=self.body, instance=self.instance)
454472

455473

@@ -535,6 +553,7 @@ class CloudSQLCreateInstanceDatabaseOperator(CloudSQLBaseOperator):
535553
'impersonation_chain',
536554
)
537555
# [END gcp_sql_db_create_template_fields]
556+
operator_extra_links = (CloudSQLInstanceDatabaseLink(),)
538557

539558
def __init__(
540559
self,
@@ -585,6 +604,12 @@ def execute(self, context: 'Context') -> Optional[bool]:
585604
api_version=self.api_version,
586605
impersonation_chain=self.impersonation_chain,
587606
)
607+
CloudSQLInstanceDatabaseLink.persist(
608+
context=context,
609+
task_instance=self,
610+
cloud_sql_instance=self.instance,
611+
project_id=self.project_id or hook.project_id,
612+
)
588613
if self._check_if_db_exists(database, hook):
589614
self.log.info(
590615
"Cloud SQL instance with ID %s already contains database '%s'. Aborting database insert.",
@@ -635,6 +660,7 @@ class CloudSQLPatchInstanceDatabaseOperator(CloudSQLBaseOperator):
635660
'impersonation_chain',
636661
)
637662
# [END gcp_sql_db_patch_template_fields]
663+
operator_extra_links = (CloudSQLInstanceDatabaseLink(),)
638664

639665
def __init__(
640666
self,
@@ -687,6 +713,12 @@ def execute(self, context: 'Context') -> None:
687713
"Please specify another database to patch."
688714
)
689715
else:
716+
CloudSQLInstanceDatabaseLink.persist(
717+
context=context,
718+
task_instance=self,
719+
cloud_sql_instance=self.instance,
720+
project_id=self.project_id or hook.project_id,
721+
)
690722
return hook.patch_database(
691723
project_id=self.project_id, instance=self.instance, database=self.database, body=self.body
692724
)
@@ -811,6 +843,7 @@ class CloudSQLExportInstanceOperator(CloudSQLBaseOperator):
811843
'impersonation_chain',
812844
)
813845
# [END gcp_sql_export_template_fields]
846+
operator_extra_links = (CloudSQLInstanceLink(), FileDetailsLink())
814847

815848
def __init__(
816849
self,
@@ -853,6 +886,18 @@ def execute(self, context: 'Context') -> None:
853886
api_version=self.api_version,
854887
impersonation_chain=self.impersonation_chain,
855888
)
889+
CloudSQLInstanceLink.persist(
890+
context=context,
891+
task_instance=self,
892+
cloud_sql_instance=self.instance,
893+
project_id=self.project_id or hook.project_id,
894+
)
895+
FileDetailsLink.persist(
896+
context=context,
897+
task_instance=self,
898+
uri=self.body["exportContext"]["uri"][5:],
899+
project_id=self.project_id or hook.project_id,
900+
)
856901
return hook.export_instance(project_id=self.project_id, instance=self.instance, body=self.body)
857902

858903

@@ -908,6 +953,7 @@ class CloudSQLImportInstanceOperator(CloudSQLBaseOperator):
908953
'impersonation_chain',
909954
)
910955
# [END gcp_sql_import_template_fields]
956+
operator_extra_links = (CloudSQLInstanceLink(), FileDetailsLink())
911957

912958
def __init__(
913959
self,
@@ -950,6 +996,18 @@ def execute(self, context: 'Context') -> None:
950996
api_version=self.api_version,
951997
impersonation_chain=self.impersonation_chain,
952998
)
999+
CloudSQLInstanceLink.persist(
1000+
context=context,
1001+
task_instance=self,
1002+
cloud_sql_instance=self.instance,
1003+
project_id=self.project_id or hook.project_id,
1004+
)
1005+
FileDetailsLink.persist(
1006+
context=context,
1007+
task_instance=self,
1008+
uri=self.body["importContext"]["uri"][5:],
1009+
project_id=self.project_id or hook.project_id,
1010+
)
9531011
return hook.import_instance(project_id=self.project_id, instance=self.instance, body=self.body)
9541012

9551013

β€Žairflow/providers/google/provider.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,8 @@ extra-links:
883883
- airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
884884
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
885885
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
886+
- airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceLink
887+
- airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink
886888
- airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
887889
- airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
888890
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink

β€Ždocs/apache-airflow-providers-google/operators/cloud/cloud_sql.rst

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ Using the operator
4242
You can create the operator with or without project id. If project id is missing
4343
it will be retrieved from the Google Cloud connection used. Both variants are shown:
4444

45-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
45+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
4646
:language: python
4747
:dedent: 4
4848
:start-after: [START howto_operator_cloudsql_db_create]
4949
:end-before: [END howto_operator_cloudsql_db_create]
5050

5151
Example request body:
5252

53-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
53+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
5454
:language: python
5555
:start-after: [START howto_operator_cloudsql_db_create_body]
5656
:end-before: [END howto_operator_cloudsql_db_create_body]
@@ -87,7 +87,7 @@ Using the operator
8787
You can create the operator with or without project id. If project id is missing
8888
it will be retrieved from the Google Cloud connection used. Both variants are shown:
8989

90-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
90+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
9191
:language: python
9292
:dedent: 4
9393
:start-after: [START howto_operator_cloudsql_db_delete]
@@ -127,15 +127,15 @@ Using the operator
127127
You can create the operator with or without project id. If project id is missing
128128
it will be retrieved from the Google Cloud connection used. Both variants are shown:
129129

130-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
130+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
131131
:language: python
132132
:dedent: 4
133133
:start-after: [START howto_operator_cloudsql_db_patch]
134134
:end-before: [END howto_operator_cloudsql_db_patch]
135135

136136
Example request body:
137137

138-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
138+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
139139
:language: python
140140
:start-after: [START howto_operator_cloudsql_db_patch_body]
141141
:end-before: [END howto_operator_cloudsql_db_patch_body]
@@ -174,7 +174,7 @@ Using the operator
174174
You can create the operator with or without project id. If project id is missing
175175
it will be retrieved from the Google Cloud connection used. Both variants are shown:
176176

177-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
177+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
178178
:language: python
179179
:dedent: 4
180180
:start-after: [START howto_operator_cloudsql_delete]
@@ -183,7 +183,7 @@ it will be retrieved from the Google Cloud connection used. Both variants are sh
183183
Note: If the instance has read or failover replicas you need to delete them before you delete the primary instance.
184184
Replicas are deleted the same way as primary instances:
185185

186-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
186+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
187187
:language: python
188188
:dedent: 4
189189
:start-after: [START howto_operator_cloudsql_replicas_delete]
@@ -224,7 +224,7 @@ Arguments
224224

225225
Example body defining the export operation:
226226

227-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
227+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
228228
:language: python
229229
:start-after: [START howto_operator_cloudsql_export_body]
230230
:end-before: [END howto_operator_cloudsql_export_body]
@@ -235,7 +235,7 @@ Using the operator
235235
You can create the operator with or without project id. If project id is missing
236236
it will be retrieved from the Google Cloud connection used. Both variants are shown:
237237

238-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
238+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
239239
:language: python
240240
:dedent: 4
241241
:start-after: [START howto_operator_cloudsql_export]
@@ -269,7 +269,7 @@ To grant the service account with the appropriate WRITE permissions for the GCS
269269
you can use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`,
270270
as shown in the example:
271271

272-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
272+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
273273
:language: python
274274
:dedent: 4
275275
:start-after: [START howto_operator_cloudsql_export_gcs_permissions]
@@ -309,7 +309,7 @@ Arguments
309309

310310
Example body defining the import operation:
311311

312-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
312+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
313313
:language: python
314314
:start-after: [START howto_operator_cloudsql_import_body]
315315
:end-before: [END howto_operator_cloudsql_import_body]
@@ -320,7 +320,7 @@ Using the operator
320320
You can create the operator with or without project id. If project id is missing
321321
it will be retrieved from the Google Cloud connection used. Both variants are shown:
322322

323-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
323+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
324324
:language: python
325325
:dedent: 4
326326
:start-after: [START howto_operator_cloudsql_import]
@@ -354,7 +354,7 @@ To grant the service account with the appropriate READ permissions for the GCS o
354354
you can use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator`,
355355
as shown in the example:
356356

357-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
357+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
358358
:language: python
359359
:dedent: 4
360360
:start-after: [START howto_operator_cloudsql_import_gcs_permissions]
@@ -380,14 +380,14 @@ Arguments
380380

381381
Example body defining the instance with failover replica:
382382

383-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
383+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
384384
:language: python
385385
:start-after: [START howto_operator_cloudsql_create_body]
386386
:end-before: [END howto_operator_cloudsql_create_body]
387387

388388
Example body defining read replica for the instance above:
389389

390-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
390+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
391391
:language: python
392392
:start-after: [START howto_operator_cloudsql_create_replica]
393393
:end-before: [END howto_operator_cloudsql_create_replica]
@@ -401,7 +401,7 @@ Using the operator
401401
You can create the operator with or without project id. If project id is missing
402402
it will be retrieved from the Google Cloud connection used. Both variants are shown:
403403

404-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
404+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
405405
:language: python
406406
:dedent: 4
407407
:start-after: [START howto_operator_cloudsql_create]
@@ -441,7 +441,7 @@ Arguments
441441

442442
Example body defining the instance:
443443

444-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
444+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
445445
:language: python
446446
:start-after: [START howto_operator_cloudsql_patch_body]
447447
:end-before: [END howto_operator_cloudsql_patch_body]
@@ -452,7 +452,7 @@ Using the operator
452452
You can create the operator with or without project id. If project id is missing
453453
it will be retrieved from the Google Cloud connection used. Both variants are shown:
454454

455-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_sql.py
455+
.. exampleinclude:: /../../tests/system/providers/google/cloud_sql/example_cloud_sql.py
456456
:language: python
457457
:dedent: 4
458458
:start-after: [START howto_operator_cloudsql_patch]

0 commit comments

Comments
 (0)