Skip to content

Commit 3157002

Browse files
bkossakowskaBeata Kossakowskamoiseenkov
authored
Fix DataplexDataQualityJobStatusSensor and add unit tests (#33440)
Fix for the test in the Dataplex DQ. Add unit tests for cought and fixed DataplexDataQualityJobStatusSensor failure --------- Co-authored-by: Beata Kossakowska <bkossakowska@google.com> Co-authored-by: Maksim Moiseenkov <maksim_moiseenkov@epam.com>
1 parent 1945c1a commit 3157002

File tree

2 files changed

+46
-4
lines changed

2 files changed

+46
-4
lines changed

β€Žairflow/providers/google/cloud/sensors/dataplex.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ def __init__(
167167
impersonation_chain: str | Sequence[str] | None = None,
168168
fail_on_dq_failure: bool = False,
169169
result_timeout: float = 60.0 * 10,
170-
start_sensor_time: float = time.monotonic(),
170+
start_sensor_time: float | None = None,
171171
*args,
172172
**kwargs,
173173
) -> None:
@@ -185,10 +185,9 @@ def __init__(
185185
self.result_timeout = result_timeout
186186
self.start_sensor_time = start_sensor_time
187187

188-
def execute(self, context: Context) -> None:
189-
super().execute(context)
190-
191188
def _duration(self):
189+
if not self.start_sensor_time:
190+
self.start_sensor_time = time.monotonic()
192191
return time.monotonic() - self.start_sensor_time
193192

194193
def poke(self, context: Context) -> bool:

β€Žtests/providers/google/cloud/sensors/test_dataplex.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from google.cloud.dataplex_v1.types import DataScanJob
2424

2525
from airflow import AirflowException
26+
from airflow.providers.google.cloud.hooks.dataplex import AirflowDataQualityScanResultTimeoutException
2627
from airflow.providers.google.cloud.sensors.dataplex import (
2728
DataplexDataQualityJobStatusSensor,
2829
DataplexTaskStateSensor,
@@ -144,3 +145,45 @@ def test_done(self, mock_hook):
144145
)
145146

146147
assert result
148+
149+
def test_start_sensor_time(self):
150+
sensor = DataplexDataQualityJobStatusSensor(
151+
task_id=TASK_ID,
152+
project_id=PROJECT_ID,
153+
job_id=TEST_JOB_ID,
154+
data_scan_id=TEST_DATA_SCAN_ID,
155+
region=REGION,
156+
api_version=API_VERSION,
157+
gcp_conn_id=GCP_CONN_ID,
158+
impersonation_chain=IMPERSONATION_CHAIN,
159+
timeout=TIMEOUT,
160+
)
161+
162+
assert sensor.start_sensor_time is None
163+
164+
duration_1 = sensor._duration()
165+
duration_2 = sensor._duration()
166+
167+
assert sensor.start_sensor_time
168+
assert 0 < duration_1 < duration_2
169+
170+
@mock.patch.object(DataplexDataQualityJobStatusSensor, "_duration")
171+
def test_start_sensor_time_timeout(self, mock_duration):
172+
result_timeout = 100
173+
mock_duration.return_value = result_timeout + 1
174+
175+
sensor = DataplexDataQualityJobStatusSensor(
176+
task_id=TASK_ID,
177+
project_id=PROJECT_ID,
178+
job_id=TEST_JOB_ID,
179+
data_scan_id=TEST_DATA_SCAN_ID,
180+
region=REGION,
181+
api_version=API_VERSION,
182+
gcp_conn_id=GCP_CONN_ID,
183+
impersonation_chain=IMPERSONATION_CHAIN,
184+
timeout=TIMEOUT,
185+
result_timeout=result_timeout,
186+
)
187+
188+
with pytest.raises(AirflowDataQualityScanResultTimeoutException):
189+
sensor.poke(context={})

0 commit comments

Comments
 (0)