Skip to content

Commit 2a79fb7

Browse files
bkossakowskaBeata Kossakowska
andauthored
Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink (#31953)
Co-authored-by: Beata Kossakowska <bkossakowska@google.com>
1 parent 79bcc2e commit 2a79fb7

File tree

4 files changed

+59
-27
lines changed

4 files changed

+59
-27
lines changed

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
BigQueryIntervalCheckTrigger,
5252
BigQueryValueCheckTrigger,
5353
)
54+
from airflow.providers.google.cloud.utils.bigquery import convert_job_id
5455

5556
if TYPE_CHECKING:
5657
from google.cloud.bigquery import UnknownJob
@@ -90,8 +91,8 @@ def get_link(
9091
*,
9192
ti_key: TaskInstanceKey,
9293
):
93-
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
94-
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
94+
job_id_path = XCom.get_value(key="job_id_path", ti_key=ti_key)
95+
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id_path) if job_id_path else ""
9596

9697

9798
@attr.s(auto_attribs=True)
@@ -110,7 +111,7 @@ def get_link(
110111
*,
111112
ti_key: TaskInstanceKey,
112113
):
113-
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
114+
job_ids = XCom.get_value(key="job_id_path", ti_key=ti_key)
114115
if not job_ids:
115116
return None
116117
if len(job_ids) < self.index:
@@ -1184,7 +1185,11 @@ def execute(self, context: Context):
11841185
]
11851186
else:
11861187
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
1187-
context["task_instance"].xcom_push(key="job_id", value=job_id)
1188+
project_id = self.hook.project_id
1189+
if project_id:
1190+
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
1191+
context["task_instance"].xcom_push(key="job_id_path", value=job_id_path)
1192+
return job_id
11881193

11891194
def on_kill(self) -> None:
11901195
super().on_kill()
@@ -2727,9 +2732,11 @@ def execute(self, context: Any):
27272732
persist_kwargs["dataset_id"] = table["datasetId"]
27282733
persist_kwargs["project_id"] = table["projectId"]
27292734
BigQueryTableLink.persist(**persist_kwargs)
2730-
27312735
self.job_id = job.job_id
2732-
context["ti"].xcom_push(key="job_id", value=self.job_id)
2736+
project_id = self.project_id or self.hook.project_id
2737+
if project_id:
2738+
job_id_path = convert_job_id(job_id=job_id, project_id=project_id, location=self.location)
2739+
context["ti"].xcom_push(key="job_id_path", value=job_id_path)
27332740
# Wait for the job to complete
27342741
if not self.deferrable:
27352742
job.result(timeout=self.result_timeout, retry=self.result_retry)

β€Žairflow/providers/google/cloud/utils/bigquery.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from typing import Any
20+
1921

2022
def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
2123
"""
@@ -34,3 +36,18 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str:
3436
return string_field == "true"
3537
else:
3638
return string_field
39+
40+
41+
def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any:
42+
"""
43+
Helper method that converts to path: project_id:location:job_id
44+
:param project_id: Required. The ID of the Google Cloud project where workspace located.
45+
:param location: Optional. The ID of the Google Cloud region where workspace located.
46+
:param job_id: Required. The ID of the job.
47+
:return: str or list[str] of project_id:location:job_id.
48+
"""
49+
location = location if location else "US"
50+
if isinstance(job_id, list):
51+
return [f"{project_id}:{location}:{i}" for i in job_id]
52+
else:
53+
return f"{project_id}:{location}:{job_id}"

β€Žtests/api_connexion/endpoints/test_extra_link_endpoint.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def test_should_raise_403_forbidden(self):
142142
@mock_plugin_manager(plugins=[])
143143
def test_should_respond_200(self):
144144
XCom.set(
145-
key="job_id",
145+
key="job_id_path",
146146
value="TEST_JOB_ID",
147147
task_id="TEST_SINGLE_QUERY",
148148
dag_id=self.dag.dag_id,
@@ -171,7 +171,7 @@ def test_should_respond_200_missing_xcom(self):
171171
@mock_plugin_manager(plugins=[])
172172
def test_should_respond_200_multiple_links(self):
173173
XCom.set(
174-
key="job_id",
174+
key="job_id_path",
175175
value=["TEST_JOB_ID_1", "TEST_JOB_ID_2"],
176176
task_id="TEST_MULTIPLE_QUERY",
177177
dag_id=self.dag.dag_id,

β€Žtests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@
8383
}
8484
TEST_TABLE = "test-table"
8585
GCP_CONN_ID = "google_cloud_default"
86+
TEST_JOB_ID_1 = "test-job-id"
87+
TEST_JOB_ID_2 = "test-123"
88+
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
89+
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
8690

8791

8892
class TestBigQueryCreateEmptyTableOperator:
@@ -673,10 +677,10 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(
673677
# Check DeSerialized version of operator link
674678
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
675679

676-
ti.xcom_push("job_id", 12345)
680+
ti.xcom_push("job_id_path", TEST_FULL_JOB_ID)
677681

678682
url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
679-
assert url == "https://console.cloud.google.com/bigquery?j=12345"
683+
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
680684

681685
@pytest.mark.need_serialized_dag
682686
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
@@ -711,17 +715,18 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
711715
# Check DeSerialized version of operator link
712716
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)
713717

714-
job_id = ["123", "45"]
715-
ti.xcom_push(key="job_id", value=job_id)
718+
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
716719

717720
assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()
718721

719-
assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
720-
ti, "BigQuery Console #1"
722+
assert (
723+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
724+
== simple_task.get_extra_links(ti, "BigQuery Console #1")
721725
)
722726

723-
assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
724-
ti, "BigQuery Console #2"
727+
assert (
728+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
729+
== simple_task.get_extra_links(ti, "BigQuery Console #2")
725730
)
726731

727732
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -740,7 +745,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(
740745

741746
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
742747
def test_bigquery_operator_extra_link_when_single_query(
743-
self, mock_hook, create_task_instance_of_operator
748+
self,
749+
mock_hook,
750+
create_task_instance_of_operator,
744751
):
745752
ti = create_task_instance_of_operator(
746753
BigQueryExecuteQueryOperator,
@@ -751,11 +758,11 @@ def test_bigquery_operator_extra_link_when_single_query(
751758
)
752759
bigquery_task = ti.task
753760

754-
job_id = "12345"
755-
ti.xcom_push(key="job_id", value=job_id)
761+
ti.xcom_push(key="job_id_path", value=TEST_FULL_JOB_ID)
756762

757-
assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
758-
ti, BigQueryConsoleLink.name
763+
assert (
764+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
765+
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
759766
)
760767

761768
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
@@ -771,17 +778,18 @@ def test_bigquery_operator_extra_link_when_multiple_query(
771778
)
772779
bigquery_task = ti.task
773780

774-
job_id = ["123", "45"]
775-
ti.xcom_push(key="job_id", value=job_id)
781+
ti.xcom_push(key="job_id_path", value=[TEST_FULL_JOB_ID, TEST_FULL_JOB_ID_2])
776782

777783
assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()
778784

779-
assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
780-
ti, "BigQuery Console #1"
785+
assert (
786+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
787+
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
781788
)
782789

783-
assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
784-
ti, "BigQuery Console #2"
790+
assert (
791+
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
792+
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
785793
)
786794

787795

0 commit comments

Comments
 (0)