diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py index acdb2910da78f..64156765930b6 100644 --- a/airflow/utils/cli.py +++ b/airflow/utils/cli.py @@ -34,6 +34,7 @@ from airflow import settings from airflow.exceptions import AirflowException from airflow.utils import cli_action_loggers +from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler from airflow.utils.platform import getuser, is_terminal_support_colors from airflow.utils.session import provide_session @@ -244,7 +245,7 @@ def setup_locations(process, pid=None, stdout=None, stderr=None, log=None): def setup_logging(filename): """Creates log file handler for daemon process""" root = logging.getLogger() - handler = logging.FileHandler(filename) + handler = NonCachingFileHandler(filename) formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT) handler.setFormatter(formatter) root.addHandler(handler) diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py index 7792faf48eed4..e1ce4918b7ea9 100644 --- a/airflow/utils/log/file_processor_handler.py +++ b/airflow/utils/log/file_processor_handler.py @@ -23,6 +23,7 @@ from airflow import settings from airflow.utils.helpers import parse_template_string +from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler class FileProcessorHandler(logging.Handler): @@ -54,7 +55,7 @@ def set_context(self, filename): :param filename: filename in which the dag is located """ local_loc = self._init_file(filename) - self.handler = logging.FileHandler(local_loc) + self.handler = NonCachingFileHandler(local_loc) self.handler.setFormatter(self.formatter) self.handler.setLevel(self.level) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 0a1d4334489f0..e4531035d3f15 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -26,6 +26,7 @@ from airflow.configuration import AirflowConfigException, conf from airflow.utils.helpers import parse_template_string +from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler if TYPE_CHECKING: from airflow.models import TaskInstance @@ -55,7 +56,7 @@ def set_context(self, ti: "TaskInstance"): :param ti: task instance object """ local_loc = self._init_file(ti) - self.handler = logging.FileHandler(local_loc, encoding='utf-8') + self.handler = NonCachingFileHandler(local_loc, encoding='utf-8') if self.formatter: self.handler.setFormatter(self.formatter) self.handler.setLevel(self.level) diff --git a/airflow/utils/log/non_caching_file_handler.py b/airflow/utils/log/non_caching_file_handler.py new file mode 100644 index 0000000000000..caf9f4fe3ccdb --- /dev/null +++ b/airflow/utils/log/non_caching_file_handler.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import logging +import os + + +class NonCachingFileHandler(logging.FileHandler): + """ + This is an extension of the python FileHandler that advises the Kernel to not cache the file + in PageCache when it is written. While there is nothing wrong with such cache (it will be cleaned + when memory is needed), it causes ever-growing memory usage when scheduler is running as it keeps + on writing new log files and the files are not rotated later on. This might lead to confusion + for our users, who are monitoring memory usage of Scheduler - without realising that it is + harmless and expected in this case. + + See https://github.com/apache/airflow/issues/14924 + + Adding the advice to Kernel might help with not generating the cache memory growth in the first place. + """ + + def _open(self): + wrapper = super()._open() + try: + fd = wrapper.fileno() + os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED) + except Exception: + # in case either file descriptor cannot be retrieved or fadvise is not available + # we should simply return the wrapper retrieved by FileHandler's open method + # the advise to the kernel is just an advise and if we cannot give it, we won't + pass + return wrapper