19
19
from tempfile import NamedTemporaryFile
20
20
from unittest import TestCase , mock
21
21
22
+ from parameterized import parameterized
23
+
24
+ from airflow .models import DAG , TaskInstance as TI
22
25
from airflow .providers .google .marketing_platform .operators .campaign_manager import (
23
26
GoogleCampaignManagerBatchInsertConversionsOperator ,
24
27
GoogleCampaignManagerBatchUpdateConversionsOperator ,
27
30
GoogleCampaignManagerInsertReportOperator ,
28
31
GoogleCampaignManagerRunReportOperator ,
29
32
)
33
+ from airflow .utils import timezone
34
+ from airflow .utils .session import create_session
30
35
31
36
API_VERSION = "api_version"
32
37
GCP_CONN_ID = "google_cloud_default"
46
51
],
47
52
}
48
53
54
+ DEFAULT_DATE = timezone .datetime (2021 , 1 , 1 )
55
+ PROFILE_ID = "profile_id"
56
+ REPORT_ID = "report_id"
57
+ FILE_ID = "file_id"
58
+ BUCKET_NAME = "test_bucket"
59
+ REPORT_NAME = "test_report.csv"
60
+ TEMP_FILE_NAME = "test"
61
+
49
62
50
63
class TestGoogleCampaignManagerDeleteReportOperator (TestCase ):
51
64
@mock .patch (
52
65
"airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
53
66
)
54
67
@mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
55
68
def test_execute (self , mock_base_op , hook_mock ):
56
- profile_id = "PROFILE_ID"
57
- report_id = "REPORT_ID"
58
69
op = GoogleCampaignManagerDeleteReportOperator (
59
- profile_id = profile_id ,
60
- report_id = report_id ,
70
+ profile_id = PROFILE_ID ,
71
+ report_id = REPORT_ID ,
61
72
api_version = API_VERSION ,
62
73
task_id = "test_task" ,
63
74
)
@@ -69,11 +80,19 @@ def test_execute(self, mock_base_op, hook_mock):
69
80
impersonation_chain = None ,
70
81
)
71
82
hook_mock .return_value .delete_report .assert_called_once_with (
72
- profile_id = profile_id , report_id = report_id
83
+ profile_id = PROFILE_ID , report_id = REPORT_ID
73
84
)
74
85
75
86
76
- class TestGoogleCampaignManagerGetReportOperator (TestCase ):
87
+ class TestGoogleCampaignManagerDownloadReportOperator (TestCase ):
88
+ def setUp (self ):
89
+ with create_session () as session :
90
+ session .query (TI ).delete ()
91
+
92
+ def tearDown (self ):
93
+ with create_session () as session :
94
+ session .query (TI ).delete ()
95
+
77
96
@mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.http" )
78
97
@mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile" )
79
98
@mock .patch (
@@ -94,24 +113,17 @@ def test_execute(
94
113
tempfile_mock ,
95
114
http_mock ,
96
115
):
97
- profile_id = "PROFILE_ID"
98
- report_id = "REPORT_ID"
99
- file_id = "FILE_ID"
100
- bucket_name = "test_bucket"
101
- report_name = "test_report.csv"
102
- temp_file_name = "TEST"
103
-
104
116
http_mock .MediaIoBaseDownload .return_value .next_chunk .return_value = (
105
117
None ,
106
118
True ,
107
119
)
108
- tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = temp_file_name
120
+ tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = TEMP_FILE_NAME
109
121
op = GoogleCampaignManagerDownloadReportOperator (
110
- profile_id = profile_id ,
111
- report_id = report_id ,
112
- file_id = file_id ,
113
- bucket_name = bucket_name ,
114
- report_name = report_name ,
122
+ profile_id = PROFILE_ID ,
123
+ report_id = REPORT_ID ,
124
+ file_id = FILE_ID ,
125
+ bucket_name = BUCKET_NAME ,
126
+ report_name = REPORT_NAME ,
115
127
api_version = API_VERSION ,
116
128
task_id = "test_task" ,
117
129
)
@@ -123,21 +135,78 @@ def test_execute(
123
135
impersonation_chain = None ,
124
136
)
125
137
hook_mock .return_value .get_report_file .assert_called_once_with (
126
- profile_id = profile_id , report_id = report_id , file_id = file_id
138
+ profile_id = PROFILE_ID , report_id = REPORT_ID , file_id = FILE_ID
127
139
)
128
140
gcs_hook_mock .assert_called_once_with (
129
141
gcp_conn_id = GCP_CONN_ID ,
130
142
delegate_to = None ,
131
143
impersonation_chain = None ,
132
144
)
133
145
gcs_hook_mock .return_value .upload .assert_called_once_with (
134
- bucket_name = bucket_name ,
135
- object_name = report_name + ".gz" ,
146
+ bucket_name = BUCKET_NAME ,
147
+ object_name = REPORT_NAME + ".gz" ,
148
+ gzip = True ,
149
+ filename = TEMP_FILE_NAME ,
150
+ mime_type = "text/csv" ,
151
+ )
152
+ xcom_mock .assert_called_once_with (None , key = "report_name" , value = REPORT_NAME + ".gz" )
153
+
154
+ @parameterized .expand ([BUCKET_NAME , f"gs://{ BUCKET_NAME } " , "XComArg" , "{{ ti.xcom_pull(task_ids='f') }}" ])
155
+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.http" )
156
+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile" )
157
+ @mock .patch (
158
+ "airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
159
+ )
160
+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.GCSHook" )
161
+ def test_set_bucket_name (
162
+ self ,
163
+ test_bucket_name ,
164
+ gcs_hook_mock ,
165
+ hook_mock ,
166
+ tempfile_mock ,
167
+ http_mock ,
168
+ ):
169
+ http_mock .MediaIoBaseDownload .return_value .next_chunk .return_value = (
170
+ None ,
171
+ True ,
172
+ )
173
+ tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = TEMP_FILE_NAME
174
+
175
+ dag = DAG (
176
+ dag_id = "test_set_bucket_name" ,
177
+ start_date = DEFAULT_DATE ,
178
+ schedule_interval = None ,
179
+ catchup = False ,
180
+ )
181
+
182
+ if BUCKET_NAME not in test_bucket_name :
183
+
184
+ @dag .task
185
+ def f ():
186
+ return BUCKET_NAME
187
+
188
+ taskflow_op = f ()
189
+ taskflow_op .operator .run (start_date = DEFAULT_DATE , end_date = DEFAULT_DATE )
190
+
191
+ op = GoogleCampaignManagerDownloadReportOperator (
192
+ profile_id = PROFILE_ID ,
193
+ report_id = REPORT_ID ,
194
+ file_id = FILE_ID ,
195
+ bucket_name = test_bucket_name if test_bucket_name != "XComArg" else taskflow_op ,
196
+ report_name = REPORT_NAME ,
197
+ api_version = API_VERSION ,
198
+ task_id = "test_task" ,
199
+ dag = dag ,
200
+ )
201
+ op .run (start_date = DEFAULT_DATE , end_date = DEFAULT_DATE )
202
+
203
+ gcs_hook_mock .return_value .upload .assert_called_once_with (
204
+ bucket_name = BUCKET_NAME ,
205
+ object_name = REPORT_NAME + ".gz" ,
136
206
gzip = True ,
137
- filename = temp_file_name ,
207
+ filename = TEMP_FILE_NAME ,
138
208
mime_type = "text/csv" ,
139
209
)
140
- xcom_mock .assert_called_once_with (None , key = "report_name" , value = report_name + ".gz" )
141
210
142
211
143
212
class TestGoogleCampaignManagerInsertReportOperator (TestCase ):
@@ -150,14 +219,12 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
150
219
"campaign_manager.GoogleCampaignManagerInsertReportOperator.xcom_push"
151
220
)
152
221
def test_execute (self , xcom_mock , mock_base_op , hook_mock ):
153
- profile_id = "PROFILE_ID"
154
222
report = {"report" : "test" }
155
- report_id = "test"
156
223
157
- hook_mock .return_value .insert_report .return_value = {"id" : report_id }
224
+ hook_mock .return_value .insert_report .return_value = {"id" : REPORT_ID }
158
225
159
226
op = GoogleCampaignManagerInsertReportOperator (
160
- profile_id = profile_id ,
227
+ profile_id = PROFILE_ID ,
161
228
report = report ,
162
229
api_version = API_VERSION ,
163
230
task_id = "test_task" ,
@@ -169,17 +236,16 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
169
236
api_version = API_VERSION ,
170
237
impersonation_chain = None ,
171
238
)
172
- hook_mock .return_value .insert_report .assert_called_once_with (profile_id = profile_id , report = report )
173
- xcom_mock .assert_called_once_with (None , key = "report_id" , value = report_id )
239
+ hook_mock .return_value .insert_report .assert_called_once_with (profile_id = PROFILE_ID , report = report )
240
+ xcom_mock .assert_called_once_with (None , key = "report_id" , value = REPORT_ID )
174
241
175
242
def test_prepare_template (self ):
176
- profile_id = "PROFILE_ID"
177
243
report = {"key" : "value" }
178
244
with NamedTemporaryFile ("w+" , suffix = ".json" ) as f :
179
245
f .write (json .dumps (report ))
180
246
f .flush ()
181
247
op = GoogleCampaignManagerInsertReportOperator (
182
- profile_id = profile_id ,
248
+ profile_id = PROFILE_ID ,
183
249
report = f .name ,
184
250
api_version = API_VERSION ,
185
251
task_id = "test_task" ,
@@ -200,16 +266,13 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
200
266
"campaign_manager.GoogleCampaignManagerRunReportOperator.xcom_push"
201
267
)
202
268
def test_execute (self , xcom_mock , mock_base_op , hook_mock ):
203
- profile_id = "PROFILE_ID"
204
- report_id = "REPORT_ID"
205
- file_id = "FILE_ID"
206
269
synchronous = True
207
270
208
- hook_mock .return_value .run_report .return_value = {"id" : file_id }
271
+ hook_mock .return_value .run_report .return_value = {"id" : FILE_ID }
209
272
210
273
op = GoogleCampaignManagerRunReportOperator (
211
- profile_id = profile_id ,
212
- report_id = report_id ,
274
+ profile_id = PROFILE_ID ,
275
+ report_id = REPORT_ID ,
213
276
synchronous = synchronous ,
214
277
api_version = API_VERSION ,
215
278
task_id = "test_task" ,
@@ -222,9 +285,9 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
222
285
impersonation_chain = None ,
223
286
)
224
287
hook_mock .return_value .run_report .assert_called_once_with (
225
- profile_id = profile_id , report_id = report_id , synchronous = synchronous
288
+ profile_id = PROFILE_ID , report_id = REPORT_ID , synchronous = synchronous
226
289
)
227
- xcom_mock .assert_called_once_with (None , key = "file_id" , value = file_id )
290
+ xcom_mock .assert_called_once_with (None , key = "file_id" , value = FILE_ID )
228
291
229
292
230
293
class TestGoogleCampaignManagerBatchInsertConversionsOperator (TestCase ):
@@ -233,18 +296,17 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
233
296
)
234
297
@mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
235
298
def test_execute (self , mock_base_op , hook_mock ):
236
- profile_id = "PROFILE_ID"
237
299
op = GoogleCampaignManagerBatchInsertConversionsOperator (
238
300
task_id = "insert_conversion" ,
239
- profile_id = profile_id ,
301
+ profile_id = PROFILE_ID ,
240
302
conversions = [CONVERSION ],
241
303
encryption_source = "AD_SERVING" ,
242
304
encryption_entity_type = "DCM_ADVERTISER" ,
243
305
encryption_entity_id = 123456789 ,
244
306
)
245
307
op .execute (None )
246
308
hook_mock .return_value .conversions_batch_insert .assert_called_once_with (
247
- profile_id = profile_id ,
309
+ profile_id = PROFILE_ID ,
248
310
conversions = [CONVERSION ],
249
311
encryption_source = "AD_SERVING" ,
250
312
encryption_entity_type = "DCM_ADVERTISER" ,
@@ -259,18 +321,17 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
259
321
)
260
322
@mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
261
323
def test_execute (self , mock_base_op , hook_mock ):
262
- profile_id = "PROFILE_ID"
263
324
op = GoogleCampaignManagerBatchUpdateConversionsOperator (
264
325
task_id = "update_conversion" ,
265
- profile_id = profile_id ,
326
+ profile_id = PROFILE_ID ,
266
327
conversions = [CONVERSION ],
267
328
encryption_source = "AD_SERVING" ,
268
329
encryption_entity_type = "DCM_ADVERTISER" ,
269
330
encryption_entity_id = 123456789 ,
270
331
)
271
332
op .execute (None )
272
333
hook_mock .return_value .conversions_batch_update .assert_called_once_with (
273
- profile_id = profile_id ,
334
+ profile_id = PROFILE_ID ,
274
335
conversions = [CONVERSION ],
275
336
encryption_source = "AD_SERVING" ,
276
337
encryption_entity_type = "DCM_ADVERTISER" ,
0 commit comments