Skip to content

Commit 6a03870

Browse files
improvement: introduce project_id in BigQueryIntervalCheckOperator (#34573)
* improvement: introduce project_id in BigQueryIntervalCheckOperator * Update bigquery.py --------- Co-authored-by: Hussein Awala <hussein@awala.fr>
1 parent 2c53345 commit 6a03870

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
505505
:param deferrable: Run operator in the deferrable mode
506506
:param poll_interval: (Deferrable mode only) polling period in seconds to check for the status of job.
507507
Defaults to 4 seconds.
508+
:param project_id: a string that represents the BigQuery projectId
508509
"""
509510

510511
template_fields: Sequence[str] = (
@@ -532,6 +533,7 @@ def __init__(
532533
labels: dict | None = None,
533534
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
534535
poll_interval: float = 4.0,
536+
project_id: str | None = None,
535537
**kwargs,
536538
) -> None:
537539
super().__init__(
@@ -547,6 +549,7 @@ def __init__(
547549
self.location = location
548550
self.impersonation_chain = impersonation_chain
549551
self.labels = labels
552+
self.project_id = project_id
550553
self.deferrable = deferrable
551554
self.poll_interval = poll_interval
552555

@@ -560,7 +563,7 @@ def _submit_job(
560563
configuration = {"query": {"query": sql, "useLegacySql": self.use_legacy_sql}}
561564
return hook.insert_job(
562565
configuration=configuration,
563-
project_id=hook.project_id,
566+
project_id=self.project_id or hook.project_id,
564567
location=self.location,
565568
job_id=job_id,
566569
nowait=True,

tests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,6 +1768,80 @@ def test_bigquery_interval_check_operator_async(self, mock_hook, create_task_ins
17681768
exc.value.trigger, BigQueryIntervalCheckTrigger
17691769
), "Trigger is not a BigQueryIntervalCheckTrigger"
17701770

1771+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
1772+
def test_bigquery_interval_check_operator_with_project_id(
1773+
self, mock_hook, create_task_instance_of_operator
1774+
):
1775+
"""
1776+
Test BigQueryIntervalCheckOperator with a specified project_id.
1777+
Ensure that the project_id is passed correctly when submitting the job.
1778+
"""
1779+
job_id = "123456"
1780+
hash_ = "hash"
1781+
real_job_id = f"{job_id}_{hash_}"
1782+
1783+
project_id = "test-project-id"
1784+
ti = create_task_instance_of_operator(
1785+
BigQueryIntervalCheckOperator,
1786+
dag_id="dag_id",
1787+
task_id="bq_interval_check_operator_with_project_id",
1788+
table="test_table",
1789+
metrics_thresholds={"COUNT(*)": 1.5},
1790+
location=TEST_DATASET_LOCATION,
1791+
deferrable=True,
1792+
project_id=project_id,
1793+
)
1794+
1795+
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
1796+
1797+
with pytest.raises(TaskDeferred):
1798+
ti.task.execute(MagicMock())
1799+
1800+
mock_hook.return_value.insert_job.assert_called_with(
1801+
configuration=mock.ANY,
1802+
project_id=project_id,
1803+
location=TEST_DATASET_LOCATION,
1804+
job_id=mock.ANY,
1805+
nowait=True,
1806+
)
1807+
1808+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
1809+
def test_bigquery_interval_check_operator_without_project_id(
1810+
self, mock_hook, create_task_instance_of_operator
1811+
):
1812+
"""
1813+
Test BigQueryIntervalCheckOperator without a specified project_id.
1814+
Ensure that the project_id falls back to the hook.project_id as previously implemented.
1815+
"""
1816+
job_id = "123456"
1817+
hash_ = "hash"
1818+
real_job_id = f"{job_id}_{hash_}"
1819+
1820+
project_id = "test-project-id"
1821+
ti = create_task_instance_of_operator(
1822+
BigQueryIntervalCheckOperator,
1823+
dag_id="dag_id",
1824+
task_id="bq_interval_check_operator_without_project_id",
1825+
table="test_table",
1826+
metrics_thresholds={"COUNT(*)": 1.5},
1827+
location=TEST_DATASET_LOCATION,
1828+
deferrable=True,
1829+
)
1830+
1831+
mock_hook.return_value.project_id = project_id
1832+
mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
1833+
1834+
with pytest.raises(TaskDeferred):
1835+
ti.task.execute(MagicMock())
1836+
1837+
mock_hook.return_value.insert_job.assert_called_with(
1838+
configuration=mock.ANY,
1839+
project_id=mock_hook.return_value.project_id,
1840+
location=TEST_DATASET_LOCATION,
1841+
job_id=mock.ANY,
1842+
nowait=True,
1843+
)
1844+
17711845

17721846
class TestBigQueryCheckOperator:
17731847
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator.execute")

0 commit comments

Comments
 (0)