Skip to content

Commit

Permalink
Fix deadlock if enqueued sink in use during fork (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan committed May 2, 2020
1 parent be597bd commit 8c42511
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
- Add a new ``onerror`` optional argument to ``logger.catch()``, it should be a function which will be called when an exception occurs in order to customize error handling (`#224 <https://github.com/Delgan/loguru/issues/224>`_).
- Add a new ``exclude`` optional argument to ``logger.catch()``, it should be a type of exception to be purposefully ignored and propagated to the caller without being logged (`#248 <https://github.com/Delgan/loguru/issues/248>`_).
- Modify ``complete()`` to make it callable from non-asynchronous functions, it can thus be used if ``enqueue=True`` to make sure all messages have been processed (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix possible deadlock while mixing ``threading`` with ``multiprocessing`` on Linux (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix the filter function listing files for ``retention`` being too restrictive, it now matches files based on the pattern ``"basename(.*).ext(.*)"`` (`#229 <https://github.com/Delgan/loguru/issues/229>`_).
- Fix possible deadlocks on Linux when ``multiprocessing.Process()`` collides with ``enqueue=True`` or ``threading`` (`#231 <https://github.com/Delgan/loguru/issues/231>`_).
- Fix the filter function listing files for ``retention`` being too restrictive, it now matches files based on the pattern ``"basename(.*).ext(.*)"`` (`#229 <https://github.com/Delgan/loguru/issues/229>`_).
- Fix the impossibility to ``remove()`` a handler if an exception is raised while the sink's ``stop()`` function is called (`#237 <https://github.com/Delgan/loguru/issues/237>`_).
- Fix file sink left in an unstable state if an exception occurred during ``retention`` or ``compression`` process (`#238 <https://github.com/Delgan/loguru/issues/238>`_).
- Fix situation where changes made to ``record["message"]`` were unexpectedly ignored when ``opt(colors=True)``, causing "out-of-date" ``message`` to be logged due to implementation details (`#221 <https://github.com/Delgan/loguru/issues/221>`_).
Expand Down
31 changes: 19 additions & 12 deletions loguru/_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ def __init__(
self._precolorized_formats = {}
self._memoize_dynamic_format = None

self._stopped = False
self._lock = create_handler_lock()
self._queue = None
self._confirmation_event = None
self._confirmation_lock = None
self._thread = None
self._stopped = False
self._owner_process = None
self._thread = None

if self._is_formatter_dynamic:
if self._colorize:
Expand All @@ -82,10 +82,10 @@ def __init__(
self._decolorized_format = self._formatter.strip()

if self._enqueue:
self._owner_process = multiprocessing.current_process()
self._queue = multiprocessing.SimpleQueue()
self._confirmation_event = multiprocessing.Event()
self._confirmation_lock = multiprocessing.Lock()
self._owner_process = multiprocessing.current_process()
self._thread = Thread(
target=self._queued_writer, daemon=True, name="loguru-writer-%d" % self._id
)
Expand Down Expand Up @@ -255,13 +255,19 @@ def _serialize_record(text, record):
def _queued_writer(self):
message = None
queue = self._queue

# We need to use a lock to protect sink during fork.
# Particularly, writing to stderr may lead to deadlock in child process.
lock = create_handler_lock()

while True:
try:
message = queue.get()
except Exception:
if not self._error_interceptor.should_catch():
raise
self._error_interceptor.print(None)
with lock:
if not self._error_interceptor.should_catch():
raise
self._error_interceptor.print(None)
continue

if message is None:
Expand All @@ -271,12 +277,13 @@ def _queued_writer(self):
self._confirmation_event.set()
continue

try:
self._sink.write(message)
except Exception:
if not self._error_interceptor.should_catch():
raise
self._error_interceptor.print(message.record)
with lock:
try:
self._sink.write(message)
except Exception:
if not self._error_interceptor.should_catch():
raise
self._error_interceptor.print(message.record)

def __getstate__(self):
state = self.__dict__.copy()
Expand Down
29 changes: 26 additions & 3 deletions tests/test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ def test_not_picklable_sinks_inheritance(capsys, tmpdir):
@pytest.mark.skipif(sys.version_info < (3, 7), reason="No 'os.register_at_fork()' function")
@pytest.mark.parametrize("enqueue", [True, False])
@pytest.mark.parametrize("deepcopied", [True, False])
def test_no_deadlock_if_lock_in_use(tmpdir, enqueue, deepcopied):
def test_no_deadlock_if_internal_lock_in_use(tmpdir, enqueue, deepcopied):
if deepcopied:
logger_ = copy.deepcopy(logger)
else:
Expand Down Expand Up @@ -585,20 +585,43 @@ def worker():
assert output.read() in ("Main\nChild\n", "Child\nMain\n")


@pytest.mark.skipif(sys.version_info < (3, 7), reason="No 'os.register_at_fork()' function")
@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
@pytest.mark.parametrize("enqueue", [True, False])
def test_no_deadlock_if_external_lock_in_use(enqueue, capsys):
    """Regression test (#231): forking while a sink (here stderr) is in use
    must not deadlock the child process.

    NOTE(review): the original author could not reproduce the bug under pytest
    (even with stderr unwrapped), so this is a best-effort safety net rather
    than a guaranteed reproduction of the deadlock.
    """
    logger.add(sys.stderr, enqueue=enqueue, catch=True, format="{message}")
    # Log a burst of messages so the sink (and, with enqueue=True, the writer
    # thread) is plausibly still busy at the moment the fork happens below.
    num = 100

    for i in range(num):
        logger.info("This is a message: {}", i)
    # Fork a no-op child right away. If the child inherited a lock in the
    # locked state, it would hang forever; join(1) would then time out and
    # exitcode would not be 0 (it stays None while the process is alive).
    process = multiprocessing.Process(target=lambda: None)
    process.start()
    process.join(1)
    assert process.exitcode == 0

    logger.remove()

    # The assertions below rely on remove() having completed all pending
    # writes: nothing on stdout, and every message delivered once to stderr.
    out, err = capsys.readouterr()
    assert out == ""
    assert err == "".join("This is a message: %d\n" % i for i in range(num))


@pytest.mark.skipif(os.name == "nt", reason="Windows does not support forking")
@pytest.mark.skipif(platform.python_implementation() == "PyPy", reason="PyPy is too slow")
def test_complete_from_multiple_child_processes(capsys):
logger.add(lambda _: None, enqueue=True, catch=False)
num = 100

barrier = multiprocessing.Barrier(100)
barrier = multiprocessing.Barrier(num)

def worker(barrier):
barrier.wait()
logger.complete()

processes = []

for _ in range(100):
for _ in range(num):
process = multiprocessing.Process(target=worker, args=(barrier,))
process.start()
processes.append(process)
Expand Down

0 comments on commit 8c42511

Please sign in to comment.