@@ -108,8 +108,8 @@ def __init__(
108
108
bucket_name : str ,
109
109
object_name : str ,
110
110
fields : List [str ],
111
- params : Dict [str , Any ] = None ,
112
- parameters : Dict [str , Any ] = None ,
111
+ params : Optional [ Dict [str , Any ] ] = None ,
112
+ parameters : Optional [ Dict [str , Any ] ] = None ,
113
113
gzip : bool = False ,
114
114
upload_as_account : bool = False ,
115
115
api_version : Optional [str ] = None ,
@@ -200,30 +200,33 @@ def _prepare_rows_for_upload(
200
200
201
201
def _decide_and_flush (self , converted_rows_with_action : Dict [FlushAction , list ]):
202
202
total_data_count = 0
203
- if FlushAction .EXPORT_ONCE in converted_rows_with_action :
203
+ once_action = converted_rows_with_action .get (FlushAction .EXPORT_ONCE )
204
+ if once_action is not None :
204
205
self ._flush_rows (
205
- converted_rows = converted_rows_with_action . get ( FlushAction . EXPORT_ONCE ) ,
206
+ converted_rows = once_action ,
206
207
object_name = self .object_name ,
207
208
)
208
- total_data_count += len (converted_rows_with_action .get (FlushAction .EXPORT_ONCE ))
209
- elif FlushAction .EXPORT_EVERY_ACCOUNT in converted_rows_with_action :
210
- for converted_rows in converted_rows_with_action .get (FlushAction .EXPORT_EVERY_ACCOUNT ):
211
- self ._flush_rows (
212
- converted_rows = converted_rows .get ("converted_rows" ),
213
- object_name = self ._transform_object_name_with_account_id (
214
- account_id = converted_rows .get ("account_id" )
215
- ),
216
- )
217
- total_data_count += len (converted_rows .get ("converted_rows" ))
209
+ total_data_count += len (once_action )
218
210
else :
219
- message = (
220
- "FlushAction not found in the data. Please check the FlushAction in the operator. Converted "
221
- "Rows with Action: " + str (converted_rows_with_action )
222
- )
223
- raise AirflowException (message )
211
+ every_account_action = converted_rows_with_action .get (FlushAction .EXPORT_EVERY_ACCOUNT )
212
+ if every_account_action :
213
+ for converted_rows in every_account_action :
214
+ self ._flush_rows (
215
+ converted_rows = converted_rows .get ("converted_rows" ),
216
+ object_name = self ._transform_object_name_with_account_id (
217
+ account_id = converted_rows .get ("account_id" )
218
+ ),
219
+ )
220
+ total_data_count += len (converted_rows .get ("converted_rows" ))
221
+ else :
222
+ message = (
223
+ "FlushAction not found in the data. Please check the FlushAction in "
224
+ "the operator. Converted Rows with Action: " + str (converted_rows_with_action )
225
+ )
226
+ raise AirflowException (message )
224
227
return total_data_count
225
228
226
- def _flush_rows (self , converted_rows : list , object_name : str ):
229
+ def _flush_rows (self , converted_rows : Optional [ List [ Any ]] , object_name : str ):
227
230
if converted_rows :
228
231
headers = converted_rows [0 ].keys ()
229
232
with tempfile .NamedTemporaryFile ("w" , suffix = ".csv" ) as csvfile :
0 commit comments