Allow passing a multiprocessing context (#5)
mfeurer authored Nov 20, 2020
1 parent 6de9614 commit b762e57
Showing 3 changed files with 56 additions and 16 deletions.
31 changes: 30 additions & 1 deletion README.rst
@@ -1,3 +1,7 @@
=====
Usage
=====

The pynisher is a little module intended to limit a function's resources.
It starts a new process, sets the desired limits, and executes the
function inside it. In the end, it returns the function return value.
@@ -32,7 +36,7 @@ The full list of arguments to enforce_limits reads:
mem_in_mb=None, cpu_time_in_s=None,
wall_time_in_s=None, num_processes=None,
grace_period_in_s=None, logger=None,
capture_output=False)
capture_output=False, context=None)
The first four are actual constraints on the memory, the CPU time, the wall time, and the
number of subprocesses of the function. All values should be integers or None, which means
@@ -89,4 +93,29 @@ a AnythingException is returned where a Cpu-/TimeoutException would be appropria
is the exitcode returned by the subprocess, see `multiprocessing.Process.exitcode <https://docs
.python.org/3/library/multiprocessing.html#multiprocessing.Process.exitcode>`_

Finally, see `Pynisher and Multithreading`_ for the use of the ``context`` argument.

=====
Other
=====

Pynisher and Multithreading
===========================

When the Pynisher is used together with Python's ``threading`` module, the standard ``fork``
method of starting new processes can lead to a deadlock, as described in:

* https://github.com/Delgan/loguru/issues/231
* https://gist.github.com/mfm24/e62ec5d50c672524107ca00a391e6104
* https://github.com/dask/dask/issues/3759

One way of solving this would be to change the forking behavior as described
`here <https://github.com/google/python-atfork/blob/main/atfork/stdlib_fixer.py>`_, but this
also makes very strong assumptions about how the code is executed. An alternative is to pass a
`Context <https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods>`_
that uses either ``spawn`` or ``forkserver`` as the process start method.

Project origin
==============

This repository is based on Stefan Falkner's https://github.com/sfalkner/pynisher.
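
To illustrate the ``context`` argument described under "Pynisher and Multithreading", here is a minimal sketch of building a non-``fork`` context with the standard library only. ``make_context`` is a hypothetical helper and not part of pynisher. The commented-out ``enforce_limits`` call only mirrors the signature shown in this diff, and assumes pynisher 0.6.2 is installed and ``my_function`` is defined by the caller.

```python
import multiprocessing


def make_context(start_method="spawn"):
    """Return a multiprocessing context that avoids the plain ``fork`` start method.

    Hypothetical helper for illustration; it is not part of pynisher.
    """
    if start_method not in ("spawn", "forkserver"):
        raise ValueError("expected 'spawn' or 'forkserver', got %r" % (start_method,))
    if start_method not in multiprocessing.get_all_start_methods():
        # ``forkserver`` is not available on every platform (for example Windows).
        raise ValueError("%r is not available on this platform" % (start_method,))
    return multiprocessing.get_context(start_method)


context = make_context("spawn")

# Assumed usage, based on the signature shown in this commit:
#     import pynisher
#     limited = pynisher.enforce_limits(wall_time_in_s=5, context=context)(my_function)
#     result = limited()
```

With ``spawn`` or ``forkserver``, the wrapped function and its arguments have to be picklable, and the calling script should be protected by an ``if __name__ == "__main__":`` guard.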
39 changes: 25 additions & 14 deletions pynisher/limit_function_call.py
@@ -166,13 +166,19 @@ def handler(signum, frame):

class enforce_limits(object):
def __init__(self, mem_in_mb=None, cpu_time_in_s=None, wall_time_in_s=None, num_processes=None,
grace_period_in_s=None, logger=None, capture_output=False):
grace_period_in_s=None, logger=None, capture_output=False, context=None):

if context is None:
self.context = multiprocessing.get_context()
else:
self.context = context

self.mem_in_mb = mem_in_mb
self.cpu_time_in_s = cpu_time_in_s
self.num_processes = num_processes
self.wall_time_in_s = wall_time_in_s
self.grace_period_in_s = 0 if grace_period_in_s is None else grace_period_in_s
self.logger = logger if logger is not None else multiprocessing.get_logger()
self.logger = logger if logger is not None else self.context.get_logger()
self.capture_output = capture_output

if self.mem_in_mb is not None:
@@ -207,7 +213,7 @@ def __call__(self2, *args, **kwargs):
self2._reset_attributes()

# create a pipe to retrieve the return value
parent_conn, child_conn = multiprocessing.Pipe(False)
parent_conn, child_conn = self.context.Pipe(False)
# import pdb; pdb.set_trace()

if self.capture_output:
@@ -218,17 +224,22 @@ def __call__(self2, *args, **kwargs):
tmp_dir_name = None

# create and start the process
subproc = multiprocessing.Process(target=subprocess_func, name="pynisher function call", args=(
self2.func,
child_conn,
self.logger,
self.mem_in_mb,
self.cpu_time_in_s,
self.wall_time_in_s,
self.num_processes,
self.grace_period_in_s,
tmp_dir_name) + args,
kwargs=kwargs)
subproc = self.context.Process(
target=subprocess_func,
name="pynisher function call",
args=(
self2.func,
child_conn,
self.logger,
self.mem_in_mb,
self.cpu_time_in_s,
self.wall_time_in_s,
self.num_processes,
self.grace_period_in_s,
tmp_dir_name
) + args,
kwargs=kwargs,
)
self.logger.debug("Function called with argument: {}, {}".format(args, kwargs))

# start the process
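
The pattern in this file's change (fall back to the default context, then take both the pipe and the process from that same context) can be sketched in isolation as follows. This is a simplified, standard-library-only illustration under stated assumptions, not the pynisher implementation: ``resolve_context``, ``_worker`` and ``call_in_subprocess`` are hypothetical names, and no resource limits are applied.

```python
import multiprocessing


def resolve_context(context=None):
    """Fall back to the default multiprocessing context when none is given."""
    if context is None:
        return multiprocessing.get_context()
    return context


def _worker(conn, func, args, kwargs):
    """Run ``func`` in the child process and send its return value to the parent."""
    try:
        conn.send(func(*args, **kwargs))
    finally:
        conn.close()


def call_in_subprocess(func, *args, context=None, **kwargs):
    """Hypothetical helper: run ``func`` in a process created from ``context``."""
    ctx = resolve_context(context)
    # Pipe and Process are both created from the same context object.
    # Pipe(False) is one-way: the first end only receives, the second only sends.
    parent_conn, child_conn = ctx.Pipe(False)
    proc = ctx.Process(target=_worker, args=(child_conn, func, args, kwargs))
    proc.start()
    child_conn.close()  # the parent keeps only the receiving end
    try:
        # Raises EOFError if the child exits without sending a value.
        return parent_conn.recv()
    finally:
        proc.join()
        parent_conn.close()
```

A caller could run, for example, ``call_in_subprocess(abs, -3, context=multiprocessing.get_context("spawn"))`` from inside an ``if __name__ == "__main__":`` guard. With ``spawn`` or ``forkserver`` the target function and its arguments must be picklable.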
2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@

setup(
name='pynisher',
version="0.6.1",
version="0.6.2",
packages=['pynisher'],
install_requires=['docutils>=0.3', 'setuptools', 'psutil'],
author="Stefan Falkner, Christina Hernandez-Wunsch, Samuel Mueller and Matthias Feurer",
