Skip to content

Commit 14e6b65

Browse files
author
Łukasz Wyszomirski
authored
Add timeout and retry to the BigQueryInsertJobOperator (#22395)
1 parent 82d2fa7 commit 14e6b65

File tree

3 files changed

+26
-3
lines changed

3 files changed

+26
-3
lines changed

airflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,8 @@ def insert_job(
15031503
project_id: Optional[str] = None,
15041504
location: Optional[str] = None,
15051505
nowait: bool = False,
1506+
retry: Retry = DEFAULT_RETRY,
1507+
timeout: Optional[float] = None,
15061508
) -> BigQueryJob:
15071509
"""
15081510
Executes a BigQuery job. Waits for the job to complete and returns job id.
@@ -1520,6 +1522,9 @@ def insert_job(
15201522
:param project_id: Google Cloud Project where the job is running
15211523
:param location: location the job is running
15221524
:param nowait: specify whether to insert job without waiting for the result
1525+
:param retry: How to retry the RPC.
1526+
:param timeout: The number of seconds to wait for the underlying HTTP transport
1527+
before using ``retry``.
15231528
"""
15241529
location = location or self.location
15251530
job_id = job_id or self._custom_job_id(configuration)
@@ -1552,7 +1557,7 @@ def insert_job(
15521557
job._begin()
15531558
else:
15541559
# Start the job and wait for it to complete and get the result.
1555-
job.result()
1560+
job.result(timeout=timeout, retry=retry)
15561561
return job
15571562

15581563
def run_with_configuration(self, configuration: dict) -> str:

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929

3030
import attr
3131
from google.api_core.exceptions import Conflict
32+
from google.api_core.retry import Retry
33+
from google.cloud.bigquery import DEFAULT_RETRY
3234

3335
from airflow.exceptions import AirflowException
3436
from airflow.models import BaseOperator, BaseOperatorLink
@@ -2052,6 +2054,8 @@ class BigQueryInsertJobOperator(BaseOperator):
20522054
Service Account Token Creator IAM role to the directly preceding identity, with first
20532055
account from the list granting this role to the originating account (templated).
20542056
:param cancel_on_kill: Flag which indicates whether cancel the hook's job or not, when on_kill is called
2057+
:param result_retry: How to retry the `result` call that retrieves rows
2058+
:param result_timeout: The number of seconds to wait for `result` method before using `result_retry`
20552059
"""
20562060

20572061
template_fields: Sequence[str] = (
@@ -2075,6 +2079,8 @@ def __init__(
20752079
delegate_to: Optional[str] = None,
20762080
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
20772081
cancel_on_kill: bool = True,
2082+
result_retry: Retry = DEFAULT_RETRY,
2083+
result_timeout: Optional[float] = None,
20782084
**kwargs,
20792085
) -> None:
20802086
super().__init__(**kwargs)
@@ -2088,6 +2094,8 @@ def __init__(
20882094
self.reattach_states: Set[str] = reattach_states or set()
20892095
self.impersonation_chain = impersonation_chain
20902096
self.cancel_on_kill = cancel_on_kill
2097+
self.result_retry = result_retry
2098+
self.result_timeout = result_timeout
20912099
self.hook: Optional[BigQueryHook] = None
20922100

20932101
def prepare_template(self) -> None:
@@ -2107,6 +2115,8 @@ def _submit_job(
21072115
project_id=self.project_id,
21082116
location=self.location,
21092117
job_id=job_id,
2118+
timeout=self.result_timeout,
2119+
retry=self.result_retry,
21102120
)
21112121

21122122
@staticmethod
@@ -2151,7 +2161,7 @@ def execute(self, context: Any):
21512161
)
21522162
if job.state in self.reattach_states:
21532163
# We are reattaching to a job
2154-
job.result()
2164+
job.result(timeout=self.result_timeout, retry=self.result_retry)
21552165
self._handle_job_error(job)
21562166
else:
21572167
# Same job configuration so we need force_rerun

tests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from unittest.mock import MagicMock
2222

2323
import pytest
24+
from google.cloud.bigquery import DEFAULT_RETRY
2425
from google.cloud.exceptions import Conflict
2526

2627
from airflow.exceptions import AirflowException
@@ -840,6 +841,8 @@ def test_execute_success(self, mock_hook, mock_md5):
840841
location=TEST_DATASET_LOCATION,
841842
job_id=real_job_id,
842843
project_id=TEST_GCP_PROJECT_ID,
844+
retry=DEFAULT_RETRY,
845+
timeout=None,
843846
)
844847

845848
assert result == real_job_id
@@ -947,7 +950,10 @@ def test_execute_reattach(self, mock_hook, mock_md5):
947950
project_id=TEST_GCP_PROJECT_ID,
948951
)
949952

950-
job.result.assert_called_once_with()
953+
job.result.assert_called_once_with(
954+
retry=DEFAULT_RETRY,
955+
timeout=None,
956+
)
951957

952958
assert result == real_job_id
953959

@@ -988,6 +994,8 @@ def test_execute_force_rerun(self, mock_hook, mock_uuid, mock_md5):
988994
location=TEST_DATASET_LOCATION,
989995
job_id=real_job_id,
990996
project_id=TEST_GCP_PROJECT_ID,
997+
retry=DEFAULT_RETRY,
998+
timeout=None,
991999
)
9921000

9931001
assert result == real_job_id

0 commit comments

Comments
 (0)