Skip to content

Commit 8cac969

Browse files
author
Peter Wicks
authored
GCSToBigQueryOperator allow for schema_object in alternate GCS Bucket (#26190)
1 parent 63562d7 commit 8cac969

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,8 @@ class GCSToBigQueryOperator(BaseOperator):
5656
:param schema_object: If set, a GCS object path pointing to a .json file that
5757
contains the schema for the table. (templated)
5858
Parameter must be defined if 'schema_fields' is null and autodetect is False.
59+
:param schema_object_bucket: [Optional] If set, the GCS bucket where the schema object
60+
template is stored. (templated) (Default: the value of ``bucket``)
5961
:param source_format: File format to export.
6062
:param compression: [Optional] The compression type of the data source.
6163
Possible values include GZIP and NONE.
@@ -133,6 +135,7 @@ class GCSToBigQueryOperator(BaseOperator):
133135
'bucket',
134136
'source_objects',
135137
'schema_object',
138+
'schema_object_bucket',
136139
'destination_project_dataset_table',
137140
'impersonation_chain',
138141
)
@@ -147,6 +150,7 @@ def __init__(
147150
destination_project_dataset_table,
148151
schema_fields=None,
149152
schema_object=None,
153+
schema_object_bucket=None,
150154
source_format='CSV',
151155
compression='NONE',
152156
create_disposition='CREATE_IF_NEEDED',
@@ -187,6 +191,10 @@ def __init__(
187191
self.source_objects = source_objects
188192
self.schema_object = schema_object
189193

194+
if schema_object_bucket is None:
195+
schema_object_bucket = bucket
196+
self.schema_object_bucket = schema_object_bucket
197+
190198
# BQ config
191199
self.destination_project_dataset_table = destination_project_dataset_table
192200
self.schema_fields = schema_fields
@@ -236,7 +244,7 @@ def execute(self, context: 'Context'):
236244
impersonation_chain=self.impersonation_chain,
237245
)
238246
blob = gcs_hook.download(
239-
bucket_name=self.bucket,
247+
bucket_name=self.schema_object_bucket,
240248
object_name=self.schema_object,
241249
)
242250
schema_fields = json.loads(blob.decode("utf-8"))

0 commit comments

Comments
 (0)