Skip to content

Commit

Permalink
Fix compression not parallelizable due to file renaming (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed May 5, 2020
1 parent 8c42511 commit 981edf0
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Add a new ``exclude`` optional argument to ``logger.catch()``, is should be a type of exception to be purposefully ignored and propagated to the caller without being logged (`#248 <https://github.com/Delgan/loguru/issues/248>`_).
- Modify ``complete()`` to make it callable from non-asynchronous functions, it can thus be used if ``enqueue=True`` to make sure all messages have been processed (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix possible deadlocks on Linux when ``multiprocessing.Process()`` collides with ``enqueue=True`` or ``threading`` (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix ``compression`` function not executable concurrently due to file renaming (to resolve conflicts) being performed after and not before it (`#243 <https://github.com/Delgan/loguru/issues/243>`_).
- Fix the filter function listing files for ``retention`` being too restrictive, it now matches files based on the pattern ``"basename(.*).ext(.*)"`` (`#229 <https://github.com/Delgan/loguru/issues/229>`_).
- Fix the impossibility to ``remove()`` a handler if an exception is raised while the sink' ``stop()`` function is called (`#237 <https://github.com/Delgan/loguru/issues/237>`_).
- Fix file sink left in an unstable state if an exception occurred during ``retention`` or ``compression`` process (`#238 <https://github.com/Delgan/loguru/issues/238>`_).
Expand Down
111 changes: 61 additions & 50 deletions loguru/_file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
from ._datetime import aware_now, datetime


def generate_rename_path(root, ext):
path_to_rename = root + ext
creation_time = get_ctime(path_to_rename)
def generate_rename_path(root, ext, creation_time):
creation_datetime = datetime.fromtimestamp(creation_time)
date = FileDateFormatter(creation_datetime)

Expand Down Expand Up @@ -59,9 +57,11 @@ def copy_compress(path_in, path_out, opener, **kwargs):
@staticmethod
def compression(path_in, ext, compress_function):
path_out = "{}{}".format(path_in, ext)
if os.path.isfile(path_out):

if os.path.exists(path_out):
creation_time = get_ctime(path_out)
root, ext_before = os.path.splitext(path_in)
renamed_path = generate_rename_path(root, ext_before + ext)
renamed_path = generate_rename_path(root, ext_before + ext, creation_time)
os.rename(path_out, renamed_path)
compress_function(path_in, path_out)
os.remove(path_in)
Expand Down Expand Up @@ -160,38 +160,76 @@ def __init__(
self._retention_function = self._make_retention_function(retention)
self._compression_function = self._make_compression_function(compression)

self._initialized = False
self._file = None
self._file_path = None

if not delay:
self._initialize_file(rename_existing=False)
self._initialize_file()

def write(self, message):
if self._file is None:
self._initialize_file(rename_existing=self._initialized)
self._initialize_file()

if self._rotation_function is not None and self._rotation_function(message, self._file):
self._terminate(teardown=True)
self._initialize_file(rename_existing=True)
set_ctime(self._file_path, datetime.now().timestamp())
self._terminate_file(is_rotating=True)

self._file.write(message)

def _initialize_file(self, *, rename_existing):
new_path = self._path.format_map({"time": FileDateFormatter()})
new_path = os.path.abspath(new_path)
new_dir = os.path.dirname(new_path)
os.makedirs(new_dir, exist_ok=True)
def _prepare_new_path(self):
path = self._path.format_map({"time": FileDateFormatter()})
path = os.path.abspath(path)
dirname = os.path.dirname(path)
os.makedirs(dirname, exist_ok=True)
return path

def _initialize_file(self):
path = self._prepare_new_path()
self._file = open(path, **self._kwargs)
self._file_path = path

def _terminate_file(self, *, is_rotating=False):
old_path = self._file_path

if self._file is not None:
self._file.close()
self._file = None
self._file_path = None

if is_rotating:
new_path = self._prepare_new_path()

if rename_existing and os.path.isfile(new_path):
root, ext = os.path.splitext(new_path)
renamed_path = generate_rename_path(root, ext)
os.rename(new_path, renamed_path)
if new_path == old_path:
creation_time = get_ctime(old_path)
root, ext = os.path.splitext(old_path)
renamed_path = generate_rename_path(root, ext, creation_time)
os.rename(old_path, renamed_path)
old_path = renamed_path

if is_rotating or self._rotation_function is None:
if self._compression_function is not None and old_path is not None:
self._compression_function(old_path)

if self._retention_function is not None:
logs = {
file
for pattern in self._glob_patterns
for file in glob.glob(pattern)
if os.path.isfile(file)
}
self._retention_function(list(logs))

if is_rotating:
file = open(new_path, **self._kwargs)
set_ctime(new_path, datetime.now().timestamp())

self._file_path = new_path
self._file = file

def stop(self):
self._terminate_file(is_rotating=False)

self._file_path = new_path
self._file = open(new_path, **self._kwargs)
self._initialized = True
async def complete(self):
pass

@staticmethod
def _make_glob_patterns(path):
Expand Down Expand Up @@ -332,30 +370,3 @@ def _make_compression_function(compression):
raise TypeError(
"Cannot infer compression for objects of type: '%s'" % type(compression).__name__
)

def stop(self):
self._terminate(teardown=self._rotation_function is None)

def _terminate(self, *, teardown):
filepath = self._file_path

if self._file is not None:
self._file.close()
self._file = None
self._file_path = None

if teardown:
if self._compression_function is not None and filepath is not None:
self._compression_function(filepath)

if self._retention_function is not None:
logs = {
file
for pattern in self._glob_patterns
for file in glob.glob(pattern)
if os.path.isfile(file)
}
self._retention_function(list(logs))

async def complete(self):
pass
8 changes: 4 additions & 4 deletions loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ def add(
each log record will search for it's closest parent in the ``dict`` and use the associated
level as the filter. The ``dict`` values can be ``int`` severity, ``str`` level name or
``True`` and ``False`` to respectively authorize and discard all module logs
unconditionally. In order to set a default level, the `""` module name should be used as it
is the parent of all modules.
unconditionally. In order to set a default level, the ``""`` module name should be used as
it is the parent of all modules (it does not suppress global ``level`` threshold, though).
.. _levels:
Expand Down Expand Up @@ -1391,7 +1391,7 @@ def patch(self, patcher):
once before sending the log message to the different handlers.
It is recommended to apply modification on the ``record["extra"]`` dict rather than on the
``record`` dict itself, as some values are used internally by Loguru, and modify them may
``record`` dict itself, as some values are used internally by `Loguru`, and modify them may
produce unexpected results.
Parameters
Expand Down Expand Up @@ -1605,7 +1605,7 @@ def configure(self, *, handlers=None, levels=None, extra=None, patcher=None, act
``None``, this will replace previously configured ``patcher`` function.
activation : |list| of |tuple|, optional
A list of ``(name, state)`` tuples which denotes which loggers should be enabled (if
`state` is ``True``) or disabled (if `state` is ``False``). The calls to |enable|
``state`` is ``True``) or disabled (if ``state`` is ``False``). The calls to |enable|
and |disable| are made accordingly to the list order. This will not modify previously
activated loggers, so if you need a fresh start prepend your list with ``("", False)``
or ``("", True)``.
Expand Down
70 changes: 65 additions & 5 deletions tests/test_filesink_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import loguru
from loguru import logger
import datetime
import threading
import time


@pytest.mark.parametrize(
Expand All @@ -30,12 +32,19 @@ def compress(file):

@pytest.mark.parametrize("mode", ["a", "a+", "w", "x"])
def test_compression_at_rotation(tmpdir, mode):
i = logger.add(str(tmpdir.join("file.log")), rotation=0, compression="gz", mode=mode)
logger.debug("test")
logger.add(
str(tmpdir.join("file.log")), format="{message}", rotation=0, compression="gz", mode=mode
)
logger.debug("After compression")

assert len(tmpdir.listdir()) == 2
assert tmpdir.join("file.log").check(exists=1)
assert tmpdir.join("file.log.gz").check(exists=1)
files = sorted(tmpdir.listdir())

assert len(files) == 2
assert files[0].fnmatch(
"file.{0}-{1}-{1}_{1}-{1}-{1}_{2}.log.gz".format("[0-9]" * 4, "[0-9]" * 2, "[0-9]" * 6)
)
assert files[1].basename == "file.log"
assert files[1].read() == "After compression\n"


@pytest.mark.parametrize("mode", ["a", "a+", "w", "x"])
Expand Down Expand Up @@ -122,6 +131,57 @@ def test_renaming_compression_dest_exists_with_time(monkeypatch, monkeypatch_dat
).check(exists=1)


def test_compression_use_renamed_file_after_rotation(tmpdir):
compressed_file = None

def compression(filepath):
nonlocal compressed_file
compressed_file = filepath

def rotation(message, _):
return message.record["extra"].get("rotate", False)

filepath = tmpdir.join("test.log")
logger.add(str(filepath), format="{message}", compression=compression, rotation=rotation)

logger.info("Before")
logger.bind(rotate=True).info("Rotation")
logger.info("After")

assert compressed_file != str(filepath)
assert open(compressed_file, "r").read() == "Before\n"
assert filepath.read() == "Rotation\nAfter\n"


def test_threaded_compression_after_rotation(tmpdir):
thread = None

def rename(filepath):
time.sleep(1)
os.rename(filepath, str(tmpdir.join("test.log.mv")))

def compression(filepath):
nonlocal thread
thread = threading.Thread(target=rename, args=(filepath,))
thread.start()

def rotation(message, _):
return message.record["extra"].get("rotate", False)

logger.add(
str(tmpdir.join("test.log")), format="{message}", compression=compression, rotation=rotation
)

logger.info("Before")
logger.bind(rotate=True).info("Rotation")
logger.info("After")

thread.join()

assert tmpdir.join("test.log").read() == "Rotation\nAfter\n"
assert tmpdir.join("test.log.mv").read() == "Before\n"


@pytest.mark.parametrize("delay", [True, False])
def test_exception_during_compression_at_rotation(tmpdir, capsys, delay):
raising = True
Expand Down
2 changes: 1 addition & 1 deletion tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ def worker(barrier):
processes.append(process)

for process in processes:
process.join(2)
process.join(5)
assert process.exitcode == 0

out, err = capsys.readouterr()
Expand Down

0 comments on commit 981edf0

Please sign in to comment.