Skip to content

Commit 943baff

Browse files
authored
Add job labels to bigquery check operators. (#14685)
1 parent 60373eb commit 943baff

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

β€Žairflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def __init__(
8181
bigquery_conn_id: Optional[str] = None,
8282
api_resource_configs: Optional[Dict] = None,
8383
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
84+
labels: Optional[Dict] = None,
8485
) -> None:
8586
# To preserve backward compatibility
8687
# TODO: remove one day
@@ -101,6 +102,7 @@ def __init__(
101102
self.location = location
102103
self.running_job_id = None # type: Optional[str]
103104
self.api_resource_configs = api_resource_configs if api_resource_configs else {} # type Dict
105+
self.labels = labels
104106

105107
def get_conn(self) -> "BigQueryConnection":
106108
"""Returns a BigQuery PEP 249 connection object."""
@@ -2060,6 +2062,7 @@ def run_query(
20602062
if not self.project_id:
20612063
raise ValueError("The project_id should be set")
20622064

2065+
labels = labels or self.labels
20632066
schema_update_options = list(schema_update_options or [])
20642067

20652068
if time_partitioning is None:
@@ -2258,6 +2261,7 @@ def __init__(
22582261
api_resource_configs: Optional[Dict] = None,
22592262
location: Optional[str] = None,
22602263
num_retries: int = 5,
2264+
labels: Optional[Dict] = None,
22612265
) -> None:
22622266

22632267
super().__init__()
@@ -2270,6 +2274,7 @@ def __init__(
22702274
self.running_job_id = None # type: Optional[str]
22712275
self.location = location
22722276
self.num_retries = num_retries
2277+
self.labels = labels
22732278
self.hook = hook
22742279

22752280
def create_empty_table(self, *args, **kwargs) -> None:

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ def get_db_hook(self) -> BigQueryHook:
9595
use_legacy_sql=self.use_legacy_sql,
9696
location=self.location,
9797
impersonation_chain=self.impersonation_chain,
98+
labels=self.labels,
9899
)
99100

100101

@@ -152,12 +153,15 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
152153
Service Account Token Creator IAM role to the directly preceding identity, with first
153154
account from the list granting this role to the originating account (templated).
154155
:type impersonation_chain: Union[str, Sequence[str]]
156+
:param labels: a dictionary containing labels for the table, passed to BigQuery
157+
:type labels: dict
155158
"""
156159

157160
template_fields = (
158161
'sql',
159162
'gcp_conn_id',
160163
'impersonation_chain',
164+
'labels',
161165
)
162166
template_ext = ('.sql',)
163167
ui_color = BigQueryUIColors.CHECK.value
@@ -172,6 +176,7 @@ def __init__(
172176
use_legacy_sql: bool = True,
173177
location: Optional[str] = None,
174178
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
179+
labels: Optional[dict] = None,
175180
**kwargs,
176181
) -> None:
177182
super().__init__(sql=sql, **kwargs)
@@ -184,6 +189,7 @@ def __init__(
184189
self.use_legacy_sql = use_legacy_sql
185190
self.location = location
186191
self.impersonation_chain = impersonation_chain
192+
self.labels = labels
187193

188194

189195
class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
@@ -216,13 +222,16 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
216222
Service Account Token Creator IAM role to the directly preceding identity, with first
217223
account from the list granting this role to the originating account (templated).
218224
:type impersonation_chain: Union[str, Sequence[str]]
225+
:param labels: a dictionary containing labels for the table, passed to BigQuery
226+
:type labels: dict
219227
"""
220228

221229
template_fields = (
222230
'sql',
223231
'gcp_conn_id',
224232
'pass_value',
225233
'impersonation_chain',
234+
'labels',
226235
)
227236
template_ext = ('.sql',)
228237
ui_color = BigQueryUIColors.CHECK.value
@@ -239,6 +248,7 @@ def __init__(
239248
use_legacy_sql: bool = True,
240249
location: Optional[str] = None,
241250
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
251+
labels: Optional[dict] = None,
242252
**kwargs,
243253
) -> None:
244254
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
@@ -251,6 +261,7 @@ def __init__(
251261
self.gcp_conn_id = gcp_conn_id
252262
self.use_legacy_sql = use_legacy_sql
253263
self.impersonation_chain = impersonation_chain
264+
self.labels = labels
254265

255266

256267
class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
@@ -296,6 +307,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
296307
Service Account Token Creator IAM role to the directly preceding identity, with first
297308
account from the list granting this role to the originating account (templated).
298309
:type impersonation_chain: Union[str, Sequence[str]]
310+
:param labels: a dictionary containing labels for the table, passed to BigQuery
311+
:type labels: dict
299312
"""
300313

301314
template_fields = (
@@ -304,6 +317,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
304317
'sql1',
305318
'sql2',
306319
'impersonation_chain',
320+
'labels',
307321
)
308322
ui_color = BigQueryUIColors.CHECK.value
309323

@@ -320,6 +334,7 @@ def __init__(
320334
use_legacy_sql: bool = True,
321335
location: Optional[str] = None,
322336
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
337+
labels: Optional[Dict] = None,
323338
**kwargs,
324339
) -> None:
325340
super().__init__(
@@ -338,6 +353,7 @@ def __init__(
338353
self.use_legacy_sql = use_legacy_sql
339354
self.location = location
340355
self.impersonation_chain = impersonation_chain
356+
self.labels = labels
341357

342358

343359
class BigQueryGetDataOperator(BaseOperator):

0 commit comments

Comments
 (0)