Skip to content

Commit 1d12c34

Browse files
authored
Refactor BigQuery check operators (#8813)
* Refactor BigQuery check operators This commit applies some code formatting to existing BigQuery check operators. It also adds location parameter to BigQueryIntervalCheckOperator and BigQueryValueCheckOperator. * fixup! Refactor BigQuery check operators
1 parent 7533378 commit 1d12c34

File tree

1 file changed

+79
-50
lines changed

1 file changed

+79
-50
lines changed

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 79 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838

3939
BIGQUERY_JOB_DETAILS_LINK_FMT = 'https://console.cloud.google.com/bigquery?j={job_id}'
4040

41+
_DEPRECATION_MSG = "The bigquery_conn_id parameter has been deprecated. " \
42+
"You should pass the gcp_conn_id parameter."
43+
4144

4245
class BigQueryUIColors(enum.Enum):
4346
"""Hex colors for BigQuery operators"""
@@ -120,8 +123,7 @@ class BigQueryCheckOperator(CheckOperator):
120123
:param use_legacy_sql: Whether to use legacy SQL (true)
121124
or standard SQL (false).
122125
:type use_legacy_sql: bool
123-
:param location: The geographic location of the job. Required except for
124-
US and EU. See details at
126+
:param location: The geographic location of the job. See details at:
125127
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
126128
:type location: str
127129
"""
@@ -131,29 +133,32 @@ class BigQueryCheckOperator(CheckOperator):
131133
ui_color = BigQueryUIColors.CHECK.value
132134

133135
@apply_defaults
134-
def __init__(self,
135-
sql: str,
136-
gcp_conn_id: str = 'google_cloud_default',
137-
bigquery_conn_id: Optional[str] = None,
138-
use_legacy_sql: bool = True,
139-
location=None,
140-
*args, **kwargs) -> None:
136+
def __init__(
137+
self,
138+
sql: str,
139+
gcp_conn_id: str = 'google_cloud_default',
140+
bigquery_conn_id: Optional[str] = None,
141+
use_legacy_sql: bool = True,
142+
location: Optional[str] = None,
143+
*args,
144+
**kwargs,
145+
) -> None:
141146
super().__init__(sql=sql, *args, **kwargs)
142147
if bigquery_conn_id:
143-
warnings.warn(
144-
"The bigquery_conn_id parameter has been deprecated. You should pass "
145-
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
148+
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
146149
gcp_conn_id = bigquery_conn_id # type: ignore
147150

148151
self.gcp_conn_id = gcp_conn_id
149152
self.sql = sql
150153
self.use_legacy_sql = use_legacy_sql
151154
self.location = location
152155

153-
def get_db_hook(self):
154-
return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
155-
use_legacy_sql=self.use_legacy_sql,
156-
location=self.location)
156+
def get_db_hook(self) -> BigQueryHook:
157+
return BigQueryHook(
158+
gcp_conn_id=self.gcp_conn_id,
159+
use_legacy_sql=self.use_legacy_sql,
160+
location=self.location
161+
)
157162

158163

159164
class BigQueryValueCheckOperator(ValueCheckOperator):
@@ -170,36 +175,49 @@ class BigQueryValueCheckOperator(ValueCheckOperator):
170175
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud Platform.
171176
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
172177
:type bigquery_conn_id: str
178+
:param location: The geographic location of the job. See details at:
179+
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
180+
:type location: str
173181
"""
174182

175183
template_fields = ('sql', 'gcp_conn_id', 'pass_value',)
176184
template_ext = ('.sql',)
177185
ui_color = BigQueryUIColors.CHECK.value
178186

179187
@apply_defaults
180-
def __init__(self, sql: str,
181-
pass_value: Any,
182-
tolerance: Any = None,
183-
gcp_conn_id: str = 'google_cloud_default',
184-
bigquery_conn_id: Optional[str] = None,
185-
use_legacy_sql: bool = True,
186-
*args, **kwargs) -> None:
188+
def __init__(
189+
self,
190+
sql: str,
191+
pass_value: Any,
192+
tolerance: Any = None,
193+
gcp_conn_id: str = 'google_cloud_default',
194+
bigquery_conn_id: Optional[str] = None,
195+
use_legacy_sql: bool = True,
196+
location: Optional[str] = None,
197+
*args,
198+
**kwargs,
199+
) -> None:
187200
super().__init__(
188-
sql=sql, pass_value=pass_value, tolerance=tolerance,
189-
*args, **kwargs)
201+
sql=sql,
202+
pass_value=pass_value,
203+
tolerance=tolerance,
204+
*args, **kwargs
205+
)
190206

191207
if bigquery_conn_id:
192-
warnings.warn(
193-
"The bigquery_conn_id parameter has been deprecated. You should pass "
194-
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
208+
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
195209
gcp_conn_id = bigquery_conn_id
196210

211+
self.location = location
197212
self.gcp_conn_id = gcp_conn_id
198213
self.use_legacy_sql = use_legacy_sql
199214

200-
def get_db_hook(self):
201-
return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
202-
use_legacy_sql=self.use_legacy_sql)
215+
def get_db_hook(self) -> BigQueryHook:
216+
return BigQueryHook(
217+
gcp_conn_id=self.gcp_conn_id,
218+
use_legacy_sql=self.use_legacy_sql,
219+
location=self.location
220+
)
203221

204222

205223
class BigQueryIntervalCheckOperator(IntervalCheckOperator):
@@ -229,39 +247,50 @@ class BigQueryIntervalCheckOperator(IntervalCheckOperator):
229247
:param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud Platform.
230248
This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
231249
:type bigquery_conn_id: str
250+
:param location: The geographic location of the job. See details at:
251+
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
252+
:type location: str
232253
"""
233254

234255
template_fields = ('table', 'gcp_conn_id', 'sql1', 'sql2')
235256
ui_color = BigQueryUIColors.CHECK.value
236257

237258
@apply_defaults
238-
def __init__(self,
239-
table: str,
240-
metrics_thresholds: dict,
241-
date_filter_column: str = 'ds',
242-
days_back: SupportsAbs[int] = -7,
243-
gcp_conn_id: str = 'google_cloud_default',
244-
bigquery_conn_id: Optional[str] = None,
245-
use_legacy_sql: bool = True,
246-
*args,
247-
**kwargs) -> None:
259+
def __init__(
260+
self,
261+
table: str,
262+
metrics_thresholds: dict,
263+
date_filter_column: str = 'ds',
264+
days_back: SupportsAbs[int] = -7,
265+
gcp_conn_id: str = 'google_cloud_default',
266+
bigquery_conn_id: Optional[str] = None,
267+
use_legacy_sql: bool = True,
268+
location: Optional[str] = None,
269+
*args,
270+
**kwargs,
271+
) -> None:
248272
super().__init__(
249-
table=table, metrics_thresholds=metrics_thresholds,
250-
date_filter_column=date_filter_column, days_back=days_back,
251-
*args, **kwargs)
273+
table=table,
274+
metrics_thresholds=metrics_thresholds,
275+
date_filter_column=date_filter_column,
276+
days_back=days_back,
277+
*args, **kwargs
278+
)
252279

253280
if bigquery_conn_id:
254-
warnings.warn(
255-
"The bigquery_conn_id parameter has been deprecated. You should pass "
256-
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
281+
warnings.warn(_DEPRECATION_MSG, DeprecationWarning, stacklevel=3)
257282
gcp_conn_id = bigquery_conn_id
258283

259284
self.gcp_conn_id = gcp_conn_id
260285
self.use_legacy_sql = use_legacy_sql
286+
self.location = location
261287

262-
def get_db_hook(self):
263-
return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
264-
use_legacy_sql=self.use_legacy_sql)
288+
def get_db_hook(self) -> BigQueryHook:
289+
return BigQueryHook(
290+
gcp_conn_id=self.gcp_conn_id,
291+
use_legacy_sql=self.use_legacy_sql,
292+
location=self.location,
293+
)
265294

266295

267296
class BigQueryGetDataOperator(BaseOperator):

0 commit comments

Comments
 (0)