Skip to content

Commit 833087f

Browse files
authored
Enable asynchronous job submission in BigQuery hook (#21385)
* Add nowait flag to the insert_job method
* When nowait is True, the execution won't wait until the job results are available.
* By default, the job execution will wait until the job results are available.
1 parent b6892c4 commit 833087f

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

airflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,7 @@ def insert_job(
14981498
job_id: Optional[str] = None,
14991499
project_id: Optional[str] = None,
15001500
location: Optional[str] = None,
1501+
nowait: bool = False,
15011502
) -> BigQueryJob:
15021503
"""
15031504
Executes a BigQuery job. Waits for the job to complete and returns job id.
@@ -1514,6 +1515,7 @@ def insert_job(
15141515
characters. If not provided then uuid will be generated.
15151516
:param project_id: Google Cloud Project where the job is running
15161517
:param location: location the job is running
1518+
:param nowait: specify whether to insert job without waiting for the result
15171519
"""
15181520
location = location or self.location
15191521
job_id = job_id or self._custom_job_id(configuration)
@@ -1541,8 +1543,12 @@ def insert_job(
15411543
raise AirflowException(f"Unknown job type. Supported types: {supported_jobs.keys()}")
15421544
job = job.from_api_repr(job_data, client)
15431545
self.log.info("Inserting job %s", job.job_id)
1544-
# Start the job and wait for it to complete and get the result.
1545-
job.result()
1546+
if nowait:
1547+
# Initiate the job and don't wait for it to complete.
1548+
job._begin()
1549+
else:
1550+
# Start the job and wait for it to complete and get the result.
1551+
job.result()
15461552
return job
15471553

15481554
def run_with_configuration(self, configuration: dict) -> str:

tests/providers/google/cloud/hooks/test_bigquery.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
TABLE_REFERENCE = TableReference.from_api_repr(TABLE_REFERENCE_REPR)
5555

5656

57-
class _BigQueryBaseTestClass(unittest.TestCase):
58-
def setUp(self) -> None:
57+
class _BigQueryBaseTestClass:
58+
def setup_method(self) -> None:
5959
class MockedBigQueryHook(BigQueryHook):
6060
def _get_credentials_and_project_id(self):
6161
return CREDENTIALS, PROJECT_ID
@@ -898,9 +898,10 @@ def test_run_query_with_arg(self, mock_insert):
898898
_, kwargs = mock_insert.call_args
899899
assert kwargs["configuration"]['labels'] == {'label1': 'test1', 'label2': 'test2'}
900900

901+
@pytest.mark.parametrize('nowait', [True, False])
901902
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.QueryJob")
902903
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
903-
def test_insert_job(self, mock_client, mock_query_job):
904+
def test_insert_job(self, mock_client, mock_query_job, nowait):
904905
job_conf = {
905906
"query": {
906907
"query": "SELECT * FROM test",
@@ -910,10 +911,7 @@ def test_insert_job(self, mock_client, mock_query_job):
910911
mock_query_job._JOB_TYPE = "query"
911912

912913
self.hook.insert_job(
913-
configuration=job_conf,
914-
job_id=JOB_ID,
915-
project_id=PROJECT_ID,
916-
location=LOCATION,
914+
configuration=job_conf, job_id=JOB_ID, project_id=PROJECT_ID, location=LOCATION, nowait=nowait
917915
)
918916

919917
mock_client.assert_called_once_with(
@@ -928,7 +926,12 @@ def test_insert_job(self, mock_client, mock_query_job):
928926
},
929927
mock_client.return_value,
930928
)
931-
mock_query_job.from_api_repr.return_value.result.assert_called_once_with()
929+
if nowait:
930+
mock_query_job.from_api_repr.return_value._begin.assert_called_once()
931+
mock_query_job.from_api_repr.return_value.result.assert_not_called()
932+
else:
933+
mock_query_job.from_api_repr.return_value._begin.assert_not_called()
934+
mock_query_job.from_api_repr.return_value.result.assert_called_once()
932935

933936
def test_dbapi_get_uri(self):
934937
assert self.hook.get_uri().startswith('bigquery://')
@@ -2014,7 +2017,7 @@ def test_create_external_table_labels(self, mock_create):
20142017
)
20152018

20162019
_, kwargs = mock_create.call_args
2017-
self.assertDictEqual(kwargs['table_resource']['labels'], labels)
2020+
assert kwargs['table_resource']['labels'] == labels
20182021

20192022
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_table")
20202023
def test_create_external_table_description(self, mock_create):

0 commit comments

Comments
 (0)