Skip to content

Commit 766726f

Browse files
Fix PostgresToGCSOperator does not allow nested JSON (#23063)
* Avoid double json.dumps for json data export in PostgresToGCSOperator. * Fix CI
1 parent ca3fbbb commit 766726f

File tree

9 files changed

+38
-23
lines changed

9 files changed

+38
-23
lines changed

airflow/providers/google/cloud/transfers/mssql_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
8080
}
8181

8282
@classmethod
83-
def convert_type(cls, value, schema_type):
83+
def convert_type(cls, value, schema_type, **kwargs):
8484
"""
8585
Takes a value from MSSQL, and converts it to a value that's safe for
8686
JSON/Google Cloud Storage/BigQuery.

airflow/providers/google/cloud/transfers/mysql_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
9292
'mode': field_mode,
9393
}
9494

95-
def convert_type(self, value, schema_type: str):
95+
def convert_type(self, value, schema_type: str, **kwargs):
9696
"""
9797
Takes a value from MySQLdb, and converts it to a value that's safe for
9898
JSON/Google Cloud Storage/BigQuery.

airflow/providers/google/cloud/transfers/oracle_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
8585
'mode': field_mode,
8686
}
8787

88-
def convert_type(self, value, schema_type):
88+
def convert_type(self, value, schema_type, **kwargs):
8989
"""
9090
Takes a value from Oracle db, and converts it to a value that's safe for
9191
JSON/Google Cloud Storage/BigQuery.

airflow/providers/google/cloud/transfers/postgres_to_gcs.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,17 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
128128
'mode': 'REPEATED' if field[1] in (1009, 1005, 1007, 1016) else 'NULLABLE',
129129
}
130130

131-
def convert_type(self, value, schema_type):
131+
def convert_type(self, value, schema_type, stringify_dict=True):
132132
"""
133133
Takes a value from Postgres, and converts it to a value that's safe for
134134
JSON/Google Cloud Storage/BigQuery.
135135
Timezone aware Datetime are converted to UTC seconds.
136136
Unaware Datetime, Date and Time are converted to ISO formatted strings.
137137
Decimals are converted to floats.
138+
139+
:param value: Postgres column value.
140+
:param schema_type: BigQuery data type.
141+
:param stringify_dict: Specify whether to convert dict to string.
138142
"""
139143
if isinstance(value, datetime.datetime):
140144
iso_format_value = value.isoformat()
@@ -149,7 +153,7 @@ def convert_type(self, value, schema_type):
149153
hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
150154
)
151155
return str(time_delta)
152-
if isinstance(value, dict):
156+
if stringify_dict and isinstance(value, dict):
153157
return json.dumps(value)
154158
if isinstance(value, Decimal):
155159
return float(value)

airflow/providers/google/cloud/transfers/presto_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
195195

196196
return {"name": field[0], "type": new_field_type}
197197

198-
def convert_type(self, value, schema_type):
198+
def convert_type(self, value, schema_type, **kwargs):
199199
"""
200200
Do nothing. Presto uses JSON on the transport layer, so types are simple.
201201

airflow/providers/google/cloud/transfers/sql_to_gcs.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,12 @@ def execute(self, context: 'Context'):
150150
file_to_upload['file_handle'].close()
151151
counter += 1
152152

153-
def convert_types(self, schema, col_type_dict, row) -> list:
153+
def convert_types(self, schema, col_type_dict, row, stringify_dict=False) -> list:
154154
"""Convert values from DBAPI to output-friendly formats."""
155-
return [self.convert_type(value, col_type_dict.get(name)) for name, value in zip(schema, row)]
155+
return [
156+
self.convert_type(value, col_type_dict.get(name), stringify_dict=stringify_dict)
157+
for name, value in zip(schema, row)
158+
]
156159

157160
def _write_local_data_files(self, cursor):
158161
"""
@@ -186,21 +189,20 @@ def _write_local_data_files(self, cursor):
186189
parquet_writer = self._configure_parquet_file(tmp_file_handle, parquet_schema)
187190

188191
for row in cursor:
189-
# Convert datetime objects to utc seconds, and decimals to floats.
190-
# Convert binary type object to string encoded with base64.
191-
row = self.convert_types(schema, col_type_dict, row)
192-
193192
if self.export_format == 'csv':
193+
row = self.convert_types(schema, col_type_dict, row)
194194
if self.null_marker is not None:
195195
row = [value if value is not None else self.null_marker for value in row]
196196
csv_writer.writerow(row)
197197
elif self.export_format == 'parquet':
198+
row = self.convert_types(schema, col_type_dict, row)
198199
if self.null_marker is not None:
199200
row = [value if value is not None else self.null_marker for value in row]
200201
row_pydic = {col: [value] for col, value in zip(schema, row)}
201202
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
202203
parquet_writer.write_table(tbl)
203204
else:
205+
row = self.convert_types(schema, col_type_dict, row, stringify_dict=False)
204206
row_dict = dict(zip(schema, row))
205207

206208
tmp_file_handle.write(
@@ -273,7 +275,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
273275
"""Convert a DBAPI field to BigQuery schema format."""
274276

275277
@abc.abstractmethod
276-
def convert_type(self, value, schema_type):
278+
def convert_type(self, value, schema_type, **kwargs):
277279
"""Convert a value from DBAPI to output-friendly formats."""
278280

279281
def _get_col_type_dict(self):

airflow/providers/google/cloud/transfers/trino_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
195195

196196
return {"name": field[0], "type": new_field_type}
197197

198-
def convert_type(self, value, schema_type):
198+
def convert_type(self, value, schema_type, **kwargs):
199199
"""
200200
Do nothing. Trino uses JSON on the transport layer, so types are simple.
201201

tests/providers/google/cloud/transfers/test_postgres_to_gcs.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@
3535
FILENAME = 'test_{}.ndjson'
3636

3737
NDJSON_LINES = [
38-
b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
39-
b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
40-
b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
38+
b'{"some_json": {"firtname": "John", "lastname": "Smith", "nested_dict": {"a": null, "b": "something"}}, "some_num": 42, "some_str": "mock_row_content_1"}\n', # noqa
39+
b'{"some_json": {}, "some_num": 43, "some_str": "mock_row_content_2"}\n',
40+
b'{"some_json": {}, "some_num": 44, "some_str": "mock_row_content_3"}\n',
4141
]
4242
SCHEMA_FILENAME = 'schema_test.json'
4343
SCHEMA_JSON = (
4444
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, '
45-
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]'
45+
b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, '
46+
b'{"mode": "NULLABLE", "name": "some_json", "type": "STRING"}]'
4647
)
4748

4849

@@ -55,16 +56,24 @@ def setUpClass(cls):
5556
with conn.cursor() as cur:
5657
for table in TABLES:
5758
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")
58-
cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer);")
59+
cur.execute(f"CREATE TABLE {table}(some_str varchar, some_num integer, some_json json);")
5960

6061
cur.execute(
61-
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_1', 42)
62+
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
63+
(
64+
'mock_row_content_1',
65+
42,
66+
'{"lastname": "Smith", "firtname": "John", \
67+
"nested_dict": {"a": null, "b": "something"}}',
68+
),
6269
)
6370
cur.execute(
64-
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_2', 43)
71+
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
72+
('mock_row_content_2', 43, '{}'),
6573
)
6674
cur.execute(
67-
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s);", ('mock_row_content_3', 44)
75+
"INSERT INTO postgres_to_gcs_operator VALUES(%s, %s, %s);",
76+
('mock_row_content_3', 44, '{}'),
6877
)
6978

7079
@classmethod

tests/providers/google/cloud/transfers/test_sql_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def field_to_bigquery(self, field) -> Dict[str, str]:
7070
'mode': 'NULLABLE',
7171
}
7272

73-
def convert_type(self, value, schema_type):
73+
def convert_type(self, value, schema_type, stringify_dict):
7474
return 'convert_type_return_value'
7575

7676
def query(self):

0 commit comments

Comments (0)