Skip to content

Commit d91861d

Browse files
authored
Optimize deferrable mode (#31758)
1 parent a3768b4 commit d91861d

File tree

2 files changed

+38
-10
lines changed

2 files changed

+38
-10
lines changed

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,16 +240,18 @@ def execute(self, context: Context):
240240
)
241241
job = self._submit_job(hook, job_id="")
242242
context["ti"].xcom_push(key="job_id", value=job.job_id)
243-
self.defer(
244-
timeout=self.execution_timeout,
245-
trigger=BigQueryCheckTrigger(
246-
conn_id=self.gcp_conn_id,
247-
job_id=job.job_id,
248-
project_id=hook.project_id,
249-
poll_interval=self.poll_interval,
250-
),
251-
method_name="execute_complete",
252-
)
243+
if job.running():
244+
self.defer(
245+
timeout=self.execution_timeout,
246+
trigger=BigQueryCheckTrigger(
247+
conn_id=self.gcp_conn_id,
248+
job_id=job.job_id,
249+
project_id=hook.project_id,
250+
poll_interval=self.poll_interval,
251+
),
252+
method_name="execute_complete",
253+
)
254+
self.log.info("Current state of job %s is %s", job.job_id, job.state)
253255

254256
def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
255257
"""

β€Žtests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,6 +1595,32 @@ def test_bigquery_interval_check_operator_async(self, mock_hook, create_task_ins
15951595

15961596

15971597
class TestBigQueryCheckOperator:
1598+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.execute")
1599+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.defer")
1600+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
1601+
def test_bigquery_check_operator_async_finish_before_deferred(
1602+
self, mock_hook, mock_defer, mock_execute, create_task_instance_of_operator
1603+
):
1604+
job_id = "123456"
1605+
hash_ = "hash"
1606+
real_job_id = f"{job_id}_{hash_}"
1607+
1608+
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
1609+
mock_hook.return_value.insert_job.return_value.running.return_value = False
1610+
1611+
ti = create_task_instance_of_operator(
1612+
BigQueryCheckOperator,
1613+
dag_id="dag_id",
1614+
task_id="bq_check_operator_job",
1615+
sql="SELECT * FROM any",
1616+
location=TEST_DATASET_LOCATION,
1617+
deferrable=True,
1618+
)
1619+
1620+
ti.task.execute(MagicMock())
1621+
assert not mock_defer.called
1622+
assert mock_execute.called
1623+
15981624
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
15991625
def test_bigquery_check_operator_async(self, mock_hook, create_task_instance_of_operator):
16001626
"""

0 commit comments

Comments
 (0)