|
23 | 23 | from datetime import datetime
|
24 | 24 |
|
25 | 25 | from airflow import models
|
| 26 | +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
26 | 27 | from airflow.providers.google.marketing_platform.operators.campaign_manager import (
|
27 | 28 | GoogleCampaignManagerBatchInsertConversionsOperator,
|
28 | 29 | GoogleCampaignManagerBatchUpdateConversionsOperator,
|
|
34 | 35 | from airflow.providers.google.marketing_platform.sensors.campaign_manager import (
|
35 | 36 | GoogleCampaignManagerReportSensor,
|
36 | 37 | )
|
| 38 | +from airflow.utils.trigger_rule import TriggerRule |
| 39 | + |
| 40 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
| 41 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") |
| 42 | + |
| 43 | +DAG_ID = "example_campaign_manager" |
37 | 44 |
|
38 | 45 | PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789")
|
39 | 46 | FLOODLIGHT_ACTIVITY_ID = int(os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345))
|
40 | 47 | FLOODLIGHT_CONFIGURATION_ID = int(os.environ.get("FLOODLIGHT_CONFIGURATION_ID", 12345))
|
41 | 48 | ENCRYPTION_ENTITY_ID = int(os.environ.get("ENCRYPTION_ENTITY_ID", 12345))
|
42 | 49 | DEVICE_ID = os.environ.get("DEVICE_ID", "12345")
|
43 |
| -BUCKET = os.environ.get("MARKETING_BUCKET", "test-cm-bucket") |
44 |
| -REPORT_NAME = "test-report" |
| 50 | +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" |
| 51 | +REPORT_NAME = f"report_{DAG_ID}_{ENV_ID}" |
45 | 52 | REPORT = {
|
46 | 53 | "type": "STANDARD",
|
47 | 54 | "name": REPORT_NAME,
|
|
84 | 91 | }
|
85 | 92 |
|
86 | 93 | with models.DAG(
|
87 |
| - "example_campaign_manager", |
| 94 | + DAG_ID, |
88 | 95 | schedule_interval='@once', # Override to match your needs,
|
89 | 96 | start_date=datetime(2021, 1, 1),
|
90 | 97 | catchup=False,
|
| 98 | + tags=["example", "campaign"], |
91 | 99 | ) as dag:
|
| 100 | + create_bucket = GCSCreateBucketOperator( |
| 101 | + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 102 | + ) |
| 103 | + |
92 | 104 | # [START howto_campaign_manager_insert_report_operator]
|
93 | 105 | create_report = GoogleCampaignManagerInsertReportOperator(
|
94 | 106 | profile_id=PROFILE_ID, report=REPORT, task_id="create_report"
|
|
119 | 131 | report_id=report_id,
|
120 | 132 | file_id=file_id,
|
121 | 133 | report_name="test_report.csv",
|
122 |
| - bucket_name=BUCKET, |
| 134 | + bucket_name=BUCKET_NAME, |
123 | 135 | )
|
124 | 136 | # [END howto_campaign_manager_get_report_operator]
|
125 | 137 |
|
126 | 138 | # [START howto_campaign_manager_delete_report_operator]
|
127 | 139 | delete_report = GoogleCampaignManagerDeleteReportOperator(
|
128 |
| - profile_id=PROFILE_ID, report_name=REPORT_NAME, task_id="delete_report" |
| 140 | + profile_id=PROFILE_ID, |
| 141 | + report_name=REPORT_NAME, |
| 142 | + task_id="delete_report", |
| 143 | + trigger_rule=TriggerRule.ALL_DONE, |
129 | 144 | )
|
130 | 145 | # [END howto_campaign_manager_delete_report_operator]
|
131 | 146 |
|
132 |
| - wait_for_report >> get_report >> delete_report |
133 |
| - |
134 |
| - # Task dependencies created via `XComArgs`: |
135 |
| - # create_report >> run_report |
136 |
| - # create_report >> wait_for_report |
137 |
| - # create_report >> get_report |
138 |
| - # run_report >> get_report |
139 |
| - # run_report >> wait_for_report |
| 147 | + delete_bucket = GCSDeleteBucketOperator( |
| 148 | + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 149 | + ) |
140 | 150 |
|
141 | 151 | # [START howto_campaign_manager_insert_conversions]
|
142 | 152 | insert_conversion = GoogleCampaignManagerBatchInsertConversionsOperator(
|
|
161 | 171 | )
|
162 | 172 | # [END howto_campaign_manager_update_conversions]
|
163 | 173 |
|
164 |
| - insert_conversion >> update_conversion |
| 174 | + ( |
| 175 | + # TEST SETUP |
| 176 | + create_bucket |
| 177 | + >> create_report |
| 178 | + # TEST BODY |
| 179 | + >> run_report |
| 180 | + >> wait_for_report |
| 181 | + >> get_report |
| 182 | + >> insert_conversion |
| 183 | + >> update_conversion |
| 184 | + # TEST TEARDOWN |
| 185 | + >> delete_report |
| 186 | + >> delete_bucket |
| 187 | + ) |
| 188 | + |
| 189 | + from tests.system.utils.watcher import watcher |
| 190 | + |
| 191 | + # This test needs watcher in order to properly mark success/failure |
| 192 | + # when "tearDown" task with trigger rule is part of the DAG |
| 193 | + list(dag.tasks) >> watcher() |
165 | 194 |
|
| 195 | +from tests.system.utils import get_test_run # noqa: E402 |
166 | 196 |
|
167 |
| -if __name__ == "__main__": |
168 |
| - dag.clear() |
169 |
| - dag.run() |
| 197 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 198 | +test_run = get_test_run(dag) |
0 commit comments