Skip to content

Commit 3734876

Browse files
olchasKamil Olszewski
andauthored
Implement impersonation in google operators (#10052)
Co-authored-by: Kamil Olszewski <kamil.olszewski@polidea.com>
1 parent b0598b5 commit 3734876

File tree

118 files changed

+6845
-1258
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+6845
-1258
lines changed

β€ŽUPDATING.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,12 @@ of this provider.
10381038
This section describes the changes that have been made, and what you need to do to update your if
10391039
you use operators or hooks which integrate with Google services (including Google Cloud Platform - GCP).
10401040

1041+
#### Direct impersonation added to operators communicating with Google services
1042+
[Directly impersonating a service account](https://cloud.google.com/iam/docs/understanding-service-accounts#directly_impersonating_a_service_account)
1043+
has been made possible for operators communicating with Google services via new argument called `impersonation_chain`
1044+
(`google_impersonation_chain` in case of operators that also communicate with services of other cloud providers).
1045+
As a result, GCSToS3Operator no longer derivatives from GCSListObjectsOperator.
1046+
10411047
#### Normalize gcp_conn_id for Google Cloud Platform
10421048

10431049
Previously not all hooks and operators related to Google Cloud Platform use

β€Žairflow/providers/amazon/aws/transfers/gcs_to_s3.py

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
This module contains Google Cloud Storage to S3 operator.
2020
"""
2121
import warnings
22-
from typing import Iterable
22+
from typing import Iterable, Optional, Sequence, Union
2323

24+
from airflow.models import BaseOperator
2425
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
2526
from airflow.providers.google.cloud.hooks.gcs import GCSHook
26-
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
2727
from airflow.utils.decorators import apply_defaults
2828

2929

30-
class GCSToS3Operator(GCSListObjectsOperator):
30+
class GCSToS3Operator(BaseOperator):
3131
"""
3232
Synchronizes a Google Cloud Storage bucket with an S3 bucket.
3333
@@ -45,8 +45,8 @@ class GCSToS3Operator(GCSListObjectsOperator):
4545
:param google_cloud_storage_conn_id: (Deprecated) The connection ID used to connect to Google Cloud
4646
Platform. This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
4747
:type google_cloud_storage_conn_id: str
48-
:param delegate_to: The account to impersonate, if any.
49-
For this to work, the service account making the request must have
48+
:param delegate_to: Google account to impersonate using domain-wide delegation of authority,
49+
if any. For this to work, the service account making the request must have
5050
domain-wide delegation enabled.
5151
:type delegate_to: str
5252
:param dest_aws_conn_id: The destination S3 connection
@@ -73,8 +73,18 @@ class GCSToS3Operator(GCSListObjectsOperator):
7373
If set to False, will upload only the files that are in the origin but not
7474
in the destination bucket.
7575
:type replace: bool
76+
:param google_impersonation_chain: Optional Google service account to impersonate using
77+
short-term credentials, or chained list of accounts required to get the access_token
78+
of the last account in the list, which will be impersonated in the request.
79+
If set as a string, the account must grant the originating account
80+
the Service Account Token Creator IAM role.
81+
If set as a sequence, the identities from the list must grant
82+
Service Account Token Creator IAM role to the directly preceding identity, with first
83+
account from the list granting this role to the originating account (templated).
84+
:type google_impersonation_chain: Union[str, Sequence[str]]
7685
"""
77-
template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter', 'dest_s3_key')
86+
template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter', 'dest_s3_key',
87+
'google_impersonation_chain',)
7888
ui_color = '#f0eee4'
7989

8090
@apply_defaults
@@ -89,31 +99,42 @@ def __init__(self, *, # pylint: disable=too-many-arguments
8999
dest_s3_key=None,
90100
dest_verify=None,
91101
replace=False,
102+
google_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
92103
**kwargs):
104+
super().__init__(**kwargs)
93105

94106
if google_cloud_storage_conn_id:
95107
warnings.warn(
96108
"The google_cloud_storage_conn_id parameter has been deprecated. You should pass "
97109
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
98110
gcp_conn_id = google_cloud_storage_conn_id
99111

100-
super().__init__(
101-
bucket=bucket,
102-
prefix=prefix,
103-
delimiter=delimiter,
104-
gcp_conn_id=gcp_conn_id,
105-
delegate_to=delegate_to,
106-
**kwargs
107-
)
108-
112+
self.bucket = bucket
113+
self.prefix = prefix
114+
self.delimiter = delimiter
115+
self.gcp_conn_id = gcp_conn_id
116+
self.delegate_to = delegate_to
109117
self.dest_aws_conn_id = dest_aws_conn_id
110118
self.dest_s3_key = dest_s3_key
111119
self.dest_verify = dest_verify
112120
self.replace = replace
121+
self.google_impersonation_chain = google_impersonation_chain
113122

114123
def execute(self, context):
115-
# use the super to list all files in an Google Cloud Storage bucket
116-
files = super().execute(context)
124+
# list all files in an Google Cloud Storage bucket
125+
hook = GCSHook(
126+
google_cloud_storage_conn_id=self.gcp_conn_id,
127+
delegate_to=self.delegate_to,
128+
impersonation_chain=self.google_impersonation_chain,
129+
)
130+
131+
self.log.info('Getting list of the files. Bucket: %s; Delimiter: %s; Prefix: %s',
132+
self.bucket, self.delimiter, self.prefix)
133+
134+
files = hook.list(bucket_name=self.bucket,
135+
prefix=self.prefix,
136+
delimiter=self.delimiter)
137+
117138
s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify)
118139

119140
if not self.replace:
@@ -131,10 +152,6 @@ def execute(self, context):
131152
files = list(set(files) - set(existing_files))
132153

133154
if files:
134-
hook = GCSHook(
135-
google_cloud_storage_conn_id=self.gcp_conn_id,
136-
delegate_to=self.delegate_to
137-
)
138155

139156
for file in files:
140157
file_bytes = hook.download(self.bucket, file)

β€Žairflow/providers/amazon/aws/transfers/google_api_to_s3.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"""
2222
import json
2323
import sys
24+
from typing import Optional, Sequence, Union
2425

2526
from airflow.models import BaseOperator
2627
from airflow.models.xcom import MAX_XCOM_SIZE
@@ -68,17 +69,27 @@ class GoogleApiToS3Operator(BaseOperator):
6869
:type s3_overwrite: bool
6970
:param gcp_conn_id: The connection ID to use when fetching connection info.
7071
:type gcp_conn_id: str
71-
:param delegate_to: The account to impersonate, if any.
72-
For this to work, the service account making the request must have
72+
:param delegate_to: Google account to impersonate using domain-wide delegation of authority,
73+
if any. For this to work, the service account making the request must have
7374
domain-wide delegation enabled.
7475
:type delegate_to: str
7576
:param aws_conn_id: The connection id specifying the authentication information for the S3 Bucket.
7677
:type aws_conn_id: str
78+
:param google_impersonation_chain: Optional Google service account to impersonate using
79+
short-term credentials, or chained list of accounts required to get the access_token
80+
of the last account in the list, which will be impersonated in the request.
81+
If set as a string, the account must grant the originating account
82+
the Service Account Token Creator IAM role.
83+
If set as a sequence, the identities from the list must grant
84+
Service Account Token Creator IAM role to the directly preceding identity, with first
85+
account from the list granting this role to the originating account (templated).
86+
:type google_impersonation_chain: Union[str, Sequence[str]]
7787
"""
7888

7989
template_fields = (
8090
'google_api_endpoint_params',
8191
's3_destination_key',
92+
'google_impersonation_chain',
8293
)
8394
template_ext = ()
8495
ui_color = '#cc181e'
@@ -100,6 +111,7 @@ def __init__(
100111
gcp_conn_id='google_cloud_default',
101112
delegate_to=None,
102113
aws_conn_id='aws_default',
114+
google_impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
103115
**kwargs
104116
):
105117
super().__init__(**kwargs)
@@ -117,6 +129,7 @@ def __init__(
117129
self.gcp_conn_id = gcp_conn_id
118130
self.delegate_to = delegate_to
119131
self.aws_conn_id = aws_conn_id
132+
self.google_impersonation_chain = google_impersonation_chain
120133

121134
def execute(self, context):
122135
"""
@@ -142,7 +155,8 @@ def _retrieve_data_from_google_api(self):
142155
gcp_conn_id=self.gcp_conn_id,
143156
delegate_to=self.delegate_to,
144157
api_service_name=self.google_api_service_name,
145-
api_version=self.google_api_service_version
158+
api_version=self.google_api_service_version,
159+
impersonation_chain=self.google_impersonation_chain,
146160
)
147161
google_api_response = google_discovery_api_hook.query(
148162
endpoint=self.google_api_endpoint_path,

β€Žairflow/providers/google/ads/operators/ads.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"""
2121
import csv
2222
from tempfile import NamedTemporaryFile
23-
from typing import Dict
23+
from typing import Dict, Optional, Sequence, Union
2424

2525
from airflow.models import BaseOperator
2626
from airflow.providers.google.ads.hooks.ads import GoogleAdsHook
@@ -57,9 +57,18 @@ class GoogleAdsListAccountsOperator(BaseOperator):
5757
:type page_size: int
5858
:param gzip: Option to compress local file or file data for upload
5959
:type gzip: bool
60+
:param impersonation_chain: Optional service account to impersonate using short-term
61+
credentials, or chained list of accounts required to get the access_token
62+
of the last account in the list, which will be impersonated in the request.
63+
If set as a string, the account must grant the originating account
64+
the Service Account Token Creator IAM role.
65+
If set as a sequence, the identities from the list must grant
66+
Service Account Token Creator IAM role to the directly preceding identity, with first
67+
account from the list granting this role to the originating account (templated).
68+
:type impersonation_chain: Union[str, Sequence[str]]
6069
"""
6170

62-
template_fields = ("bucket", "object_name")
71+
template_fields = ("bucket", "object_name", "impersonation_chain",)
6372

6473
@apply_defaults
6574
def __init__(
@@ -69,6 +78,7 @@ def __init__(
6978
gcp_conn_id: str = "google_cloud_default",
7079
google_ads_conn_id: str = "google_ads_default",
7180
gzip: bool = False,
81+
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
7282
**kwargs,
7383
) -> None:
7484
super().__init__(**kwargs)
@@ -77,6 +87,7 @@ def __init__(
7787
self.gcp_conn_id = gcp_conn_id
7888
self.google_ads_conn_id = google_ads_conn_id
7989
self.gzip = gzip
90+
self.impersonation_chain = impersonation_chain
8091

8192
def execute(self, context: Dict):
8293
uri = f"gs://{self.bucket}/{self.object_name}"
@@ -86,8 +97,10 @@ def execute(self, context: Dict):
8697
google_ads_conn_id=self.google_ads_conn_id
8798
)
8899

89-
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
90-
100+
gcs_hook = GCSHook(
101+
gcp_conn_id=self.gcp_conn_id,
102+
impersonation_chain=self.impersonation_chain
103+
)
91104
with NamedTemporaryFile("w+") as temp_file:
92105
# Download accounts
93106
accounts = ads_hook.list_accessible_customers()

β€Žairflow/providers/google/ads/transfers/ads_to_gcs.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import csv
1919
from operator import attrgetter
2020
from tempfile import NamedTemporaryFile
21-
from typing import Dict, List
21+
from typing import Dict, List, Optional, Sequence, Union
2222

2323
from airflow.models import BaseOperator
2424
from airflow.providers.google.ads.hooks.ads import GoogleAdsHook
@@ -58,9 +58,18 @@ class GoogleAdsToGcsOperator(BaseOperator):
5858
:type page_size: int
5959
:param gzip: Option to compress local file or file data for upload
6060
:type gzip: bool
61+
:param impersonation_chain: Optional service account to impersonate using short-term
62+
credentials, or chained list of accounts required to get the access_token
63+
of the last account in the list, which will be impersonated in the request.
64+
If set as a string, the account must grant the originating account
65+
the Service Account Token Creator IAM role.
66+
If set as a sequence, the identities from the list must grant
67+
Service Account Token Creator IAM role to the directly preceding identity, with first
68+
account from the list granting this role to the originating account (templated).
69+
:type impersonation_chain: Union[str, Sequence[str]]
6170
"""
6271

63-
template_fields = ("client_ids", "query", "attributes", "bucket", "obj")
72+
template_fields = ("client_ids", "query", "attributes", "bucket", "obj", "impersonation_chain",)
6473

6574
@apply_defaults
6675
def __init__(
@@ -74,6 +83,7 @@ def __init__(
7483
google_ads_conn_id: str = "google_ads_default",
7584
page_size: int = 10000,
7685
gzip: bool = False,
86+
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
7787
**kwargs,
7888
) -> None:
7989
super().__init__(**kwargs)
@@ -86,6 +96,7 @@ def __init__(
8696
self.google_ads_conn_id = google_ads_conn_id
8797
self.page_size = page_size
8898
self.gzip = gzip
99+
self.impersonation_chain = impersonation_chain
89100

90101
def execute(self, context: Dict):
91102
service = GoogleAdsHook(
@@ -108,7 +119,10 @@ def execute(self, context: Dict):
108119
writer.writerows(converted_rows)
109120
csvfile.flush()
110121

111-
hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
122+
hook = GCSHook(
123+
gcp_conn_id=self.gcp_conn_id,
124+
impersonation_chain=self.impersonation_chain
125+
)
112126
hook.upload(
113127
bucket_name=self.bucket,
114128
object_name=self.obj,

β€Žairflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
6363
def __init__(self,
6464
gcp_conn_id: str = 'google_cloud_default',
6565
delegate_to: Optional[str] = None,
66-
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
6766
use_legacy_sql: bool = True,
6867
location: Optional[str] = None,
6968
bigquery_conn_id: Optional[str] = None,
70-
api_resource_configs: Optional[Dict] = None) -> None:
69+
api_resource_configs: Optional[Dict] = None,
70+
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,) -> None:
7171
# To preserve backward compatibility
7272
# TODO: remove one day
7373
if bigquery_conn_id:

β€Žairflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,8 @@ def __init__(
416416
self,
417417
gcp_conn_id: str = "google_cloud_default",
418418
delegate_to: Optional[str] = None,
419+
poll_sleep: int = 10,
419420
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
420-
poll_sleep: int = 10
421421
) -> None:
422422
self.poll_sleep = poll_sleep
423423
super().__init__(

β€Žairflow/providers/google/cloud/hooks/datastore.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ def __init__(
4444
self,
4545
gcp_conn_id: str = "google_cloud_default",
4646
delegate_to: Optional[str] = None,
47-
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
4847
api_version: str = 'v1',
49-
datastore_conn_id: Optional[str] = None
48+
datastore_conn_id: Optional[str] = None,
49+
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
5050
) -> None:
5151
if datastore_conn_id:
5252
warnings.warn(

β€Žairflow/providers/google/cloud/hooks/gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ def __init__(
116116
self,
117117
gcp_conn_id: str = "google_cloud_default",
118118
delegate_to: Optional[str] = None,
119+
google_cloud_storage_conn_id: Optional[str] = None,
119120
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
120-
google_cloud_storage_conn_id: Optional[str] = None
121121
) -> None:
122122
# To preserve backward compatibility
123123
# TODO: remove one day

β€Žairflow/providers/google/cloud/hooks/kubernetes_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ def __init__(
5151
self,
5252
gcp_conn_id: str = "google_cloud_default",
5353
delegate_to: Optional[str] = None,
54+
location: Optional[str] = None,
5455
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
55-
location: Optional[str] = None
5656
) -> None:
5757
super().__init__(
5858
gcp_conn_id=gcp_conn_id,

0 commit comments

Comments
 (0)