
Commit f1f1940

Add DataflowStartSQLQuery operator (#8553)
1 parent 41bf172 commit f1f1940

File tree

6 files changed: +623 -120 lines changed

Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG for Google Cloud Dataflow service
"""
import os

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
from airflow.utils.dates import days_ago

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

BQ_SQL_DATASET = os.environ.get("DATAFLOW_BQ_SQL_DATASET", "airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("BQ_SQL_TABLE_INPUT", "beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("BQ_SQL_TABLE_OUTPUT", "beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("DATAFLOW_SQL_JOB_NAME", "dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("DATAFLOW_SQL_LOCATION", "us-west1")

with models.DAG(
    dag_id="example_gcp_dataflow_sql",
    start_date=days_ago(1),
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag_sql:
    start_sql = DataflowStartSqlJobOperator(
        task_id="start_sql_query",
        job_name=DATAFLOW_SQL_JOB_NAME,
        query=f"""
            SELECT
                sales_region as sales_region,
                count(state_id) as count_state
            FROM
                bigquery.table.`{GCP_PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
            WHERE state_id >= @state_id_min
            GROUP BY sales_region;
        """,
        options={
            "bigquery-project": GCP_PROJECT_ID,
            "bigquery-dataset": BQ_SQL_DATASET,
            "bigquery-table": BQ_SQL_TABLE_OUTPUT,
            "bigquery-write-disposition": "write-truncate",
            "parameter": "state_id_min:INT64:2",
        },
        location=DATAFLOW_SQL_LOCATION,
        do_xcom_push=True,
    )
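Because the operator is created with do_xcom_push=True, its return value is pushed to XCom and can be read by a downstream task. A minimal sketch of such a consumer, assuming Airflow 2.x import paths; the report_sql_job task and report_job callable are illustrative and not part of this commit:

from airflow.operators.python import PythonOperator

def report_job(ti, **kwargs):
    # With no key given, xcom_pull returns the value the upstream task
    # pushed as its return value.
    result = ti.xcom_pull(task_ids="start_sql_query")
    print(f"Dataflow SQL job result: {result}")

with dag_sql:  # reopen the example DAG defined above
    report = PythonOperator(task_id="report_sql_job", python_callable=report_job)
    start_sql >> report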
