Skip to content

Commit e61d823

Browse files
authored
Cloud Storage Transfer Operators assets & system tests migration (AIP-47) (#26072)
1 parent cf73cb7 commit e61d823

File tree

13 files changed

+329
-106
lines changed

13 files changed

+329
-106
lines changed

airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ def create_transfer_job(self, body: dict) -> dict:
164164
:rtype: dict
165165
"""
166166
body = self._inject_project_id(body, BODY, PROJECT_ID)
167-
try:
168167

168+
try:
169169
transfer_job = (
170170
self.get_conn().transferJobs().create(body=body).execute(num_retries=self.num_retries)
171171
)
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
"""This module contains Google Storage Transfer Service links."""
19+
20+
from __future__ import annotations
21+
22+
from typing import TYPE_CHECKING
23+
24+
from airflow.providers.google.cloud.links.base import BaseGoogleLink
25+
26+
if TYPE_CHECKING:
27+
from airflow.utils.context import Context
28+
29+
# Root of the Storage Transfer pages on console.cloud.google.com; every
# template below is built on top of it.
CLOUD_STORAGE_TRANSFER_BASE_LINK = "https://console.cloud.google.com/transfer"

# Job list page; expects a ``project_id`` placeholder value.
CLOUD_STORAGE_TRANSFER_LIST_LINK = "".join(
    (CLOUD_STORAGE_TRANSFER_BASE_LINK, "/jobs?project={project_id}")
)

# Runs page of a single job; expects ``transfer_job`` and ``project_id``.
# ``%2F`` is a URL-encoded "/" that is part of the path segment itself.
CLOUD_STORAGE_TRANSFER_JOB_LINK = "".join(
    (
        CLOUD_STORAGE_TRANSFER_BASE_LINK,
        "/jobs/transferJobs%2F{transfer_job}/runs?project={project_id}",
    )
)

# Page of a single operation (run) of a job; expects ``transfer_job``,
# ``transfer_operation`` and ``project_id``.
CLOUD_STORAGE_TRANSFER_OPERATION_LINK = "".join(
    (
        CLOUD_STORAGE_TRANSFER_BASE_LINK,
        "/jobs/transferJobs%2F{transfer_job}/runs/transferOperations%2F{transfer_operation}",
        "?project={project_id}",
    )
)
42+
43+
44+
class CloudStorageTransferLinkHelper:
    """Helper class for Storage Transfer links"""

    @staticmethod
    def extract_parts(operation_name: str | None) -> tuple[str, str]:
        """
        Split a transfer operation name into the pieces used by the console links.

        The operation piece is the second ``/``-separated segment of the name and
        the job piece is the second ``-``-separated segment of the name, e.g.
        ``"transferOperations/transferJobs-123-456"`` yields
        ``("transferJobs-123-456", "123")``.

        :param operation_name: Full operation name; may be ``None`` or empty.
        :return: ``(transfer_operation, transfer_job)``. A piece that cannot be
            located (missing separator, or no name at all) is returned as ``""``.
        """
        if not operation_name:
            return "", ""

        slash_segments = operation_name.split("/")
        dash_segments = operation_name.split("-")

        # A name without the expected separator degrades to an empty piece instead
        # of raising IndexError: these values only feed a UI link, so a malformed
        # name should not fail the task that persists it.
        transfer_operation = slash_segments[1] if len(slash_segments) > 1 else ""
        transfer_job = dash_segments[1] if len(dash_segments) > 1 else ""
        return transfer_operation, transfer_job
54+
55+
56+
class CloudStorageTransferListLink(BaseGoogleLink):
    """Extra link pointing at the Storage Transfer job list of a project."""

    name = "Cloud Storage Transfer"
    key = "cloud_storage_transfer"
    format_str = CLOUD_STORAGE_TRANSFER_LIST_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance,
        project_id: str,
    ):
        """
        Push the values needed by ``format_str`` to XCom under this link's ``key``.

        :param context: Task context forwarded to ``xcom_push``.
        :param task_instance: Object exposing ``xcom_push`` (the operators in this
            provider pass themselves).
        :param project_id: Google Cloud project id stored for the link.
        """
        # NOTE(review): presumably read back by BaseGoogleLink to fill in
        # ``format_str`` - confirm against the base class.
        link_values = {"project_id": project_id}
        task_instance.xcom_push(
            context,
            key=CloudStorageTransferListLink.key,
            value=link_values,
        )
74+
75+
76+
class CloudStorageTransferJobLink(BaseGoogleLink):
    """Helper class for constructing Storage Transfer Job Link"""

    name = "Cloud Storage Transfer Job"
    key = "cloud_storage_transfer_job"
    format_str = CLOUD_STORAGE_TRANSFER_JOB_LINK

    @staticmethod
    def persist(
        task_instance,
        context: Context,
        project_id: str,
        job_name: str,
    ):
        """
        Push the project id and the bare job identifier to XCom under this link's ``key``.

        :param task_instance: Object exposing ``xcom_push`` (the operators and
            sensor in this provider pass themselves).
        :param context: Task context forwarded to ``xcom_push``.
        :param project_id: Google Cloud project id stored for the link.
        :param job_name: Transfer job name. For a ``/``-separated name the second
            segment is stored; a name without ``/`` is stored unchanged; an empty
            or ``None`` name is stored as ``""``.
        """
        # The link template already contains the ``transferJobs%2F`` prefix, so
        # only the identifier is kept. A name without "/" is presumably already a
        # bare identifier - use it as-is rather than raising IndexError and
        # failing the task over a UI link.
        segments = job_name.split("/") if job_name else [""]
        transfer_job = segments[1] if len(segments) > 1 else segments[0]

        task_instance.xcom_push(
            context,
            key=CloudStorageTransferJobLink.key,
            value={
                "project_id": project_id,
                "transfer_job": transfer_job,
            },
        )
101+
102+
103+
class CloudStorageTransferDetailsLink(BaseGoogleLink):
    """Extra link pointing at a single Storage Transfer operation."""

    name = "Cloud Storage Transfer Details"
    key = "cloud_storage_transfer_details"
    format_str = CLOUD_STORAGE_TRANSFER_OPERATION_LINK

    @staticmethod
    def persist(
        task_instance,
        context: Context,
        project_id: str,
        operation_name: str,
    ):
        """
        Push the project, job and operation identifiers to XCom under this link's ``key``.

        :param task_instance: Object exposing ``xcom_push`` (the operators in this
            provider pass themselves).
        :param context: Task context forwarded to ``xcom_push``.
        :param project_id: Google Cloud project id stored for the link.
        :param operation_name: Operation name, split into its operation and job
            pieces by ``CloudStorageTransferLinkHelper.extract_parts``.
        """
        operation_id, job_id = CloudStorageTransferLinkHelper.extract_parts(operation_name)
        link_values = {
            "project_id": project_id,
            "transfer_job": job_id,
            "transfer_operation": operation_id,
        }
        task_instance.xcom_push(
            context,
            key=CloudStorageTransferDetailsLink.key,
            value=link_values,
        )

airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@
5555
CloudDataTransferServiceHook,
5656
GcpTransferJobsStatus,
5757
)
58+
from airflow.providers.google.cloud.links.cloud_storage_transfer import (
59+
CloudStorageTransferDetailsLink,
60+
CloudStorageTransferJobLink,
61+
CloudStorageTransferListLink,
62+
)
5863
from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
5964

6065
if TYPE_CHECKING:
@@ -218,6 +223,7 @@ class CloudDataTransferServiceCreateJobOperator(BaseOperator):
218223
'google_impersonation_chain',
219224
)
220225
# [END gcp_transfer_job_create_template_fields]
226+
operator_extra_links = (CloudStorageTransferJobLink(),)
221227

222228
def __init__(
223229
self,
@@ -226,6 +232,7 @@ def __init__(
226232
aws_conn_id: str = 'aws_default',
227233
gcp_conn_id: str = 'google_cloud_default',
228234
api_version: str = 'v1',
235+
project_id: str | None = None,
229236
google_impersonation_chain: str | Sequence[str] | None = None,
230237
**kwargs,
231238
) -> None:
@@ -234,6 +241,7 @@ def __init__(
234241
self.aws_conn_id = aws_conn_id
235242
self.gcp_conn_id = gcp_conn_id
236243
self.api_version = api_version
244+
self.project_id = project_id
237245
self.google_impersonation_chain = google_impersonation_chain
238246
self._validate_inputs()
239247

@@ -247,7 +255,18 @@ def execute(self, context: Context) -> dict:
247255
gcp_conn_id=self.gcp_conn_id,
248256
impersonation_chain=self.google_impersonation_chain,
249257
)
250-
return hook.create_transfer_job(body=self.body)
258+
result = hook.create_transfer_job(body=self.body)
259+
260+
project_id = self.project_id or hook.project_id
261+
if project_id:
262+
CloudStorageTransferJobLink.persist(
263+
context=context,
264+
task_instance=self,
265+
project_id=project_id,
266+
job_name=result[NAME],
267+
)
268+
269+
return result
251270

252271

253272
class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
@@ -291,6 +310,7 @@ class CloudDataTransferServiceUpdateJobOperator(BaseOperator):
291310
'google_impersonation_chain',
292311
)
293312
# [END gcp_transfer_job_update_template_fields]
313+
operator_extra_links = (CloudStorageTransferJobLink(),)
294314

295315
def __init__(
296316
self,
@@ -300,12 +320,14 @@ def __init__(
300320
aws_conn_id: str = 'aws_default',
301321
gcp_conn_id: str = 'google_cloud_default',
302322
api_version: str = 'v1',
323+
project_id: str | None = None,
303324
google_impersonation_chain: str | Sequence[str] | None = None,
304325
**kwargs,
305326
) -> None:
306327
super().__init__(**kwargs)
307328
self.job_name = job_name
308329
self.body = body
330+
self.project_id = project_id
309331
self.gcp_conn_id = gcp_conn_id
310332
self.api_version = api_version
311333
self.aws_conn_id = aws_conn_id
@@ -324,6 +346,16 @@ def execute(self, context: Context) -> dict:
324346
gcp_conn_id=self.gcp_conn_id,
325347
impersonation_chain=self.google_impersonation_chain,
326348
)
349+
350+
project_id = self.project_id or hook.project_id
351+
if project_id:
352+
CloudStorageTransferJobLink.persist(
353+
context=context,
354+
task_instance=self,
355+
project_id=project_id,
356+
job_name=self.job_name,
357+
)
358+
327359
return hook.update_transfer_job(job_name=self.job_name, body=self.body)
328360

329361

@@ -426,10 +458,12 @@ class CloudDataTransferServiceGetOperationOperator(BaseOperator):
426458
'google_impersonation_chain',
427459
)
428460
# [END gcp_transfer_operation_get_template_fields]
461+
operator_extra_links = (CloudStorageTransferDetailsLink(),)
429462

430463
def __init__(
431464
self,
432465
*,
466+
project_id: str | None = None,
433467
operation_name: str,
434468
gcp_conn_id: str = "google_cloud_default",
435469
api_version: str = "v1",
@@ -438,6 +472,7 @@ def __init__(
438472
) -> None:
439473
super().__init__(**kwargs)
440474
self.operation_name = operation_name
475+
self.project_id = project_id
441476
self.gcp_conn_id = gcp_conn_id
442477
self.api_version = api_version
443478
self.google_impersonation_chain = google_impersonation_chain
@@ -454,6 +489,16 @@ def execute(self, context: Context) -> dict:
454489
impersonation_chain=self.google_impersonation_chain,
455490
)
456491
operation = hook.get_transfer_operation(operation_name=self.operation_name)
492+
493+
project_id = self.project_id or hook.project_id
494+
if project_id:
495+
CloudStorageTransferDetailsLink.persist(
496+
context=context,
497+
task_instance=self,
498+
project_id=project_id,
499+
operation_name=self.operation_name,
500+
)
501+
457502
return operation
458503

459504

@@ -488,10 +533,12 @@ class CloudDataTransferServiceListOperationsOperator(BaseOperator):
488533
'google_impersonation_chain',
489534
)
490535
# [END gcp_transfer_operations_list_template_fields]
536+
operator_extra_links = (CloudStorageTransferListLink(),)
491537

492538
def __init__(
493539
self,
494540
request_filter: dict | None = None,
541+
project_id: str | None = None,
495542
gcp_conn_id: str = 'google_cloud_default',
496543
api_version: str = 'v1',
497544
google_impersonation_chain: str | Sequence[str] | None = None,
@@ -508,6 +555,7 @@ def __init__(
508555

509556
super().__init__(**kwargs)
510557
self.filter = request_filter
558+
self.project_id = project_id
511559
self.gcp_conn_id = gcp_conn_id
512560
self.api_version = api_version
513561
self.google_impersonation_chain = google_impersonation_chain
@@ -525,6 +573,15 @@ def execute(self, context: Context) -> list[dict]:
525573
)
526574
operations_list = hook.list_transfer_operations(request_filter=self.filter)
527575
self.log.info(operations_list)
576+
577+
project_id = self.project_id or hook.project_id
578+
if project_id:
579+
CloudStorageTransferListLink.persist(
580+
context=context,
581+
task_instance=self,
582+
project_id=project_id,
583+
)
584+
528585
return operations_list
529586

530587

airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
NAME,
2727
CloudDataTransferServiceHook,
2828
)
29+
from airflow.providers.google.cloud.links.cloud_storage_transfer import CloudStorageTransferJobLink
2930
from airflow.sensors.base import BaseSensorOperator
3031

3132
if TYPE_CHECKING:
@@ -65,6 +66,7 @@ class CloudDataTransferServiceJobStatusSensor(BaseSensorOperator):
6566
'impersonation_chain',
6667
)
6768
# [END gcp_transfer_job_sensor_template_fields]
69+
operator_extra_links = (CloudStorageTransferJobLink(),)
6870

6971
def __init__(
7072
self,
@@ -103,4 +105,13 @@ def poke(self, context: Context) -> bool:
103105
if check:
104106
self.xcom_push(key="sensed_operations", value=operations, context=context)
105107

108+
project_id = self.project_id or hook.project_id
109+
if project_id:
110+
CloudStorageTransferJobLink.persist(
111+
context=context,
112+
task_instance=self,
113+
project_id=project_id,
114+
job_name=self.job_name,
115+
)
116+
106117
return check

airflow/providers/google/provider.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,9 @@ extra-links:
10171017
- airflow.providers.google.cloud.links.life_sciences.LifeSciencesLink
10181018
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsDetailsLink
10191019
- airflow.providers.google.cloud.links.cloud_functions.CloudFunctionsListLink
1020+
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferListLink
1021+
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferJobLink
1022+
- airflow.providers.google.cloud.links.cloud_storage_transfer.CloudStorageTransferDetailsLink
10201023
- airflow.providers.google.common.links.storage.StorageLink
10211024
- airflow.providers.google.common.links.storage.FileDetailsLink
10221025

docs/apache-airflow-providers-google/operators/cloud/cloud_storage_transfer_service.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ For parameter definition, take a look at
5858
Using the operator
5959
""""""""""""""""""
6060

61-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
61+
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
6262
:language: python
6363
:start-after: [START howto_operator_gcp_transfer_create_job_body_gcp]
6464
:end-before: [END howto_operator_gcp_transfer_create_job_body_gcp]
@@ -138,12 +138,12 @@ For parameter definition, take a look at
138138
Using the operator
139139
""""""""""""""""""
140140

141-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
141+
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
142142
:language: python
143143
:start-after: [START howto_operator_gcp_transfer_update_job_body]
144144
:end-before: [END howto_operator_gcp_transfer_update_job_body]
145145

146-
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py
146+
.. exampleinclude:: /../../tests/system/providers/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
147147
:language: python
148148
:dedent: 4
149149
:start-after: [START howto_operator_gcp_transfer_update_job]

0 commit comments

Comments
 (0)