Skip to content

Commit 511d0ee

Browse files
author
Wojciech Januszek
authored
Bigquery assets (#23165)
1 parent a914ec2 commit 511d0ee

File tree

12 files changed

+240
-45
lines changed

12 files changed

+240
-45
lines changed

airflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ def create_empty_dataset(
408408
location: Optional[str] = None,
409409
dataset_reference: Optional[Dict[str, Any]] = None,
410410
exists_ok: bool = True,
411-
) -> None:
411+
) -> Dict[str, Any]:
412412
"""
413413
Create a new empty dataset:
414414
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
@@ -452,8 +452,11 @@ def create_empty_dataset(
452452

453453
dataset: Dataset = Dataset.from_api_repr(dataset_reference)
454454
self.log.info('Creating dataset: %s in project: %s ', dataset.dataset_id, dataset.project)
455-
self.get_client(location=location).create_dataset(dataset=dataset, exists_ok=exists_ok)
455+
dataset_object = self.get_client(location=location).create_dataset(
456+
dataset=dataset, exists_ok=exists_ok
457+
)
456458
self.log.info('Dataset created successfully.')
459+
return dataset_object.to_api_repr()
457460

458461
@GoogleBaseHook.fallback_to_default_project_id
459462
def get_dataset_tables(
@@ -533,7 +536,7 @@ def create_external_table(
533536
encryption_configuration: Optional[Dict] = None,
534537
location: Optional[str] = None,
535538
project_id: Optional[str] = None,
536-
) -> None:
539+
) -> Table:
537540
"""
538541
Creates a new external table in the dataset with the data from Google
539542
Cloud Storage. See here:
@@ -659,10 +662,11 @@ def create_external_table(
659662
table.encryption_configuration = EncryptionConfiguration.from_api_repr(encryption_configuration)
660663

661664
self.log.info('Creating external table: %s', external_project_dataset_table)
662-
self.create_empty_table(
665+
table_object = self.create_empty_table(
663666
table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
664667
)
665668
self.log.info('External table created successfully: %s', external_project_dataset_table)
669+
return table_object
666670

667671
@GoogleBaseHook.fallback_to_default_project_id
668672
def update_table(
@@ -1287,7 +1291,7 @@ def update_table_schema(
12871291
dataset_id: str,
12881292
table_id: str,
12891293
project_id: Optional[str] = None,
1290-
) -> None:
1294+
) -> Dict[str, Any]:
12911295
"""
12921296
Update fields within a schema for a given dataset and table. Note that
12931297
some fields in schemas are immutable and trying to change them will cause
@@ -1361,13 +1365,14 @@ def _remove_policy_tags(schema: List[Dict[str, Any]]):
13611365
if not include_policy_tags:
13621366
_remove_policy_tags(new_schema)
13631367

1364-
self.update_table(
1368+
table = self.update_table(
13651369
table_resource={"schema": {"fields": new_schema}},
13661370
fields=["schema"],
13671371
project_id=project_id,
13681372
dataset_id=dataset_id,
13691373
table_id=table_id,
13701374
)
1375+
return table
13711376

13721377
@GoogleBaseHook.fallback_to_default_project_id
13731378
def poll_job_complete(
@@ -2244,7 +2249,7 @@ def create_empty_table(self, *args, **kwargs) -> None:
22442249
)
22452250
return self.hook.create_empty_table(*args, **kwargs)
22462251

2247-
def create_empty_dataset(self, *args, **kwargs) -> None:
2252+
def create_empty_dataset(self, *args, **kwargs) -> Dict[str, Any]:
22482253
"""
22492254
This method is deprecated.
22502255
Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_dataset`
airflow/providers/google/cloud/links/bigquery.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
"""This module contains Google BigQuery links."""
19+
from typing import TYPE_CHECKING
20+
21+
from airflow.models import BaseOperator
22+
from airflow.providers.google.cloud.links.base import BaseGoogleLink
23+
24+
if TYPE_CHECKING:
25+
from airflow.utils.context import Context
26+
27+
# Root URL of the BigQuery page in the Google Cloud console.
BIGQUERY_BASE_LINK = "https://console.cloud.google.com/bigquery"

# URL template for a dataset page; expects ``project_id`` and ``dataset_id``
# to be supplied via ``str.format``.
BIGQUERY_DATASET_LINK = (
    BIGQUERY_BASE_LINK + "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=dataset"
)

# URL template for a table page; expects ``project_id``, ``dataset_id`` and
# ``table_id`` to be supplied via ``str.format``.
BIGQUERY_TABLE_LINK = (
    BIGQUERY_BASE_LINK
    + "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=table&t={table_id}"
)
35+
36+
37+
class BigQueryDatasetLink(BaseGoogleLink):
    """Helper class for constructing BigQuery Dataset Link"""

    name = "BigQuery Dataset"
    key = "bigquery_dataset"
    format_str = BIGQUERY_DATASET_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance: BaseOperator,
        dataset_id: str,
        project_id: str,
    ) -> None:
        """
        Push the dataset identifiers to XCom under ``BigQueryDatasetLink.key``.

        The pushed dictionary uses the keys ``dataset_id`` and ``project_id``,
        which are the placeholder names in ``BIGQUERY_DATASET_LINK``.

        :param context: Task execution context, forwarded to ``xcom_push``.
        :param task_instance: Object whose ``xcom_push`` is called to store the value.
        :param dataset_id: BigQuery dataset identifier.
        :param project_id: Google Cloud project identifier.
        """
        # NOTE(review): presumably BaseGoogleLink reads this XCom value back to
        # fill ``format_str`` - confirm against the base class.
        task_instance.xcom_push(
            context,
            key=BigQueryDatasetLink.key,
            value={"dataset_id": dataset_id, "project_id": project_id},
        )
56+
57+
58+
class BigQueryTableLink(BaseGoogleLink):
    """Helper class for constructing BigQuery Table Link"""

    name = "BigQuery Table"
    key = "bigquery_table"
    format_str = BIGQUERY_TABLE_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance: BaseOperator,
        dataset_id: str,
        project_id: str,
        table_id: str,
    ) -> None:
        """
        Push the table identifiers to XCom under ``BigQueryTableLink.key``.

        The pushed dictionary uses the keys ``dataset_id``, ``project_id`` and
        ``table_id``, which are the placeholder names in ``BIGQUERY_TABLE_LINK``.

        :param context: Task execution context, forwarded to ``xcom_push``.
        :param task_instance: Object whose ``xcom_push`` is called to store the value.
        :param dataset_id: BigQuery dataset identifier.
        :param project_id: Google Cloud project identifier.
        :param table_id: BigQuery table identifier.
        """
        # NOTE(review): presumably BaseGoogleLink reads this XCom value back to
        # fill ``format_str`` - confirm against the base class.
        task_instance.xcom_push(
            context,
            key=BigQueryTableLink.key,
            value={"dataset_id": dataset_id, "project_id": project_id, "table_id": table_id},
        )

0 commit comments

Comments
 (0)