Skip to content

Commit fe34582

Browse files
New google operator: SQLToGoogleSheetsOperator (#17887)
1 parent 16b47ce commit fe34582

File tree

5 files changed

+296
-0
lines changed

5 files changed

+296
-0
lines changed

β€Žairflow/providers/google/provider.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,10 @@ transfers:
719719
target-integration-name: Google Spreadsheet
720720
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst
721721
python-module: airflow.providers.google.suite.transfers.gcs_to_sheets
722+
- source-integration-name: SQL
723+
target-integration-name: Google Spreadsheet
724+
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/sql_to_sheets.rst
725+
python-module: airflow.providers.google.suite.transfers.sql_to_sheets
722726
- source-integration-name: Local
723727
target-integration-name: Google Cloud Storage (GCS)
724728
how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
from airflow import models
20+
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
21+
from airflow.utils.dates import days_ago
22+
23+
SQL = "select 1 as my_col"
24+
NEW_SPREADSHEET_ID = "123"
25+
26+
with models.DAG(
27+
"example_sql_to_sheets",
28+
start_date=days_ago(1),
29+
schedule_interval=None, # Override to match your needs
30+
tags=["example"],
31+
) as dag:
32+
33+
# [START upload_sql_to_sheets]
34+
upload_gcs_to_sheet = SQLToGoogleSheetsOperator(
35+
task_id="upload_sql_to_sheet",
36+
sql=SQL,
37+
sql_conn_id="database_conn_id",
38+
spreadsheet_id=NEW_SPREADSHEET_ID,
39+
)
40+
# [END upload_sql_to_sheets]
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
import datetime
20+
import numbers
21+
from contextlib import closing
22+
from typing import Any, Iterable, Mapping, Optional, Sequence, Union
23+
24+
from airflow.operators.sql import BaseSQLOperator
25+
from airflow.providers.google.suite.hooks.sheets import GSheetsHook
26+
27+
28+
class SQLToGoogleSheetsOperator(BaseSQLOperator):
29+
"""
30+
Copy data from SQL results to provided Google Spreadsheet.
31+
32+
:param sql: The SQL to execute.
33+
:type sql: str
34+
:param spreadsheet_id: The Google Sheet ID to interact with.
35+
:type spreadsheet_id: str
36+
:param conn_id: the connection ID used to connect to the database.
37+
:type sql_conn_id: str
38+
:param parameters: The parameters to render the SQL query with.
39+
:type parameters: dict or iterable
40+
:param database: name of database which overwrite the defined one in connection
41+
:type database: str
42+
:param spreadsheet_range: The A1 notation of the values to retrieve.
43+
:type spreadsheet_range: str
44+
:param gcp_conn_id: The connection ID to use when fetching connection info.
45+
:type gcp_conn_id: str
46+
:param delegate_to: The account to impersonate using domain-wide delegation of authority,
47+
if any. For this to work, the service account making the request must have
48+
domain-wide delegation enabled.
49+
:type delegate_to: str
50+
:param impersonation_chain: Optional service account to impersonate using short-term
51+
credentials, or chained list of accounts required to get the access_token
52+
of the last account in the list, which will be impersonated in the request.
53+
If set as a string, the account must grant the originating account
54+
the Service Account Token Creator IAM role.
55+
If set as a sequence, the identities from the list must grant
56+
Service Account Token Creator IAM role to the directly preceding identity, with first
57+
account from the list granting this role to the originating account (templated).
58+
:type impersonation_chain: Union[str, Sequence[str]]
59+
"""
60+
61+
template_fields = (
62+
"sql",
63+
"spreadsheet_id",
64+
"spreadsheet_range",
65+
"impersonation_chain",
66+
)
67+
68+
template_fields_renderers = {"sql": "sql"}
69+
template_ext = (".sql",)
70+
71+
ui_color = "#a0e08c"
72+
73+
def __init__(
74+
self,
75+
*,
76+
sql: str,
77+
spreadsheet_id: str,
78+
sql_conn_id: str,
79+
parameters: Optional[Union[Mapping, Iterable]] = None,
80+
database: str = None,
81+
spreadsheet_range: str = "Sheet1",
82+
gcp_conn_id: str = "google_cloud_default",
83+
delegate_to: Optional[str] = None,
84+
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
85+
**kwargs,
86+
) -> None:
87+
super().__init__(**kwargs)
88+
89+
self.sql = sql
90+
self.conn_id = sql_conn_id
91+
self.database = database
92+
self.parameters = parameters
93+
self.gcp_conn_id = gcp_conn_id
94+
self.spreadsheet_id = spreadsheet_id
95+
self.spreadsheet_range = spreadsheet_range
96+
self.delegate_to = delegate_to
97+
self.impersonation_chain = impersonation_chain
98+
99+
def _data_prep(self, data):
100+
for row in data:
101+
item_list = []
102+
for item in row:
103+
if isinstance(item, (datetime.date, datetime.datetime)):
104+
item = item.isoformat()
105+
elif isinstance(item, int): # To exclude int from the number check.
106+
pass
107+
elif isinstance(item, numbers.Number):
108+
item = float(item)
109+
item_list.append(item)
110+
yield item_list
111+
112+
def _get_data(self):
113+
hook = self.get_db_hook()
114+
with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cur:
115+
self.log.info("Executing query")
116+
cur.execute(self.sql, self.parameters or ())
117+
118+
yield [field[0] for field in cur.description]
119+
yield from self._data_prep(cur.fetchall())
120+
121+
def execute(self, context: Any) -> None:
122+
self.log.info("Getting data")
123+
values = list(self._get_data())
124+
125+
self.log.info("Connecting to Google")
126+
sheet_hook = GSheetsHook(
127+
gcp_conn_id=self.gcp_conn_id,
128+
delegate_to=self.delegate_to,
129+
impersonation_chain=self.impersonation_chain,
130+
)
131+
132+
self.log.info(f"Uploading data to https://docs.google.com/spreadsheets/d/{self.spreadsheet_id}")
133+
134+
sheet_hook.update_values(
135+
spreadsheet_id=self.spreadsheet_id,
136+
range_=self.spreadsheet_range,
137+
values=values,
138+
)
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
or more contributor license agreements. See the NOTICE file
3+
distributed with this work for additional information
4+
regarding copyright ownership. The ASF licenses this file
5+
to you under the Apache License, Version 2.0 (the
6+
"License"); you may not use this file except in compliance
7+
with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
software distributed under the License is distributed on an
13+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
KIND, either express or implied. See the License for the
15+
specific language governing permissions and limitations
16+
under the License.
17+
18+
SQL to Google Sheets Transfer Operators
19+
========================================================
20+
21+
With `Google Sheets <https://www.google.pl/intl/en/sheets/about/>`__, everyone can work together in the same
22+
spreadsheet at the same time. Use formulas functions, and formatting options to save time and simplify
23+
common spreadsheet tasks.
24+
25+
.. contents::
26+
:depth: 1
27+
:local:
28+
29+
Prerequisite Tasks
30+
^^^^^^^^^^^^^^^^^^
31+
32+
.. include::/operators/_partials/prerequisite_tasks.rst
33+
34+
.. _howto/operator:SQLToGoogleSheets:
35+
36+
Upload data from SQL to Google Sheets
37+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
38+
39+
To upload data from and Database using SQL to Google Spreadsheet you can use the
40+
:class:`~airflow.providers.google.suite.transfers.sql_to_sheets.SQLToGoogleSheetsOperator`.
41+
42+
.. exampleinclude:: /../../airflow/providers/google/suite/example_dags/example_sql_to_sheets.py
43+
:language: python
44+
:dedent: 4
45+
:start-after: [START upload_sql_to_sheets]
46+
:end-before: [END upload_sql_to_sheets]
47+
48+
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
49+
:template-fields:`airflow.providers.google.suite.transfers.sql_to_sheets.SQLToGoogleSheetsOperator`.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import unittest
19+
from unittest.mock import Mock, patch
20+
21+
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
22+
23+
24+
class TestSQLToGoogleSheets(unittest.TestCase):
25+
"""
26+
Test class for SQLToGoogleSheetsOperator
27+
"""
28+
29+
def setUp(self):
30+
"""
31+
setup
32+
"""
33+
34+
self.gcp_conn_id = "test"
35+
self.sql_conn_id = "test"
36+
self.sql = "select 1 as my_col"
37+
self.spreadsheet_id = "1234567890"
38+
self.values = [[1, 2, 3]]
39+
40+
@patch("airflow.providers.google.suite.transfers.sql_to_sheets.GSheetsHook")
41+
def test_execute(self, mock_sheet_hook):
42+
43+
op = SQLToGoogleSheetsOperator(
44+
task_id="test_task",
45+
spreadsheet_id=self.spreadsheet_id,
46+
gcp_conn_id=self.gcp_conn_id,
47+
sql_conn_id=self.sql_conn_id,
48+
sql=self.sql,
49+
)
50+
51+
op._get_data = Mock(return_value=self.values)
52+
53+
op.execute(None)
54+
55+
mock_sheet_hook.assert_called_once_with(
56+
gcp_conn_id=self.gcp_conn_id,
57+
delegate_to=None,
58+
impersonation_chain=None,
59+
)
60+
61+
mock_sheet_hook.return_value.update_values.assert_called_once_with(
62+
spreadsheet_id=self.spreadsheet_id,
63+
range_="Sheet1",
64+
values=self.values,
65+
)

0 commit comments

Comments
 (0)