
Commit 6ef44b6

Clean-up of google cloud example dags - batch 2 (#19527)
- Use static start_date
- Use catchup=False
- Tidy up the chaining of tasks in some cases
- Remove unnecessary specification of default conn ids
1 parent 4c495ca commit 6ef44b6

Showing 25 changed files with 164 additions and 88 deletions.
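Every DAG in this batch gets the same two-line change: a fixed start_date in place of the moving days_ago(1), plus catchup=False so the scheduler does not backfill runs between that fixed date and today. A minimal sketch of the resulting DAG shape, assuming a throwaway BashOperator task (the DAG id and task are illustrative, not part of this commit):

from datetime import datetime

from airflow import models
from airflow.operators.bash import BashOperator

with models.DAG(
    'example_static_start_date',  # hypothetical DAG id, not from this commit
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2021, 1, 1),  # static date instead of days_ago(1)
    catchup=False,  # skip backfilling runs between the start date and now
    tags=['example'],
) as dag:
    hello = BashOperator(task_id='hello', bash_command='echo hello')

A dynamic start date like days_ago(1) shifts every day, which makes scheduling decisions unstable; pinning the date and disabling catch-up is the documented Airflow best practice these examples are moving to.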

β€Žairflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
'retry_delay': timedelta(minutes=5),
3838
},
3939
schedule_interval='@once',
40-
start_date=datetime(2018, 11, 1),
40+
start_date=datetime(2021, 1, 1),
41+
catchup=False,
4142
tags=['example'],
4243
) as dag:
4344
# [START howto_operator_azure_fileshare_to_gcs_basic]
@@ -46,8 +47,6 @@
4647
share_name=AZURE_SHARE_NAME,
4748
dest_gcs=DEST_GCS_BUCKET,
4849
directory_name=AZURE_DIRECTORY_NAME,
49-
azure_fileshare_conn_id='azure_fileshare_default',
50-
gcp_conn_id='google_cloud_default',
5150
replace=False,
5251
gzip=True,
5352
google_impersonation_chain=None,
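The two connection-id arguments dropped here are exactly the operator's defaults, so the call behaves identically without them. A sketch of the trimmed call, assuming stand-in values for the constants this example DAG defines elsewhere (the environment-variable fallbacks below are illustrative, not from this commit):

import os

from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import (
    AzureFileShareToGCSOperator,
)

# Illustrative stand-ins; the real example DAG defines these from its own
# environment variables.
AZURE_SHARE_NAME = os.environ.get('AZURE_SHARE_NAME', 'test-azure-share')
AZURE_DIRECTORY_NAME = 'test-azure-dir'
DEST_GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'gs://test-gcs-bucket')

# Omitting azure_fileshare_conn_id and gcp_conn_id falls back to the defaults
# deleted above: 'azure_fileshare_default' and 'google_cloud_default'.
sync_azure_files_with_gcs = AzureFileShareToGCSOperator(
    task_id='sync_azure_files_with_gcs',
    share_name=AZURE_SHARE_NAME,
    dest_gcs=DEST_GCS_BUCKET,
    directory_name=AZURE_DIRECTORY_NAME,
    replace=False,
    gzip=True,
)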

β€Žairflow/providers/google/cloud/example_dags/example_compute.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@
3030
"""
3131

3232
import os
33+
from datetime import datetime
3334

3435
from airflow import models
36+
from airflow.models.baseoperator import chain
3537
from airflow.providers.google.cloud.operators.compute import (
3638
ComputeEngineSetMachineTypeOperator,
3739
ComputeEngineStartInstanceOperator,
3840
ComputeEngineStopInstanceOperator,
3941
)
40-
from airflow.utils.dates import days_ago
4142

4243
# [START howto_operator_gce_args_common]
4344
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -52,7 +53,8 @@
5253
with models.DAG(
5354
'example_gcp_compute',
5455
schedule_interval='@once', # Override to match your needs
55-
start_date=days_ago(1),
56+
start_date=datetime(2021, 1, 1),
57+
catchup=False,
5658
tags=['example'],
5759
) as dag:
5860
# [START howto_operator_gce_start]
@@ -96,5 +98,11 @@
9698
)
9799
# [END howto_operator_gce_set_machine_type_no_project_id]
98100

99-
gce_instance_start >> gce_instance_start2 >> gce_instance_stop >> gce_instance_stop2
100-
gce_instance_stop2 >> gce_set_machine_type >> gce_set_machine_type2
101+
chain(
102+
gce_instance_start,
103+
gce_instance_start2,
104+
gce_instance_stop,
105+
gce_instance_stop2,
106+
gce_set_machine_type,
107+
gce_set_machine_type2,
108+
)
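chain() from airflow.models.baseoperator is what replaces the repeated >> expressions above: it wires each argument to the next in order, so a long linear dependency reads as one vertical list instead of two chained shift lines. A self-contained sketch of the equivalence (DAG and task ids are placeholders; DummyOperator is the task stub available in Airflow 2 of this era):

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator

with models.DAG(
    'example_chain',  # placeholder DAG id
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    t1, t2, t3 = [DummyOperator(task_id=f't{i}') for i in range(1, 4)]

    # Equivalent to `t1 >> t2 >> t3`; one call scales better as the task
    # list grows, which is why the commit switches to it.
    chain(t1, t2, t3)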

β€Žairflow/providers/google/cloud/example_dags/example_compute_igm.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@
3939
"""
4040

4141
import os
42+
from datetime import datetime
4243

4344
from airflow import models
45+
from airflow.models.baseoperator import chain
4446
from airflow.providers.google.cloud.operators.compute import (
4547
ComputeEngineCopyInstanceTemplateOperator,
4648
ComputeEngineInstanceGroupUpdateManagerTemplateOperator,
4749
)
48-
from airflow.utils.dates import days_ago
4950

5051
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
5152
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
@@ -92,7 +93,8 @@
9293
with models.DAG(
9394
'example_gcp_compute_igm',
9495
schedule_interval='@once', # Override to match your needs
95-
start_date=days_ago(1),
96+
start_date=datetime(2021, 1, 1),
97+
catchup=False,
9698
tags=['example'],
9799
) as dag:
98100
# [START howto_operator_gce_igm_copy_template]
@@ -133,5 +135,9 @@
133135
)
134136
# [END howto_operator_gce_igm_update_template_no_project_id]
135137

136-
gce_instance_template_copy >> gce_instance_template_copy2 >> gce_instance_group_manager_update_template
137-
gce_instance_group_manager_update_template >> gce_instance_group_manager_update_template2
138+
chain(
139+
gce_instance_template_copy,
140+
gce_instance_template_copy2,
141+
gce_instance_group_manager_update_template,
142+
gce_instance_group_manager_update_template2,
143+
)

β€Žairflow/providers/google/cloud/example_dags/example_compute_ssh.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
# under the License.
1717

1818
import os
19+
from datetime import datetime
1920

2021
from airflow import models
2122
from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
2223
from airflow.providers.ssh.operators.ssh import SSHOperator
23-
from airflow.utils import dates
2424

2525
# [START howto_operator_gce_args_common]
2626
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -30,8 +30,9 @@
3030

3131
with models.DAG(
3232
'example_compute_ssh',
33-
default_args=dict(start_date=dates.days_ago(1)),
3433
schedule_interval='@once', # Override to match your needs
34+
start_date=datetime(2021, 1, 1),
35+
catchup=False,
3536
tags=['example'],
3637
) as dag:
3738
# # [START howto_execute_command_on_remote1]

β€Žairflow/providers/google/cloud/example_dags/example_datacatalog.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Example Airflow DAG that interacts with Google Data Catalog service
2121
"""
2222
import os
23+
from datetime import datetime
2324

2425
from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
2526

@@ -49,7 +50,6 @@
4950
CloudDataCatalogUpdateTagTemplateFieldOperator,
5051
CloudDataCatalogUpdateTagTemplateOperator,
5152
)
52-
from airflow.utils.dates import days_ago
5353

5454
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
5555
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
@@ -61,7 +61,12 @@
6161
FIELD_NAME_2 = "second"
6262
FIELD_NAME_3 = "first-rename"
6363

64-
with models.DAG("example_gcp_datacatalog", schedule_interval='@once', start_date=days_ago(1)) as dag:
64+
with models.DAG(
65+
"example_gcp_datacatalog",
66+
schedule_interval='@once',
67+
start_date=datetime(2021, 1, 1),
68+
catchup=False,
69+
) as dag:
6570
# Create
6671
# [START howto_operator_gcp_datacatalog_create_entry_group]
6772
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(

β€Žairflow/providers/google/cloud/example_dags/example_dataflow.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Example Airflow DAG for Google Cloud Dataflow service
2121
"""
2222
import os
23+
from datetime import datetime
2324
from typing import Callable, Dict, List
2425
from urllib.parse import urlparse
2526

@@ -41,7 +42,8 @@
4142
DataflowJobStatusSensor,
4243
)
4344
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
44-
from airflow.utils.dates import days_ago
45+
46+
START_DATE = datetime(2021, 1, 1)
4547

4648
GCS_TMP = os.environ.get('GCP_DATAFLOW_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
4749
GCS_STAGING = os.environ.get('GCP_DATAFLOW_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
@@ -63,7 +65,8 @@
6365
with models.DAG(
6466
"example_gcp_dataflow_native_java",
6567
schedule_interval='@once', # Override to match your needs
66-
start_date=days_ago(1),
68+
start_date=START_DATE,
69+
catchup=False,
6770
tags=['example'],
6871
) as dag_native_java:
6972

@@ -110,7 +113,8 @@
110113
with models.DAG(
111114
"example_gcp_dataflow_native_python",
112115
default_args=default_args,
113-
start_date=days_ago(1),
116+
start_date=START_DATE,
117+
catchup=False,
114118
schedule_interval='@once', # Override to match your needs
115119
tags=['example'],
116120
) as dag_native_python:
@@ -145,7 +149,8 @@
145149
with models.DAG(
146150
"example_gcp_dataflow_native_python_async",
147151
default_args=default_args,
148-
start_date=days_ago(1),
152+
start_date=START_DATE,
153+
catchup=False,
149154
schedule_interval='@once', # Override to match your needs
150155
tags=['example'],
151156
) as dag_native_python_async:
@@ -246,7 +251,8 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
246251
with models.DAG(
247252
"example_gcp_dataflow_template",
248253
default_args=default_args,
249-
start_date=days_ago(1),
254+
start_date=START_DATE,
255+
catchup=False,
250256
schedule_interval='@once', # Override to match your needs
251257
tags=['example'],
252258
) as dag_template:

β€Žairflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
Example Airflow DAG for Google Cloud Dataflow service
2121
"""
2222
import os
23+
from datetime import datetime
2324

2425
from airflow import models
2526
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
26-
from airflow.utils.dates import days_ago
2727

2828
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
2929

@@ -45,7 +45,8 @@
4545

4646
with models.DAG(
4747
dag_id="example_gcp_dataflow_flex_template_java",
48-
start_date=days_ago(1),
48+
start_date=datetime(2021, 1, 1),
49+
catchup=False,
4950
schedule_interval='@once', # Override to match your needs
5051
) as dag_flex_template:
5152
# [START howto_operator_start_template_job]

β€Žairflow/providers/google/cloud/example_dags/example_dataflow_sql.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
Example Airflow DAG for Google Cloud Dataflow service
2121
"""
2222
import os
23+
from datetime import datetime
2324

2425
from airflow import models
2526
from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
26-
from airflow.utils.dates import days_ago
2727

2828
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
2929

@@ -36,7 +36,8 @@
3636

3737
with models.DAG(
3838
dag_id="example_gcp_dataflow_sql",
39-
start_date=days_ago(1),
39+
start_date=datetime(2021, 1, 1),
40+
catchup=False,
4041
schedule_interval='@once', # Override to match your needs
4142
tags=['example'],
4243
) as dag_sql:

β€Žairflow/providers/google/cloud/example_dags/example_datafusion.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
Example Airflow DAG that shows how to use DataFusion.
2020
"""
2121
import os
22+
from datetime import datetime
2223

2324
from airflow import models
2425
from airflow.operators.bash import BashOperator
@@ -35,7 +36,6 @@
3536
CloudDataFusionUpdateInstanceOperator,
3637
)
3738
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
38-
from airflow.utils import dates
3939
from airflow.utils.state import State
4040

4141
# [START howto_data_fusion_env_variables]
@@ -153,7 +153,8 @@
153153
with models.DAG(
154154
"example_data_fusion",
155155
schedule_interval='@once', # Override to match your needs
156-
start_date=dates.days_ago(1),
156+
start_date=datetime(2021, 1, 1),
157+
catchup=False,
157158
) as dag:
158159
# [START howto_cloud_data_fusion_create_instance_operator]
159160
create_instance = CloudDataFusionCreateInstanceOperator(

β€Žairflow/providers/google/cloud/example_dags/example_dataprep.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
Example Airflow DAG that shows how to use Google Dataprep.
1919
"""
2020
import os
21+
from datetime import datetime
2122

2223
from airflow import models
2324
from airflow.providers.google.cloud.operators.dataprep import (
2425
DataprepGetJobGroupOperator,
2526
DataprepGetJobsForJobGroupOperator,
2627
DataprepRunJobGroupOperator,
2728
)
28-
from airflow.utils import dates
2929

3030
DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
3131
DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
@@ -53,7 +53,8 @@
5353
with models.DAG(
5454
"example_dataprep",
5555
schedule_interval='@once',
56-
start_date=dates.days_ago(1), # Override to match your needs
56+
start_date=datetime(2021, 1, 1), # Override to match your needs
57+
catchup=False,
5758
) as dag:
5859
# [START how_to_dataprep_run_job_group_operator]
5960
run_job_group = DataprepRunJobGroupOperator(task_id="run_job_group", body_request=DATA)
