Skip to content

Commit c238269

Browse files
authored
Apply per-run log templates to log handlers (#24153)
1 parent 7498fba commit c238269

File tree

25 files changed

+232
-155
lines changed

25 files changed

+232
-155
lines changed

β€Žairflow/config_templates/airflow_local_settings.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@
8282
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
8383
'formatter': 'airflow',
8484
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
85-
'filename_template': FILENAME_TEMPLATE,
8685
'filters': ['mask_secrets'],
8786
},
8887
'processor': {

β€Žairflow/config_templates/default_test.cfg

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs
5454
logging_level = INFO
5555
celery_logging_level = WARN
5656
fab_logging_level = WARN
57-
log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
5857
log_processor_filename_template = {{{{ filename }}}}.log
5958
dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
6059
worker_log_server_port = 8793

β€Žairflow/models/dagrun.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,14 +1065,23 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = NEW_SES
10651065
return count
10661066

10671067
@provide_session
1068-
def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
1068+
def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate:
10691069
if self.log_template_id is None: # DagRun created before LogTemplate introduction.
1070-
template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
1070+
template = session.query(LogTemplate).order_by(LogTemplate.id).first()
10711071
else:
1072-
template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
1072+
template = session.query(LogTemplate).get(self.log_template_id)
10731073
if template is None:
10741074
raise AirflowException(
10751075
f"No log_template entry found for ID {self.log_template_id!r}. "
10761076
f"Please make sure you set up the metadatabase correctly."
10771077
)
10781078
return template
1079+
1080+
@provide_session
1081+
def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
1082+
warnings.warn(
1083+
"This method is deprecated. Please use get_log_template instead.",
1084+
DeprecationWarning,
1085+
stacklevel=2,
1086+
)
1087+
return self.get_log_template(session=session).filename

β€Žairflow/providers/alibaba/cloud/log/oss_task_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
3838
uploads to and reads from OSS remote storage.
3939
"""
4040

41-
def __init__(self, base_log_folder, oss_log_folder, filename_template):
41+
def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
4242
self.log.info("Using oss_task_handler for remote logging...")
4343
super().__init__(base_log_folder, filename_template)
4444
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)

β€Žairflow/providers/amazon/aws/log/cloudwatch_task_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# under the License.
1818
import sys
1919
from datetime import datetime
20+
from typing import Optional
2021

2122
import watchtower
2223

@@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
4243
:param filename_template: template for file name (local storage) or log stream name (remote)
4344
"""
4445

45-
def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str):
46+
def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None):
4647
super().__init__(base_log_folder, filename_template)
4748
split_arn = log_group_arn.split(':')
4849

β€Žairflow/providers/amazon/aws/log/s3_task_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import os
1919
import pathlib
2020
import sys
21+
from typing import Optional
2122

2223
if sys.version_info >= (3, 8):
2324
from functools import cached_property
@@ -36,7 +37,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
3637
uploads to and reads from S3 remote storage.
3738
"""
3839

39-
def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str):
40+
def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None):
4041
super().__init__(base_log_folder, filename_template)
4142
self.remote_base = s3_log_folder
4243
self.log_relative_path = ''

β€Žairflow/providers/elasticsearch/log/es_task_handler.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import logging
2020
import sys
21+
import warnings
2122
from collections import defaultdict
2223
from datetime import datetime
2324
from operator import attrgetter
@@ -31,15 +32,22 @@
3132
from elasticsearch_dsl import Search
3233

3334
from airflow.configuration import conf
34-
from airflow.models import TaskInstance
35+
from airflow.models.dagrun import DagRun
36+
from airflow.models.taskinstance import TaskInstance
3537
from airflow.utils import timezone
3638
from airflow.utils.log.file_task_handler import FileTaskHandler
3739
from airflow.utils.log.json_formatter import JSONFormatter
3840
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
41+
from airflow.utils.session import create_session
3942

4043
# Elasticsearch hosted log type
4144
EsLogMsgType = List[Tuple[str, str]]
4245

46+
# Compatibility: Airflow 2.3.3 and up uses this method, which accesses the
47+
# LogTemplate model to record the log ID template used. If this function does
48+
# not exist, the task handler should use the log_id_template attribute instead.
49+
USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
50+
4351

4452
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
4553
"""
@@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
6573
def __init__(
6674
self,
6775
base_log_folder: str,
68-
filename_template: str,
69-
log_id_template: str,
7076
end_of_log_mark: str,
7177
write_stdout: bool,
7278
json_format: bool,
@@ -76,6 +82,9 @@ def __init__(
7682
host: str = "localhost:9200",
7783
frontend: str = "localhost:5601",
7884
es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
85+
*,
86+
filename_template: Optional[str] = None,
87+
log_id_template: Optional[str] = None,
7988
):
8089
"""
8190
:param base_log_folder: base folder to store logs locally
@@ -88,7 +97,13 @@ def __init__(
8897

8998
self.client = elasticsearch.Elasticsearch([host], **es_kwargs) # type: ignore[attr-defined]
9099

91-
self.log_id_template = log_id_template
100+
if USE_PER_RUN_LOG_ID and log_id_template is not None:
101+
warnings.warn(
102+
"Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
103+
DeprecationWarning,
104+
)
105+
106+
self.log_id_template = log_id_template # Only used on Airflow < 2.3.2.
92107
self.frontend = frontend
93108
self.mark_end_on_close = True
94109
self.end_of_log_mark = end_of_log_mark
@@ -103,7 +118,13 @@ def __init__(
103118
self.handler: Union[logging.FileHandler, logging.StreamHandler] # type: ignore[assignment]
104119

105120
def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
106-
dag_run = ti.get_dagrun()
121+
with create_session() as session:
122+
dag_run = ti.get_dagrun(session=session)
123+
if USE_PER_RUN_LOG_ID:
124+
log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
125+
else:
126+
log_id_template = self.log_id_template
127+
107128
dag = ti.task.dag
108129
assert dag is not None # For Mypy.
109130
try:
@@ -126,7 +147,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
126147
data_interval_end = ""
127148
execution_date = dag_run.execution_date.isoformat()
128149

129-
return self.log_id_template.format(
150+
return log_id_template.format(
130151
dag_id=ti.dag_id,
131152
task_id=ti.task_id,
132153
run_id=getattr(ti, "run_id", ""),

β€Žairflow/providers/google/cloud/log/gcs_task_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def __init__(
6767
*,
6868
base_log_folder: str,
6969
gcs_log_folder: str,
70-
filename_template: str,
70+
filename_template: Optional[str] = None,
7171
gcp_key_path: Optional[str] = None,
7272
gcp_keyfile_dict: Optional[dict] = None,
7373
gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,

β€Žairflow/providers/microsoft/azure/log/wasb_task_handler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ def __init__(
4444
base_log_folder: str,
4545
wasb_log_folder: str,
4646
wasb_container: str,
47-
filename_template: str,
4847
delete_local_copy: str,
48+
*,
49+
filename_template: Optional[str] = None,
4950
) -> None:
5051
super().__init__(base_log_folder, filename_template)
5152
self.wasb_container = wasb_container

β€Žairflow/utils/log/file_task_handler.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"""File logging handler for tasks."""
1919
import logging
2020
import os
21+
import warnings
2122
from datetime import datetime
2223
from pathlib import Path
2324
from typing import TYPE_CHECKING, Optional, Tuple
@@ -28,6 +29,7 @@
2829
from airflow.utils.context import Context
2930
from airflow.utils.helpers import parse_template_string, render_template_to_string
3031
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
32+
from airflow.utils.session import create_session
3133

3234
if TYPE_CHECKING:
3335
from airflow.models import TaskInstance
@@ -44,11 +46,15 @@ class FileTaskHandler(logging.Handler):
4446
:param filename_template: template filename string
4547
"""
4648

47-
def __init__(self, base_log_folder: str, filename_template: str):
49+
def __init__(self, base_log_folder: str, filename_template: Optional[str] = None):
4850
super().__init__()
4951
self.handler: Optional[logging.FileHandler] = None
5052
self.local_base = base_log_folder
51-
self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)
53+
if filename_template is not None:
54+
warnings.warn(
55+
"Passing filename_template to FileTaskHandler is deprecated and has no effect",
56+
DeprecationWarning,
57+
)
5258

5359
def set_context(self, ti: "TaskInstance"):
5460
"""
@@ -75,15 +81,19 @@ def close(self):
7581
self.handler.close()
7682

7783
def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
78-
if self.filename_jinja_template:
84+
with create_session() as session:
85+
dag_run = ti.get_dagrun(session=session)
86+
template = dag_run.get_log_template(session=session).filename
87+
str_tpl, jinja_tpl = parse_template_string(template)
88+
89+
if jinja_tpl:
7990
if hasattr(ti, "task"):
8091
context = ti.get_template_context()
8192
else:
82-
context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
93+
context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
8394
context["try_number"] = try_number
84-
return render_template_to_string(self.filename_jinja_template, context)
85-
elif self.filename_template:
86-
dag_run = ti.get_dagrun()
95+
return render_template_to_string(jinja_tpl, context)
96+
elif str_tpl:
8797
dag = ti.task.dag
8898
assert dag is not None # For Mypy.
8999
try:
@@ -98,7 +108,7 @@ def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
98108
data_interval_end = data_interval[1].isoformat()
99109
else:
100110
data_interval_end = ""
101-
return self.filename_template.format(
111+
return str_tpl.format(
102112
dag_id=ti.dag_id,
103113
task_id=ti.task_id,
104114
run_id=ti.run_id,

0 commit comments

Comments
 (0)