45
45
46
46
# show Airflow's deprecation warnings
47
47
if not sys .warnoptions :
48
- warnings .filterwarnings (
49
- action = 'default' , category = DeprecationWarning , module = 'airflow' )
50
- warnings .filterwarnings (
51
- action = 'default' , category = PendingDeprecationWarning , module = 'airflow' )
48
+ warnings .filterwarnings (action = 'default' , category = DeprecationWarning , module = 'airflow' )
49
+ warnings .filterwarnings (action = 'default' , category = PendingDeprecationWarning , module = 'airflow' )
52
50
53
51
54
52
def expand_env_var (env_var ):
@@ -70,17 +68,14 @@ def expand_env_var(env_var):
70
68
def run_command (command ):
71
69
"""Runs command and returns stdout"""
72
70
process = subprocess .Popen (
73
- shlex .split (command ),
74
- stdout = subprocess .PIPE ,
75
- stderr = subprocess .PIPE ,
76
- close_fds = True )
77
- output , stderr = [stream .decode (sys .getdefaultencoding (), 'ignore' )
78
- for stream in process .communicate ()]
71
+ shlex .split (command ), stdout = subprocess .PIPE , stderr = subprocess .PIPE , close_fds = True
72
+ )
73
+ output , stderr = [stream .decode (sys .getdefaultencoding (), 'ignore' ) for stream in process .communicate ()]
79
74
80
75
if process .returncode != 0 :
81
76
raise AirflowConfigException (
82
- "Cannot execute {}. Error code is: {}. Output: {}, Stderr: {} "
83
- . format ( command , process . returncode , output , stderr )
77
+ f "Cannot execute { command } . Error code is: { process . returncode } . "
78
+ f"Output: { output } , Stderr: { stderr } "
84
79
)
85
80
86
81
return output
@@ -183,9 +178,9 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
183
178
'email_backend' : (
184
179
re .compile (r'^airflow\.contrib\.utils\.sendgrid\.send_email$' ),
185
180
r'airflow.providers.sendgrid.utils.emailer.send_email' ,
186
- '2.0'
181
+ '2.0' ,
187
182
),
188
- }
183
+ },
189
184
}
190
185
191
186
# This method transforms option names on every read, get, or set operation.
@@ -213,14 +208,14 @@ def _validate(self):
213
208
current_value = self .get (section , name , fallback = None )
214
209
if self ._using_old_value (old , current_value ):
215
210
new_value = re .sub (old , new , current_value )
216
- self ._update_env_var (
217
- section = section , name = name , new_value = new_value )
211
+ self ._update_env_var (section = section , name = name , new_value = new_value )
218
212
self ._create_future_warning (
219
213
name = name ,
220
214
section = section ,
221
215
current_value = current_value ,
222
216
new_value = new_value ,
223
- version = version )
217
+ version = version ,
218
+ )
224
219
225
220
self .is_validated = True
226
221
@@ -229,21 +224,27 @@ def _validate_config_dependencies(self):
229
224
Validate that config values aren't invalid given other config values
230
225
or system-level limitations and requirements.
231
226
"""
232
- if (
233
- self .get ("core" , "executor" ) not in ('DebugExecutor' , 'SequentialExecutor' ) and
234
- "sqlite" in self .get ('core' , 'sql_alchemy_conn' )):
227
+ is_executor_without_sqlite_support = self .get ("core" , "executor" ) not in (
228
+ 'DebugExecutor' ,
229
+ 'SequentialExecutor' ,
230
+ )
231
+ is_sqlite = "sqlite" in self .get ('core' , 'sql_alchemy_conn' )
232
+ if is_executor_without_sqlite_support and is_sqlite :
235
233
raise AirflowConfigException (
236
- "error: cannot use sqlite with the {}" .format (
237
- self . get ( 'core' , 'executor' )) )
234
+ "error: cannot use sqlite with the {}" .format (self . get ( 'core' , 'executor' ))
235
+ )
238
236
239
237
if self .has_option ('core' , 'mp_start_method' ):
240
238
mp_start_method = self .get ('core' , 'mp_start_method' )
241
239
start_method_options = multiprocessing .get_all_start_methods ()
242
240
243
241
if mp_start_method not in start_method_options :
244
242
raise AirflowConfigException (
245
- "mp_start_method should not be " + mp_start_method +
246
- ". Possible values are " + ", " .join (start_method_options ))
243
+ "mp_start_method should not be "
244
+ + mp_start_method
245
+ + ". Possible values are "
246
+ + ", " .join (start_method_options )
247
+ )
247
248
248
249
def _using_old_value (self , old , current_value ): # noqa
249
250
return old .search (current_value ) is not None
@@ -264,7 +265,7 @@ def _create_future_warning(name, section, current_value, new_value, version):
264
265
'Airflow {version}.' .format (
265
266
name = name , section = section , current_value = current_value , new_value = new_value , version = version
266
267
),
267
- FutureWarning
268
+ FutureWarning ,
268
269
)
269
270
270
271
@staticmethod
@@ -336,17 +337,12 @@ def get(self, section, key, **kwargs):
336
337
def _get_option_from_default_config (self , section , key , ** kwargs ):
337
338
# ...then the default config
338
339
if self .airflow_defaults .has_option (section , key ) or 'fallback' in kwargs :
339
- return expand_env_var (
340
- self .airflow_defaults .get (section , key , ** kwargs ))
340
+ return expand_env_var (self .airflow_defaults .get (section , key , ** kwargs ))
341
341
342
342
else :
343
- log .warning (
344
- "section/key [%s/%s] not found in config" , section , key
345
- )
343
+ log .warning ("section/key [%s/%s] not found in config" , section , key )
346
344
347
- raise AirflowConfigException (
348
- "section/key [{section}/{key}] not found "
349
- "in config" .format (section = section , key = key ))
345
+ raise AirflowConfigException (f"section/key [{ section } /{ key } ] not found in config" )
350
346
351
347
def _get_option_from_secrets (self , deprecated_key , deprecated_section , key , section ):
352
348
# ...then from secret backends
@@ -377,16 +373,11 @@ def _get_option_from_config_file(self, deprecated_key, deprecated_section, key,
377
373
if super ().has_option (section , key ):
378
374
# Use the parent's methods to get the actual config here to be able to
379
375
# separate the config from default config.
380
- return expand_env_var (
381
- super ().get (section , key , ** kwargs ))
376
+ return expand_env_var (super ().get (section , key , ** kwargs ))
382
377
if deprecated_section :
383
378
if super ().has_option (deprecated_section , deprecated_key ):
384
379
self ._warn_deprecate (section , key , deprecated_section , deprecated_key )
385
- return expand_env_var (super ().get (
386
- deprecated_section ,
387
- deprecated_key ,
388
- ** kwargs
389
- ))
380
+ return expand_env_var (super ().get (deprecated_section , deprecated_key , ** kwargs ))
390
381
return None
391
382
392
383
def _get_environment_variables (self , deprecated_key , deprecated_section , key , section ):
@@ -543,8 +534,13 @@ def write(self, fp, space_around_delimiters=True):
543
534
self ._write_section (fp , section , self .getsection (section ).items (), delimiter )
544
535
545
536
def as_dict (
546
- self , display_source = False , display_sensitive = False , raw = False ,
547
- include_env = True , include_cmds = True , include_secret = True
537
+ self ,
538
+ display_source = False ,
539
+ display_sensitive = False ,
540
+ raw = False ,
541
+ include_env = True ,
542
+ include_cmds = True ,
543
+ include_secret = True ,
548
544
) -> Dict [str , Dict [str , str ]]:
549
545
"""
550
546
Returns the current configuration as an OrderedDict of OrderedDicts.
@@ -624,8 +620,9 @@ def _include_commands(self, config_sources, display_sensitive, display_source, r
624
620
del config_sources [section ][key + '_cmd' ]
625
621
626
622
def _include_envs (self , config_sources , display_sensitive , display_source , raw ):
627
- for env_var in [os_environment
628
- for os_environment in os .environ if os_environment .startswith ('AIRFLOW__' )]:
623
+ for env_var in [
624
+ os_environment for os_environment in os .environ if os_environment .startswith ('AIRFLOW__' )
625
+ ]:
629
626
try :
630
627
_ , section , key = env_var .split ('__' , 2 )
631
628
opt = self ._get_env_var_option (section , key )
@@ -651,13 +648,14 @@ def _include_envs(self, config_sources, display_sensitive, display_source, raw):
651
648
def _replace_config_with_display_sources (config_sources , configs , display_source , raw ):
652
649
for (source_name , config ) in configs :
653
650
for section in config .sections ():
654
- AirflowConfigParser .\
655
- _replace_section_config_with_display_sources (
656
- config , config_sources , display_source , raw , section , source_name )
651
+ AirflowConfigParser ._replace_section_config_with_display_sources (
652
+ config , config_sources , display_source , raw , section , source_name
653
+ )
657
654
658
655
@staticmethod
659
- def _replace_section_config_with_display_sources (config , config_sources , display_source , raw , section ,
660
- source_name ):
656
+ def _replace_section_config_with_display_sources (
657
+ config , config_sources , display_source , raw , section , source_name
658
+ ):
661
659
sect = config_sources .setdefault (section , OrderedDict ())
662
660
for (k , val ) in config .items (section = section , raw = raw ):
663
661
if display_source :
@@ -730,19 +728,17 @@ def get_airflow_config(airflow_home):
730
728
# Set up dags folder for unit tests
731
729
# this directory won't exist if users install via pip
732
730
_TEST_DAGS_FOLDER = os .path .join (
733
- os .path .dirname (os .path .dirname (os .path .realpath (__file__ ))),
734
- 'tests' ,
735
- 'dags' )
731
+ os .path .dirname (os .path .dirname (os .path .realpath (__file__ ))), 'tests' , 'dags'
732
+ )
736
733
if os .path .exists (_TEST_DAGS_FOLDER ):
737
734
TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
738
735
else :
739
736
TEST_DAGS_FOLDER = os .path .join (AIRFLOW_HOME , 'dags' )
740
737
741
738
# Set up plugins folder for unit tests
742
739
_TEST_PLUGINS_FOLDER = os .path .join (
743
- os .path .dirname (os .path .dirname (os .path .realpath (__file__ ))),
744
- 'tests' ,
745
- 'plugins' )
740
+ os .path .dirname (os .path .dirname (os .path .realpath (__file__ ))), 'tests' , 'plugins'
741
+ )
746
742
if os .path .exists (_TEST_PLUGINS_FOLDER ):
747
743
TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
748
744
else :
@@ -777,20 +773,14 @@ def get_airflow_test_config(airflow_home):
777
773
778
774
SECRET_KEY = b64encode (os .urandom (16 )).decode ('utf-8' )
779
775
780
- TEMPLATE_START = (
781
- '# ----------------------- TEMPLATE BEGINS HERE -----------------------' )
776
+ TEMPLATE_START = '# ----------------------- TEMPLATE BEGINS HERE -----------------------'
782
777
if not os .path .isfile (TEST_CONFIG_FILE ):
783
- log .info (
784
- 'Creating new Airflow config file for unit tests in: %s' , TEST_CONFIG_FILE
785
- )
778
+ log .info ('Creating new Airflow config file for unit tests in: %s' , TEST_CONFIG_FILE )
786
779
with open (TEST_CONFIG_FILE , 'w' ) as file :
787
780
cfg = parameterized_config (TEST_CONFIG )
788
781
file .write (cfg .split (TEMPLATE_START )[- 1 ].strip ())
789
782
if not os .path .isfile (AIRFLOW_CONFIG ):
790
- log .info (
791
- 'Creating new Airflow config file in: %s' ,
792
- AIRFLOW_CONFIG
793
- )
783
+ log .info ('Creating new Airflow config file in: %s' , AIRFLOW_CONFIG )
794
784
with open (AIRFLOW_CONFIG , 'w' ) as file :
795
785
cfg = parameterized_config (DEFAULT_CONFIG )
796
786
cfg = cfg .split (TEMPLATE_START )[- 1 ].strip ()
@@ -835,110 +825,110 @@ def get_airflow_test_config(airflow_home):
835
825
836
826
837
827
# Historical convenience functions to access config entries
838
- def load_test_config (): # noqa: D103
828
+ def load_test_config (): # noqa: D103
839
829
"""Historical load_test_config"""
840
830
warnings .warn (
841
831
"Accessing configuration method 'load_test_config' directly from the configuration module is "
842
832
"deprecated. Please access the configuration from the 'configuration.conf' object via "
843
833
"'conf.load_test_config'" ,
844
834
DeprecationWarning ,
845
- stacklevel = 2
835
+ stacklevel = 2 ,
846
836
)
847
837
conf .load_test_config ()
848
838
849
839
850
- def get (* args , ** kwargs ): # noqa: D103
840
+ def get (* args , ** kwargs ): # noqa: D103
851
841
"""Historical get"""
852
842
warnings .warn (
853
843
"Accessing configuration method 'get' directly from the configuration module is "
854
844
"deprecated. Please access the configuration from the 'configuration.conf' object via "
855
845
"'conf.get'" ,
856
846
DeprecationWarning ,
857
- stacklevel = 2
847
+ stacklevel = 2 ,
858
848
)
859
849
return conf .get (* args , ** kwargs )
860
850
861
851
862
- def getboolean (* args , ** kwargs ): # noqa: D103
852
+ def getboolean (* args , ** kwargs ): # noqa: D103
863
853
"""Historical getboolean"""
864
854
warnings .warn (
865
855
"Accessing configuration method 'getboolean' directly from the configuration module is "
866
856
"deprecated. Please access the configuration from the 'configuration.conf' object via "
867
857
"'conf.getboolean'" ,
868
858
DeprecationWarning ,
869
- stacklevel = 2
859
+ stacklevel = 2 ,
870
860
)
871
861
return conf .getboolean (* args , ** kwargs )
872
862
873
863
874
- def getfloat (* args , ** kwargs ): # noqa: D103
864
+ def getfloat (* args , ** kwargs ): # noqa: D103
875
865
"""Historical getfloat"""
876
866
warnings .warn (
877
867
"Accessing configuration method 'getfloat' directly from the configuration module is "
878
868
"deprecated. Please access the configuration from the 'configuration.conf' object via "
879
869
"'conf.getfloat'" ,
880
870
DeprecationWarning ,
881
- stacklevel = 2
871
+ stacklevel = 2 ,
882
872
)
883
873
return conf .getfloat (* args , ** kwargs )
884
874
885
875
886
- def getint (* args , ** kwargs ): # noqa: D103
876
+ def getint (* args , ** kwargs ): # noqa: D103
887
877
"""Historical getint"""
888
878
warnings .warn (
889
879
"Accessing configuration method 'getint' directly from the configuration module is "
890
880
"deprecated. Please access the configuration from the 'configuration.conf' object via "
891
881
"'conf.getint'" ,
892
882
DeprecationWarning ,
893
- stacklevel = 2
883
+ stacklevel = 2 ,
894
884
)
895
885
return conf .getint (* args , ** kwargs )
896
886
897
887
898
- def getsection (* args , ** kwargs ): # noqa: D103
888
+ def getsection (* args , ** kwargs ): # noqa: D103
899
889
"""Historical getsection"""
900
890
warnings .warn (
901
891
"Accessing configuration method 'getsection' directly from the configuration module is "
902
892
"deprecated. Please access the configuration from the 'configuration.conf' object via "
903
893
"'conf.getsection'" ,
904
894
DeprecationWarning ,
905
- stacklevel = 2
895
+ stacklevel = 2 ,
906
896
)
907
897
return conf .getint (* args , ** kwargs )
908
898
909
899
910
- def has_option (* args , ** kwargs ): # noqa: D103
900
+ def has_option (* args , ** kwargs ): # noqa: D103
911
901
"""Historical has_option"""
912
902
warnings .warn (
913
903
"Accessing configuration method 'has_option' directly from the configuration module is "
914
904
"deprecated. Please access the configuration from the 'configuration.conf' object via "
915
905
"'conf.has_option'" ,
916
906
DeprecationWarning ,
917
- stacklevel = 2
907
+ stacklevel = 2 ,
918
908
)
919
909
return conf .has_option (* args , ** kwargs )
920
910
921
911
922
- def remove_option (* args , ** kwargs ): # noqa: D103
912
+ def remove_option (* args , ** kwargs ): # noqa: D103
923
913
"""Historical remove_option"""
924
914
warnings .warn (
925
915
"Accessing configuration method 'remove_option' directly from the configuration module is "
926
916
"deprecated. Please access the configuration from the 'configuration.conf' object via "
927
917
"'conf.remove_option'" ,
928
918
DeprecationWarning ,
929
- stacklevel = 2
919
+ stacklevel = 2 ,
930
920
)
931
921
return conf .remove_option (* args , ** kwargs )
932
922
933
923
934
- def as_dict (* args , ** kwargs ): # noqa: D103
924
+ def as_dict (* args , ** kwargs ): # noqa: D103
935
925
"""Historical as_dict"""
936
926
warnings .warn (
937
927
"Accessing configuration method 'as_dict' directly from the configuration module is "
938
928
"deprecated. Please access the configuration from the 'configuration.conf' object via "
939
929
"'conf.as_dict'" ,
940
930
DeprecationWarning ,
941
- stacklevel = 2
931
+ stacklevel = 2 ,
942
932
)
943
933
return conf .as_dict (* args , ** kwargs )
944
934
@@ -950,7 +940,7 @@ def set(*args, **kwargs): # noqa pylint: disable=redefined-builtin
950
940
"deprecated. Please access the configuration from the 'configuration.conf' object via "
951
941
"'conf.set'" ,
952
942
DeprecationWarning ,
953
- stacklevel = 2
943
+ stacklevel = 2 ,
954
944
)
955
945
return conf .set (* args , ** kwargs )
956
946
0 commit comments