Skip to content

Commit 59c94c6

Browse files
ysktirephraimbuddy
andauthored
Add exists_ok flag to BigQueryCreateEmptyTable(Dataset)Operator (#14026)
Co-authored-by: uma6 <> Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
1 parent 9536953 commit 59c94c6

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

β€Žairflow/providers/google/cloud/hooks/bigquery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ def create_empty_dataset(
411411
:param dataset_reference: Dataset reference that could be provided with request body. More info:
412412
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
413413
:type dataset_reference: dict
414-
:param exists_ok: If ``True``, ignore "already exists" errors when creating the DATASET.
414+
:param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset.
415415
:type exists_ok: bool
416416
"""
417417
dataset_reference = dataset_reference or {"datasetReference": {}}

β€Žairflow/providers/google/cloud/operators/bigquery.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,8 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
868868
Service Account Token Creator IAM role to the directly preceding identity, with first
869869
account from the list granting this role to the originating account (templated).
870870
:type impersonation_chain: Union[str, Sequence[str]]
871+
:param exists_ok: If ``True``, ignore "already exists" errors when creating the table.
872+
:type exists_ok: bool
871873
"""
872874

873875
template_fields = (
@@ -905,6 +907,7 @@ def __init__(
905907
location: Optional[str] = None,
906908
cluster_fields: Optional[List[str]] = None,
907909
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
910+
exists_ok: bool = False,
908911
**kwargs,
909912
) -> None:
910913
super().__init__(**kwargs)
@@ -926,6 +929,7 @@ def __init__(
926929
self.cluster_fields = cluster_fields
927930
self.table_resource = table_resource
928931
self.impersonation_chain = impersonation_chain
932+
self.exists_ok = exists_ok
929933

930934
def execute(self, context) -> None:
931935
bq_hook = BigQueryHook(
@@ -960,7 +964,7 @@ def execute(self, context) -> None:
960964
materialized_view=self.materialized_view,
961965
encryption_configuration=self.encryption_configuration,
962966
table_resource=self.table_resource,
963-
exists_ok=False,
967+
exists_ok=self.exists_ok,
964968
)
965969
self.log.info(
966970
'Table %s.%s.%s created successfully', table.project, table.dataset_id, table.table_id
@@ -1357,6 +1361,8 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
13571361
Service Account Token Creator IAM role to the directly preceding identity, with first
13581362
account from the list granting this role to the originating account (templated).
13591363
:type impersonation_chain: Union[str, Sequence[str]]
1364+
:param exists_ok: If ``True``, ignore "already exists" errors when creating the dataset.
1365+
:type exists_ok: bool
13601366
**Example**: ::
13611367
13621368
create_new_dataset = BigQueryCreateEmptyDatasetOperator(
@@ -1389,6 +1395,7 @@ def __init__(
13891395
bigquery_conn_id: Optional[str] = None,
13901396
delegate_to: Optional[str] = None,
13911397
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
1398+
exists_ok: bool = False,
13921399
**kwargs,
13931400
) -> None:
13941401

@@ -1408,6 +1415,7 @@ def __init__(
14081415
self.dataset_reference = dataset_reference if dataset_reference else {}
14091416
self.delegate_to = delegate_to
14101417
self.impersonation_chain = impersonation_chain
1418+
self.exists_ok = exists_ok
14111419

14121420
super().__init__(**kwargs)
14131421

@@ -1425,7 +1433,7 @@ def execute(self, context) -> None:
14251433
dataset_id=self.dataset_id,
14261434
dataset_reference=self.dataset_reference,
14271435
location=self.location,
1428-
exists_ok=False,
1436+
exists_ok=self.exists_ok,
14291437
)
14301438
except Conflict:
14311439
dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id)

0 commit comments

Comments
 (0)