@@ -41,7 +41,9 @@ class BeamDataflowMixin(metaclass=ABCMeta):
41
41
"""
42
42
43
43
dataflow_hook : Optional [DataflowHook ]
44
- dataflow_config : Optional [DataflowConfiguration ]
44
+ dataflow_config : DataflowConfiguration
45
+ gcp_conn_id : str
46
+ delegate_to : Optional [str ]
45
47
46
48
def _set_dataflow (
47
49
self , pipeline_options : dict , job_name_variable_key : Optional [str ] = None
@@ -198,11 +200,17 @@ def __init__(
198
200
self .py_system_site_packages = py_system_site_packages
199
201
self .gcp_conn_id = gcp_conn_id
200
202
self .delegate_to = delegate_to
201
- self .dataflow_config = dataflow_config or {}
202
203
self .beam_hook : Optional [BeamHook ] = None
203
204
self .dataflow_hook : Optional [DataflowHook ] = None
204
205
self .dataflow_job_id : Optional [str ] = None
205
206
207
+ if dataflow_config is None :
208
+ self .dataflow_config = DataflowConfiguration ()
209
+ elif isinstance (dataflow_config , dict ):
210
+ self .dataflow_config = DataflowConfiguration (** dataflow_config )
211
+ else :
212
+ self .dataflow_config = dataflow_config
213
+
206
214
if self .dataflow_config and self .runner .lower () != BeamRunnerType .DataflowRunner .lower ():
207
215
self .log .warning (
208
216
"dataflow_config is defined but runner is different than DataflowRunner (%s)" , self .runner
@@ -216,9 +224,6 @@ def execute(self, context):
216
224
is_dataflow = self .runner .lower () == BeamRunnerType .DataflowRunner .lower ()
217
225
dataflow_job_name : Optional [str ] = None
218
226
219
- if isinstance (self .dataflow_config , dict ):
220
- self .dataflow_config = DataflowConfiguration (** self .dataflow_config )
221
-
222
227
if is_dataflow :
223
228
dataflow_job_name , pipeline_options , process_line_callback = self ._set_dataflow (
224
229
pipeline_options = pipeline_options , job_name_variable_key = "job_name"
@@ -366,14 +371,20 @@ def __init__(
366
371
self .default_pipeline_options = default_pipeline_options or {}
367
372
self .pipeline_options = pipeline_options or {}
368
373
self .job_class = job_class
369
- self .dataflow_config = dataflow_config or {}
370
374
self .gcp_conn_id = gcp_conn_id
371
375
self .delegate_to = delegate_to
372
376
self .dataflow_job_id = None
373
377
self .dataflow_hook : Optional [DataflowHook ] = None
374
378
self .beam_hook : Optional [BeamHook ] = None
375
379
self ._dataflow_job_name : Optional [str ] = None
376
380
381
+ if dataflow_config is None :
382
+ self .dataflow_config = DataflowConfiguration ()
383
+ elif isinstance (dataflow_config , dict ):
384
+ self .dataflow_config = DataflowConfiguration (** dataflow_config )
385
+ else :
386
+ self .dataflow_config = dataflow_config
387
+
377
388
if self .dataflow_config and self .runner .lower () != BeamRunnerType .DataflowRunner .lower ():
378
389
self .log .warning (
379
390
"dataflow_config is defined but runner is different than DataflowRunner (%s)" , self .runner
@@ -387,9 +398,6 @@ def execute(self, context):
387
398
is_dataflow = self .runner .lower () == BeamRunnerType .DataflowRunner .lower ()
388
399
dataflow_job_name : Optional [str ] = None
389
400
390
- if isinstance (self .dataflow_config , dict ):
391
- self .dataflow_config = DataflowConfiguration (** self .dataflow_config )
392
-
393
401
if is_dataflow :
394
402
dataflow_job_name , pipeline_options , process_line_callback = self ._set_dataflow (
395
403
pipeline_options = pipeline_options , job_name_variable_key = None
0 commit comments