|
29 | 29 | from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
|
30 | 30 | from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
|
31 | 31 | from airflow.providers.google.marketing_platform.operators.display_video import (
|
| 32 | + GoogleDisplayVideo360CreateQueryOperator, |
32 | 33 | GoogleDisplayVideo360CreateReportOperator,
|
33 | 34 | GoogleDisplayVideo360CreateSDFDownloadTaskOperator,
|
34 | 35 | GoogleDisplayVideo360DeleteReportOperator,
|
35 | 36 | GoogleDisplayVideo360DownloadLineItemsOperator,
|
36 | 37 | GoogleDisplayVideo360DownloadReportOperator,
|
| 38 | + GoogleDisplayVideo360DownloadReportV2Operator, |
| 39 | + GoogleDisplayVideo360RunQueryOperator, |
37 | 40 | GoogleDisplayVideo360RunReportOperator,
|
38 | 41 | GoogleDisplayVideo360SDFtoGCSOperator,
|
39 | 42 | GoogleDisplayVideo360UploadLineItemsOperator,
|
40 | 43 | )
|
41 | 44 | from airflow.providers.google.marketing_platform.sensors.display_video import (
|
42 | 45 | GoogleDisplayVideo360GetSDFDownloadOperationSensor,
|
43 | 46 | GoogleDisplayVideo360ReportSensor,
|
| 47 | + GoogleDisplayVideo360RunQuerySensor, |
44 | 48 | )
|
45 | 49 |
|
46 | 50 | # [START howto_display_video_env_variables]
|
|
50 | 54 | PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
|
51 | 55 | PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")
|
52 | 56 | BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
|
53 |
| -SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_1") |
| 57 | +SDF_VERSION = os.environ.get("GMP_SDF_VERSION", "SDF_VERSION_5_5") |
54 | 58 | BQ_DATA_SET = os.environ.get("GMP_BQ_DATA_SET", "airflow_test")
|
55 | 59 | GMP_PARTNER_ID = os.environ.get("GMP_PARTNER_ID", 123)
|
56 | 60 | ENTITY_TYPE = os.environ.get("GMP_ENTITY_TYPE", "LineItem")
|
|
74 | 78 | "schedule": {"frequency": "ONE_TIME"},
|
75 | 79 | }
|
76 | 80 |
|
77 |
| -PARAMETERS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"} |
| 81 | +REPORT_V2 = { |
| 82 | + "metadata": { |
| 83 | + "title": "Airflow Test Report", |
| 84 | + "dataRange": {"range": "LAST_7_DAYS"}, |
| 85 | + "format": "CSV", |
| 86 | + "sendNotification": False, |
| 87 | + }, |
| 88 | + "params": { |
| 89 | + "type": "STANDARD", |
| 90 | + "groupBys": ["FILTER_DATE", "FILTER_PARTNER"], |
| 91 | + "filters": [{"type": "FILTER_PARTNER", "value": ADVERTISER_ID}], |
| 92 | + "metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"], |
| 93 | + }, |
| 94 | + "schedule": {"frequency": "ONE_TIME"}, |
| 95 | +} |
| 96 | + |
| 97 | +PARAMETERS = { |
| 98 | + "dataRange": {"range": "LAST_7_DAYS"}, |
| 99 | +} |
78 | 100 |
|
79 | 101 | CREATE_SDF_DOWNLOAD_TASK_BODY_REQUEST: dict = {
|
80 | 102 | "version": SDF_VERSION,
|
|
209 | 231 |
|
210 | 232 | # Task dependency created via `XComArgs`:
|
211 | 233 | # save_sdf_in_gcs >> upload_sdf_to_big_query
|
| 234 | + |
| 235 | +with models.DAG( |
| 236 | + "example_display_video_v2", |
| 237 | + start_date=START_DATE, |
| 238 | + catchup=False, |
| 239 | +) as dag: |
| 240 | + # [START howto_google_display_video_create_query_operator] |
| 241 | + create_query_v2 = GoogleDisplayVideo360CreateQueryOperator(body=REPORT_V2, task_id="create_query") |
| 242 | + |
| 243 | + query_id = cast(str, XComArg(create_query_v2, key="query_id")) |
| 244 | + # [END howto_google_display_video_create_query_operator] |
| 245 | + |
| 246 | + # [START howto_google_display_video_run_query_report_operator] |
| 247 | + run_query_v2 = GoogleDisplayVideo360RunQueryOperator( |
| 248 | + query_id=query_id, parameters=PARAMETERS, task_id="run_report" |
| 249 | + ) |
| 250 | + |
| 251 | + query_id = cast(str, XComArg(run_query_v2, key="query_id")) |
| 252 | + report_id = cast(str, XComArg(run_query_v2, key="report_id")) |
| 253 | + # [END howto_google_display_video_run_query_report_operator] |
| 254 | + |
| 255 | + # [START howto_google_display_video_wait_run_query_sensor] |
| 256 | + wait_for_query = GoogleDisplayVideo360RunQuerySensor( |
| 257 | + task_id="wait_for_query", |
| 258 | + query_id=query_id, |
| 259 | + report_id=report_id, |
| 260 | + ) |
| 261 | + # [END howto_google_display_video_wait_run_query_sensor] |
| 262 | + |
| 263 | + # [START howto_google_display_video_get_report_operator] |
| 264 | + get_report_v2 = GoogleDisplayVideo360DownloadReportV2Operator( |
| 265 | + query_id=query_id, |
| 266 | + report_id=report_id, |
| 267 | + task_id="get_report", |
| 268 | + bucket_name=BUCKET, |
| 269 | + report_name="test1.csv", |
| 270 | + ) |
| 271 | + # # [END howto_google_display_video_get_report_operator] |
| 272 | + # # [START howto_google_display_video_delete_query_report_operator] |
| 273 | + delete_report_v2 = GoogleDisplayVideo360DeleteReportOperator(report_id=report_id, task_id="delete_report") |
| 274 | + # # [END howto_google_display_video_delete_query_report_operator] |
| 275 | + |
| 276 | + create_query_v2 >> run_query_v2 >> wait_for_query >> get_report_v2 >> delete_report_v2 |
0 commit comments