
Commit 94257f4

Author: Peter Wicks

Expose SQL to GCS Metadata (#24382)
1 parent 8e0bdda commit 94257f4

8 files changed: 185 additions, 40 deletions


airflow/providers/google/cloud/transfers/sql_to_gcs.py

Lines changed: 40 additions & 2 deletions
@@ -71,6 +71,7 @@ class BaseSQLToGCSOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+    :param upload_metadata: whether to upload the row count metadata as blob metadata
     :param exclude_columns: set of columns to exclude from transmission
     """
 
@@ -104,6 +105,7 @@ def __init__(
         gcp_conn_id: str = 'google_cloud_default',
         delegate_to: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        upload_metadata: bool = False,
         exclude_columns=None,
         **kwargs,
     ) -> None:
@@ -125,6 +127,7 @@ def __init__(
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
+        self.upload_metadata = upload_metadata
         self.exclude_columns = exclude_columns
 
     def execute(self, context: 'Context'):
@@ -144,6 +147,9 @@ def execute(self, context: 'Context'):
             schema_file['file_handle'].close()
 
         counter = 0
+        files = []
+        total_row_count = 0
+        total_files = 0
         self.log.info('Writing local data files')
         for file_to_upload in self._write_local_data_files(cursor):
             # Flush file before uploading
@@ -154,8 +160,29 @@ def execute(self, context: 'Context'):
 
             self.log.info('Removing local file')
             file_to_upload['file_handle'].close()
+
+            # Metadata to be outputted to Xcom
+            total_row_count += file_to_upload['file_row_count']
+            total_files += 1
+            files.append(
+                {
+                    'file_name': file_to_upload['file_name'],
+                    'file_mime_type': file_to_upload['file_mime_type'],
+                    'file_row_count': file_to_upload['file_row_count'],
+                }
+            )
+
             counter += 1
 
+        file_meta = {
+            'bucket': self.bucket,
+            'total_row_count': total_row_count,
+            'total_files': total_files,
+            'files': files,
+        }
+
+        return file_meta
+
     def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
         """Convert values from DBAPI to output-friendly formats."""
         return [
@@ -188,6 +215,7 @@ def _write_local_data_files(self, cursor):
             'file_name': self.filename.format(file_no),
             'file_handle': tmp_file_handle,
             'file_mime_type': file_mime_type,
+            'file_row_count': 0,
         }
 
         if self.export_format == 'csv':
@@ -197,6 +225,7 @@ def _write_local_data_files(self, cursor):
             parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)
 
         for row in cursor:
+            file_to_upload['file_row_count'] += 1
             if self.export_format == 'csv':
                 row = self.convert_types(schema, col_type_dict, row)
                 if self.null_marker is not None:
@@ -232,14 +261,17 @@ def _write_local_data_files(self, cursor):
                     'file_name': self.filename.format(file_no),
                     'file_handle': tmp_file_handle,
                     'file_mime_type': file_mime_type,
+                    'file_row_count': 0,
                 }
                 if self.export_format == 'csv':
                     csv_writer = self._configure_csv_file(tmp_file_handle, schema)
                 if self.export_format == 'parquet':
                     parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)
         if self.export_format == 'parquet':
             parquet_writer.close()
-        yield file_to_upload
+        # Last file may have 0 rows, don't yield if empty
+        if file_to_upload['file_row_count'] > 0:
+            yield file_to_upload
 
     def _configure_csv_file(self, file_handle, schema):
         """Configure a csv writer with the file_handle and write schema
@@ -350,10 +382,16 @@ def _upload_to_gcs(self, file_to_upload):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
+        is_data_file = file_to_upload.get('file_name') != self.schema_filename
+        metadata = None
+        if is_data_file and self.upload_metadata:
+            metadata = {'row_count': file_to_upload['file_row_count']}
+
        hook.upload(
            self.bucket,
            file_to_upload.get('file_name'),
            file_to_upload.get('file_handle').name,
            mime_type=file_to_upload.get('file_mime_type'),
-           gzip=self.gzip if file_to_upload.get('file_name') != self.schema_filename else False,
+           gzip=self.gzip if is_data_file else False,
+           metadata=metadata,
        )
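
For orientation, a minimal usage sketch of the new option from a DAG. This is not part of the commit; the task id, query, bucket, and filename are placeholders, and any concrete subclass of BaseSQLToGCSOperator behaves the same way (MySQLToGCSOperator is used here as an example):

    from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

    upload_orders = MySQLToGCSOperator(
        task_id='orders_to_gcs',           # placeholder task id
        sql='SELECT * FROM orders',        # placeholder query
        bucket='example-bucket',           # placeholder bucket
        filename='orders/export_{}.json',  # placeholder object name template
        upload_metadata=True,              # new flag introduced by this commit
    )

    # execute() now returns the file_meta dict built above, which Airflow
    # stores as the task's return-value XCom, e.g.:
    # {
    #     'bucket': 'example-bucket',
    #     'total_row_count': 42,
    #     'total_files': 2,
    #     'files': [
    #         {'file_name': 'orders/export_0.json',
    #          'file_mime_type': 'application/json',
    #          'file_row_count': 21},
    #         ...
    #     ],
    # }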

tests/providers/google/cloud/transfers/test_mssql_to_gcs.py

Lines changed: 3 additions & 3 deletions
@@ -97,7 +97,7 @@ def test_exec_success_json(self, gcs_hook_mock_class, mssql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert JSON_FILENAME.format(0) == obj
             assert 'application/json' == mime_type
@@ -126,7 +126,7 @@ def test_file_splitting(self, gcs_hook_mock_class, mssql_hook_mock_class):
             JSON_FILENAME.format(1): NDJSON_LINES[2],
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert 'application/json' == mime_type
             assert GZIP == gzip
@@ -154,7 +154,7 @@ def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 with open(tmp_filename, 'rb') as file:
                     assert b''.join(SCHEMA_JSON) == file.read()
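
The signature changes above only widen the mocked upload callback so existing assertions keep passing; none of these tests assert on the new keyword yet. A sketch of what a dedicated check could look like, reusing the same mocking pattern (illustrative only, not part of this commit; the expected count of 3 assumes the three NDJSON_LINES rows these tests feed through a single file):

    def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
        # With upload_metadata=True, each data file should carry its row count
        assert metadata == {'row_count': 3}

    gcs_hook_mock.upload.side_effect = _assert_upload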

tests/providers/google/cloud/transfers/test_mysql_to_gcs.py

Lines changed: 7 additions & 7 deletions
@@ -124,7 +124,7 @@ def test_exec_success_json(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert JSON_FILENAME.format(0) == obj
             assert 'application/json' == mime_type
@@ -158,7 +158,7 @@ def test_exec_success_csv(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert CSV_FILENAME.format(0) == obj
             assert 'text/csv' == mime_type
@@ -193,7 +193,7 @@ def test_exec_success_csv_ensure_utc(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert CSV_FILENAME.format(0) == obj
             assert 'text/csv' == mime_type
@@ -228,7 +228,7 @@ def test_exec_success_csv_with_delimiter(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert CSV_FILENAME.format(0) == obj
             assert 'text/csv' == mime_type
@@ -257,7 +257,7 @@ def test_file_splitting(self, gcs_hook_mock_class, mysql_hook_mock_class):
             JSON_FILENAME.format(1): NDJSON_LINES[2],
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert 'application/json' == mime_type
             assert not gzip
@@ -285,7 +285,7 @@ def test_schema_file(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 assert not gzip
                 with open(tmp_filename, 'rb') as file:
@@ -311,7 +311,7 @@ def test_schema_file_with_custom_schema(self, gcs_hook_mock_class, mysql_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 assert not gzip
                 with open(tmp_filename, 'rb') as file:

tests/providers/google/cloud/transfers/test_oracle_to_gcs.py

Lines changed: 3 additions & 3 deletions
@@ -70,7 +70,7 @@ def test_exec_success_json(self, gcs_hook_mock_class, oracle_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert JSON_FILENAME.format(0) == obj
             assert 'application/json' == mime_type
@@ -99,7 +99,7 @@ def test_file_splitting(self, gcs_hook_mock_class, oracle_hook_mock_class):
             JSON_FILENAME.format(1): NDJSON_LINES[2],
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type=None, gzip=False, metadata=None):
             assert BUCKET == bucket
             assert 'application/json' == mime_type
             assert GZIP == gzip
@@ -127,7 +127,7 @@ def test_schema_file(self, gcs_hook_mock_class, oracle_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 with open(tmp_filename, 'rb') as file:
                     assert b''.join(SCHEMA_JSON) == file.read()

tests/providers/google/cloud/transfers/test_postgres_to_gcs.py

Lines changed: 3 additions & 3 deletions
@@ -92,7 +92,7 @@ def test_init(self):
         assert op.bucket == BUCKET
         assert op.filename == FILENAME
 
-    def _assert_uploaded_file_content(self, bucket, obj, tmp_filename, mime_type, gzip):
+    def _assert_uploaded_file_content(self, bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
         assert BUCKET == bucket
         assert FILENAME.format(0) == obj
         assert 'application/json' == mime_type
@@ -159,7 +159,7 @@ def test_file_splitting(self, gcs_hook_mock_class):
             FILENAME.format(1): NDJSON_LINES[2],
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             assert BUCKET == bucket
             assert 'application/json' == mime_type
             assert not gzip
@@ -183,7 +183,7 @@ def test_schema_file(self, gcs_hook_mock_class):
 
         gcs_hook_mock = gcs_hook_mock_class.return_value
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 with open(tmp_filename, 'rb') as file:
                     assert SCHEMA_JSON == file.read()

tests/providers/google/cloud/transfers/test_presto_to_gcs.py

Lines changed: 6 additions & 6 deletions
@@ -65,7 +65,7 @@ def test_init(self):
     @patch("airflow.providers.google.cloud.transfers.presto_to_gcs.PrestoHook")
     @patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
     def test_save_as_json(self, mock_gcs_hook, mock_presto_hook):
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             assert BUCKET == bucket
             assert FILENAME.format(0) == obj
             assert "application/json" == mime_type
@@ -120,7 +120,7 @@ def test_save_as_json_with_file_splitting(self, mock_gcs_hook, mock_presto_hook):
             FILENAME.format(1): NDJSON_LINES[2],
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             assert BUCKET == bucket
             assert "application/json" == mime_type
             assert not gzip
@@ -160,7 +160,7 @@ def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
     def test_save_as_json_with_schema_file(self, mock_gcs_hook, mock_presto_hook):
         """Test writing schema files."""
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 with open(tmp_filename, "rb") as file:
                     assert SCHEMA_JSON == file.read()
@@ -199,7 +199,7 @@ def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
     @patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
     @patch("airflow.providers.google.cloud.transfers.presto_to_gcs.PrestoHook")
     def test_save_as_csv(self, mock_presto_hook, mock_gcs_hook):
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             assert BUCKET == bucket
             assert FILENAME.format(0) == obj
             assert "text/csv" == mime_type
@@ -255,7 +255,7 @@ def test_save_as_csv_with_file_splitting(self, mock_gcs_hook, mock_presto_hook):
             FILENAME.format(1): b"".join([CSV_LINES[0], CSV_LINES[3]]),
         }
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             assert BUCKET == bucket
             assert "text/csv" == mime_type
             assert not gzip
@@ -296,7 +296,7 @@ def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
     def test_save_as_csv_with_schema_file(self, mock_gcs_hook, mock_presto_hook):
         """Test writing schema files."""
 
-        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip):
+        def _assert_upload(bucket, obj, tmp_filename, mime_type, gzip, metadata=None):
             if obj == SCHEMA_FILENAME:
                 with open(tmp_filename, "rb") as file:
                     assert SCHEMA_JSON == file.read()
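
After an upload with upload_metadata=True, the row count can be read back as custom metadata on the GCS object. A sketch using the google-cloud-storage client directly (bucket and object names are placeholders, not from this commit); note that GCS returns custom metadata values as strings:

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket('example-bucket').get_blob('orders/export_0.json')
    print(blob.metadata)  # e.g. {'row_count': '21'}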
