Skip to content

Commit af4627f

Browse files
authored
fix setting project_id for gs to bq and bq to gs (#30053)
1 parent 732fcd7 commit af4627f

File tree

3 files changed

+24
-19
lines changed

3 files changed

+24
-19
lines changed

airflow/providers/google/cloud/transfers/bigquery_to_gcs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ def _submit_job(
193193

194194
return hook.insert_job(
195195
configuration=configuration,
196-
project_id=hook.project_id,
196+
project_id=configuration["extract"]["sourceTable"]["projectId"],
197197
location=self.location,
198198
job_id=job_id,
199199
timeout=self.result_timeout,

airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ def __init__(
227227
job_id: str | None = None,
228228
force_rerun: bool = True,
229229
reattach_states: set[str] | None = None,
230+
project_id: str | None = None,
230231
**kwargs,
231232
) -> None:
232233

@@ -249,6 +250,7 @@ def __init__(
249250

250251
# BQ config
251252
self.destination_project_dataset_table = destination_project_dataset_table
253+
self.project_id = project_id
252254
self.schema_fields = schema_fields
253255
if source_format.upper() not in ALLOWED_FORMATS:
254256
raise ValueError(
@@ -306,7 +308,7 @@ def _submit_job(
306308
# Submit a new job without waiting for it to complete.
307309
return hook.insert_job(
308310
configuration=self.configuration,
309-
project_id=hook.project_id,
311+
project_id=self.project_id,
310312
location=self.location,
311313
job_id=job_id,
312314
timeout=self.result_timeout,
@@ -507,9 +509,9 @@ def _find_max_value_in_column(self):
507509
raise RuntimeError(f"The {select_command} returned no rows!")
508510

509511
def _create_empty_table(self):
510-
project_id, dataset_id, table_id = self.hook.split_tablename(
512+
self.project_id, dataset_id, table_id = self.hook.split_tablename(
511513
table_input=self.destination_project_dataset_table,
512-
default_project_id=self.hook.project_id or "",
514+
default_project_id=self.project_id or self.hook.project_id,
513515
)
514516

515517
external_config_api_repr = {
@@ -556,7 +558,7 @@ def _create_empty_table(self):
556558

557559
# build table definition
558560
table = Table(
559-
table_ref=TableReference.from_string(self.destination_project_dataset_table, project_id)
561+
table_ref=TableReference.from_string(self.destination_project_dataset_table, self.project_id)
560562
)
561563
table.external_data_configuration = external_config
562564
if self.labels:
@@ -573,15 +575,18 @@ def _create_empty_table(self):
573575

574576
self.log.info("Creating external table: %s", self.destination_project_dataset_table)
575577
self.hook.create_empty_table(
576-
table_resource=table_obj_api_repr, project_id=project_id, location=self.location, exists_ok=True
578+
table_resource=table_obj_api_repr,
579+
project_id=self.project_id,
580+
location=self.location,
581+
exists_ok=True,
577582
)
578583
self.log.info("External table created successfully: %s", self.destination_project_dataset_table)
579584
return table_obj_api_repr
580585

581586
def _use_existing_table(self):
582-
destination_project, destination_dataset, destination_table = self.hook.split_tablename(
587+
self.project_id, destination_dataset, destination_table = self.hook.split_tablename(
583588
table_input=self.destination_project_dataset_table,
584-
default_project_id=self.hook.project_id or "",
589+
default_project_id=self.project_id or self.hook.project_id,
585590
var_name="destination_project_dataset_table",
586591
)
587592

@@ -601,7 +606,7 @@ def _use_existing_table(self):
601606
"autodetect": self.autodetect,
602607
"createDisposition": self.create_disposition,
603608
"destinationTable": {
604-
"projectId": destination_project,
609+
"projectId": self.project_id,
605610
"datasetId": destination_dataset,
606611
"tableId": destination_table,
607612
},

tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def test_max_value_without_external_table_should_execute_successfully(self, hook
172172
job_id=pytest.real_job_id,
173173
location=None,
174174
nowait=True,
175-
project_id=hook.return_value.project_id,
175+
project_id=hook.return_value.split_tablename.return_value[0],
176176
retry=DEFAULT_RETRY,
177177
timeout=None,
178178
),
@@ -233,7 +233,7 @@ def test_max_value_should_throw_ex_when_query_returns_no_rows(self, hook):
233233
job_id=pytest.real_job_id,
234234
location=None,
235235
nowait=True,
236-
project_id=hook.return_value.project_id,
236+
project_id=hook.return_value.split_tablename.return_value[0],
237237
retry=DEFAULT_RETRY,
238238
timeout=None,
239239
),
@@ -342,7 +342,7 @@ def test_labels_without_external_table_should_execute_successfully(self, hook):
342342
job_id=pytest.real_job_id,
343343
location=None,
344344
nowait=True,
345-
project_id=hook.return_value.project_id,
345+
project_id=hook.return_value.split_tablename.return_value[0],
346346
retry=DEFAULT_RETRY,
347347
timeout=None,
348348
)
@@ -441,7 +441,7 @@ def test_description_without_external_table_should_execute_successfully(self, ho
441441
fieldDelimiter=",",
442442
),
443443
},
444-
project_id=hook.return_value.project_id,
444+
project_id=hook.return_value.split_tablename.return_value[0],
445445
location=None,
446446
job_id=pytest.real_job_id,
447447
timeout=None,
@@ -545,7 +545,7 @@ def test_source_objs_as_list_without_external_table_should_execute_successfully(
545545
job_id=pytest.real_job_id,
546546
location=None,
547547
nowait=True,
548-
project_id=hook.return_value.project_id,
548+
project_id=hook.return_value.split_tablename.return_value[0],
549549
retry=DEFAULT_RETRY,
550550
timeout=None,
551551
)
@@ -645,7 +645,7 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull
645645
job_id=pytest.real_job_id,
646646
location=None,
647647
nowait=True,
648-
project_id=hook.return_value.project_id,
648+
project_id=hook.return_value.split_tablename.return_value[0],
649649
retry=DEFAULT_RETRY,
650650
timeout=None,
651651
)
@@ -746,7 +746,7 @@ def test_schema_obj_without_external_table_should_execute_successfully(self, bq_
746746
"encoding": "UTF-8",
747747
}
748748
},
749-
project_id=bq_hook.return_value.project_id,
749+
project_id=bq_hook.return_value.split_tablename.return_value[0],
750750
location=None,
751751
job_id=pytest.real_job_id,
752752
timeout=None,
@@ -842,7 +842,7 @@ def test_autodetect_none_without_external_table_should_execute_successfully(self
842842
"encoding": "UTF-8",
843843
}
844844
},
845-
project_id=hook.return_value.project_id,
845+
project_id=hook.return_value.split_tablename.return_value[0],
846846
location=None,
847847
job_id=pytest.real_job_id,
848848
timeout=None,
@@ -1067,7 +1067,7 @@ def test_schema_fields_integer_scanner_without_external_table_should_execute_suc
10671067
job_id=pytest.real_job_id,
10681068
location=None,
10691069
nowait=True,
1070-
project_id=bq_hook.return_value.project_id,
1070+
project_id=bq_hook.return_value.split_tablename.return_value[0],
10711071
retry=DEFAULT_RETRY,
10721072
timeout=None,
10731073
),
@@ -1129,7 +1129,7 @@ def test_schema_fields_without_external_table_should_execute_successfully(self,
11291129
job_id=pytest.real_job_id,
11301130
location=None,
11311131
nowait=True,
1132-
project_id=hook.return_value.project_id,
1132+
project_id=hook.return_value.split_tablename.return_value[0],
11331133
retry=DEFAULT_RETRY,
11341134
timeout=None,
11351135
)

0 commit comments

Comments
 (0)