Skip to content

Commit 28f2e70

Browse files
authored
Optimize deferrable mode execution for BigQueryInsertJobOperator (#31249)
* Optimize deferred mode for BigQueryInsertJobOperator * Apply review suggestion * Add job state to the log * capture and assert job state log in test
1 parent 64b0872 commit 28f2e70

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2709,16 +2709,19 @@ def execute(self, context: Any):
27092709
self._handle_job_error(job)
27102710

27112711
return self.job_id
2712-
self.defer(
2713-
timeout=self.execution_timeout,
2714-
trigger=BigQueryInsertJobTrigger(
2715-
conn_id=self.gcp_conn_id,
2716-
job_id=self.job_id,
2717-
project_id=self.project_id,
2718-
poll_interval=self.poll_interval,
2719-
),
2720-
method_name="execute_complete",
2721-
)
2712+
else:
2713+
if job.running():
2714+
self.defer(
2715+
timeout=self.execution_timeout,
2716+
trigger=BigQueryInsertJobTrigger(
2717+
conn_id=self.gcp_conn_id,
2718+
job_id=self.job_id,
2719+
project_id=self.project_id,
2720+
poll_interval=self.poll_interval,
2721+
),
2722+
method_name="execute_complete",
2723+
)
2724+
self.log.info("Current state of job %s is %s", job.job_id, job.state)
27222725

27232726
def execute_complete(self, context: Context, event: dict[str, Any]):
27242727
"""

β€Žtests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,6 +1310,35 @@ def test_execute_no_force_rerun(self, mock_hook):
13101310
with pytest.raises(AirflowException):
13111311
op.execute(context=MagicMock())
13121312

1313+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator.defer")
1314+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
1315+
def test_bigquery_insert_job_operator_async_finish_before_deferred(self, mock_hook, mock_defer, caplog):
1316+
job_id = "123456"
1317+
hash_ = "hash"
1318+
real_job_id = f"{job_id}_{hash_}"
1319+
1320+
configuration = {
1321+
"query": {
1322+
"query": "SELECT * FROM any",
1323+
"useLegacySql": False,
1324+
}
1325+
}
1326+
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
1327+
mock_hook.return_value.insert_job.return_value.running.return_value = False
1328+
1329+
op = BigQueryInsertJobOperator(
1330+
task_id="insert_query_job",
1331+
configuration=configuration,
1332+
location=TEST_DATASET_LOCATION,
1333+
job_id=job_id,
1334+
project_id=TEST_GCP_PROJECT_ID,
1335+
deferrable=True,
1336+
)
1337+
1338+
op.execute(MagicMock())
1339+
assert not mock_defer.called
1340+
assert "Current state of job" in caplog.text
1341+
13131342
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
13141343
def test_bigquery_insert_job_operator_async(self, mock_hook, create_task_instance_of_operator):
13151344
"""

0 commit comments

Comments
 (0)