Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Python3-only reusable thread executor #214

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6a0e252
ExecutorTest -> {ProcessExecutorTest, ExecutorTest}
pierreglaser Jul 22, 2019
608a58b
ExecutorShutdownTest ->{Thread,Process, }ExecutorShutdownTest
pierreglaser Jul 22, 2019
ea376b1
add ThreadWaitTest skeleton
pierreglaser Jul 22, 2019
c9095d2
TST backport ThreadPoolExecutorShutdownTest from cpython
pierreglaser Jul 22, 2019
d518bc2
TST mention differences between loky and cpython tests
pierreglaser Jul 23, 2019
facf0b2
MNT _ReusablePoolExecutor -> _ReusableProcessPoolExecutor
pierreglaser Jul 23, 2019
2dbaf6f
ENH implement reusable thread executor
pierreglaser Jul 23, 2019
4510499
ExecutorMixin -> ProcessExecutorMixin
pierreglaser Jul 23, 2019
0887bcc
ExecutorMixin ->{Process, }ExecutorMixin
pierreglaser Jul 23, 2019
e486f4a
move 1 test (ExecutorShutdown->ProcessExecutorShutdown)
pierreglaser Jul 23, 2019
bc6335e
CLN style
pierreglaser Jul 23, 2019
ee32102
TST populate ThreadExecutorShutdownTest
pierreglaser Jul 23, 2019
7fb2ba2
FIX use correctly re.match in ThreadExecutorShutdownTest
pierreglaser Jul 23, 2019
4452597
make test teardown method backend-specific
pierreglaser Jul 23, 2019
fa2a98d
TST make the Shutdown test suite pass for concurrent.futures
pierreglaser Jul 23, 2019
48f3bfb
FIX pass thread_name_prefix across __init__
pierreglaser Jul 23, 2019
d3b1958
create a setup method for ThreadExecutorMixin
pierreglaser Jul 23, 2019
9f9f1c7
[ThreadShutdownTest] use setup methods to create the executor when po…
pierreglaser Jul 23, 2019
7fafb5a
TST make WaitTests work for threadpoolexecutor
pierreglaser Jul 24, 2019
472812e
add AsCompletedTests for ThreadPoolExecutor
pierreglaser Jul 24, 2019
254dd55
MNT set worker_count to None for ThreadTests
pierreglaser Jul 24, 2019
82bd00c
TST make TestsThreadExecutor work
pierreglaser Jul 24, 2019
515c751
make InitializerTests work for ReusableExecutors
pierreglaser Jul 24, 2019
c2b73b9
refactor TestGetReusableExecutor
pierreglaser Jul 24, 2019
727c098
refactor ResizeExecutorTests
pierreglaser Jul 24, 2019
9dfe9bd
refactor TerminateExecutorTest
pierreglaser Jul 24, 2019
e622d0a
CLN move thread tests into test_thread_executor
pierreglaser Jul 24, 2019
db0f7cf
MNT add new tests (not backported from CPython)
pierreglaser Jul 24, 2019
3d76110
FIX use correct get_reusable_executor methods for Tests
pierreglaser Jul 25, 2019
b5fae4e
MNT create Reusable{Thread,Process}ExecutorMixin
pierreglaser Jul 25, 2019
5773e5c
don't expose ThreadPoolExecutor additions by default
pierreglaser Jul 29, 2019
b24cc3d
MNT remove python2-3.4 entries from CI
pierreglaser Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ matrix:
osx_image: xcode7.3
language: generic # https://github.com/travis-ci/travis-ci/issues/2312
env: TOXENV="py37" PYTHON="python3" RUN_MEMORY="true"
- python: 2.7
env: TOXENV="py27"
- python: 3.4
env: TOXENV="py34"
- python: 3.5
env: TOXENV="py35"
- python: "3.6"
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/appveyor/runtests.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Authors: Thomas Moreau
# License: 3-clause BSD

$VERSION=(36, 27)
$VERSION=(36)
$TOX_CMD = "python ./continuous_integration/appveyor/tox"

function TestPythonVersions () {
Expand Down
13 changes: 8 additions & 5 deletions loky/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
from .reusable_executor import get_reusable_executor
from .cloudpickle_wrapper import wrap_non_picklable_objects
from .process_executor import BrokenProcessPool, ProcessPoolExecutor
from .reusable_thread_executor import (ThreadPoolExecutor,
get_reusable_thread_executor)


__all__ = ["get_reusable_executor", "cpu_count", "wait", "as_completed",
"Future", "Executor", "ProcessPoolExecutor",
"BrokenProcessPool", "CancelledError", "TimeoutError",
"FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED",
"wrap_non_picklable_objects", "set_loky_pickler"]
__all__ = ["get_reusable_executor", "get_reusable_thread_executor",
"cpu_count", "wait", "as_completed", "Future", "Executor",
"ProcessPoolExecutor", "ThreadPoolExecutor" "BrokenProcessPool",
"CancelledError", "TimeoutError", "FIRST_COMPLETED",
"FIRST_EXCEPTION", "ALL_COMPLETED", "wrap_non_picklable_objects",
"set_loky_pickler"]


__version__ = '2.5.1'
10 changes: 5 additions & 5 deletions loky/reusable_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def get_reusable_executor(max_workers=None, context=None, timeout=10,
.format(max_workers))
executor_id = _get_next_executor_id()
_executor_kwargs = kwargs
_executor = executor = _ReusablePoolExecutor(
_executor = executor = _ReusableProcessPoolExecutor(
_executor_lock, max_workers=max_workers,
executor_id=executor_id, **kwargs)
else:
Expand Down Expand Up @@ -134,11 +134,11 @@ def get_reusable_executor(max_workers=None, context=None, timeout=10,
return executor


class _ReusablePoolExecutor(ProcessPoolExecutor):
class _ReusableProcessPoolExecutor(ProcessPoolExecutor):
def __init__(self, submit_resize_lock, max_workers=None, context=None,
timeout=None, executor_id=0, job_reducers=None,
result_reducers=None, initializer=None, initargs=()):
super(_ReusablePoolExecutor, self).__init__(
super(_ReusableProcessPoolExecutor, self).__init__(
max_workers=max_workers, context=context, timeout=timeout,
job_reducers=job_reducers, result_reducers=result_reducers,
initializer=initializer, initargs=initargs)
Expand All @@ -147,7 +147,7 @@ def __init__(self, submit_resize_lock, max_workers=None, context=None,

def submit(self, fn, *args, **kwargs):
with self._submit_resize_lock:
return super(_ReusablePoolExecutor, self).submit(
return super(_ReusableProcessPoolExecutor, self).submit(
fn, *args, **kwargs)

def _resize(self, max_workers):
Expand Down Expand Up @@ -201,5 +201,5 @@ def _setup_queues(self, job_reducers, result_reducers):
# As this executor can be resized, use a large queue size to avoid
# underestimating capacity and introducing overhead
queue_size = 2 * cpu_count() + EXTRA_QUEUED_CALLS
super(_ReusablePoolExecutor, self)._setup_queues(
super(_ReusableProcessPoolExecutor, self)._setup_queues(
job_reducers, result_reducers, queue_size=queue_size)
209 changes: 209 additions & 0 deletions loky/reusable_thread_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import multiprocessing as mp
import threading
import weakref
from concurrent.futures import _base, thread
from concurrent.futures.thread import ThreadPoolExecutor

_next_thread_executor_id = 0
_thread_executor_kwargs = None
_thread_executor = None


def _get_next_thread_executor_id():
"""Ensure that each successive executor instance has a unique monotonic id.

The purpose of this monotonic id is to help debug and test automated
instance creation.
"""
global _next_thread_executor_id
thread_executor_id = _next_thread_executor_id
_next_thread_executor_id += 1
return thread_executor_id


def _worker(executor_reference, work_queue, *args):
    """Thread run-loop: execute work items until a ``None`` sentinel arrives.

    Parameters
    ----------
    executor_reference : weakref.ref
        Weak reference to the owning executor, so the loop can detect when
        the executor has been garbage collected without keeping it alive.
    work_queue : queue.Queue
        Queue of work items; ``None`` is the wake-up/exit sentinel.
    *args : tuple
        When non-empty, the pair ``(initializer, initargs)`` to run once
        before processing any work item.
    """
    if args:
        initializer, initargs = args
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical("Exception in initializer:", exc_info=True)
            executor = executor_reference()
            if executor is not None:
                # Mark the executor broken so it refuses further work.
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            # ``None`` sentinel received: decide whether this is a global
            # shutdown or a resize event asking one worker to leave.
            executor = executor_reference()
            # Exit if:
            # - The interpreter is shutting down OR
            # - The executor that owns the worker has been collected OR
            # - The executor that owns the worker has been shutdown.
            if thread._shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notify other workers by re-queueing the sentinel.
                work_queue.put(None)
                del executor
                return
            else:
                # Leave the thread pool. This comes from a resize event.
                return
    except BaseException:
        # Unlike CPython's worker, any unexpected error is only logged so
        # that the reusable executor is not silently left half-broken.
        _base.LOGGER.critical("Exception in worker", exc_info=True)


class _ReusableThreadPoolExecutor(ThreadPoolExecutor):
    """A ``ThreadPoolExecutor`` whose worker count can be changed in place.

    Instances are meant to be managed as a module-level singleton by
    ``get_reusable_thread_executor``, which identifies them through
    ``executor_id``.
    """

    def __init__(
        self,
        max_workers=None,
        executor_id=0,
        **executor_kwargs
    ):
        # executor_kwargs can contain initializer arguments as well as
        # thread_name_prefix
        super(_ReusableThreadPoolExecutor, self).__init__(
            max_workers=max_workers, **executor_kwargs
        )
        # Monotonic id (see _get_next_thread_executor_id) used to debug and
        # test automated re-creation of the singleton.
        self.executor_id = executor_id

    def _resize(self, max_workers):
        # Adjust the pool to run at most ``max_workers`` threads.  Shrinking
        # sends one ``None`` sentinel per excess worker (each sentinel makes
        # exactly one worker return — see the resize branch in ``_worker``);
        # growing is left to the lazy thread creation done at submit time.
        # Raises ValueError when called with max_workers=None.
        if max_workers is None:
            raise ValueError("Trying to resize with max_workers=None")
        elif max_workers == self._max_workers:
            return

        nb_children_alive = sum(t.is_alive() for t in self._threads)

        # One sentinel per worker beyond the new target size (no-op when
        # growing, since the range is then empty).
        for _ in range(max_workers, nb_children_alive):
            self._work_queue.put(None)

        # The original ThreadPoolExecutor of concurrent.futures adds threads
        # lazily during `executor.submit` calls. We stick to this behavior in
        # the case where threads should be added to the pool (max_workers >
        # nb_children_alive)
        self._max_workers = max_workers

    def _adjust_thread_count(self):
        # Start one new worker thread if the pool is below ``_max_workers``.
        # Overrides the parent implementation to use our custom ``_worker``
        # target.

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        # ``_initializer``/``_initargs`` presumably only exist on Python
        # versions whose ThreadPoolExecutor supports initializers (3.7+) —
        # TODO confirm.
        if hasattr(self, "_initializer"):
            thread_args = (
                weakref.ref(self, weakref_cb),
                self._work_queue,
                self._initializer,
                self._initargs,
            )
        else:
            thread_args = (weakref.ref(self, weakref_cb), self._work_queue)

        if num_threads < self._max_workers:
            thread_name = "%s_%d" % (
                getattr(self, "_thread_name_prefix", None) or self,
                num_threads
            )
            # Use our custom ``_worker`` function as the target: it lets
            # individual workers exit (on a resize event) without shutting
            # down the whole executor.
            t = threading.Thread(
                name=thread_name,
                target=_worker,
                args=thread_args
            )
            t.daemon = True
            t.start()
            self._threads.add(t)
            thread._threads_queues[t] = self._work_queue


def get_reusable_thread_executor(
    max_workers=None, reuse="auto", **executor_kwargs
):
    """Return the current _ReusableThreadPoolExecutor instance.

    Start a new instance if it has not been started already or if the previous
    instance was left in a broken state.

    If the previous instance does not have the requested number of workers, the
    executor is dynamically resized to adjust the number of workers prior to
    returning.

    Reusing a singleton instance spares the overhead of starting new worker
    threads and re-executing initializer functions each time.

    ``max_workers`` controls the maximum number of tasks that can be running in
    parallel in worker threads. By default this is set to 5 times the number
    of CPUs on the host.

    ``reuse`` can be True (always reuse the existing executor when possible),
    False (always create a fresh one) or "auto" (reuse only when the keyword
    arguments are unchanged).

    When provided, the ``initializer`` is run first in newly created
    threads with argument ``initargs``.
    """
    global _thread_executor, _thread_executor_kwargs
    thread_executor = _thread_executor

    if max_workers is None:
        if reuse is True and thread_executor is not None:
            # Explicit reuse without a requested size: keep the current size.
            max_workers = thread_executor._max_workers
        else:
            max_workers = (mp.cpu_count() or 1) * 5
    elif max_workers <= 0:
        raise ValueError(
            "max_workers must be greater than 0, got {}.".format(max_workers)
        )

    this_executor_kwargs = executor_kwargs
    if thread_executor is None:
        # No singleton yet: create one and remember its construction kwargs
        # so that future "auto" calls can detect argument changes.
        mp.util.debug(
            "Create a thread_executor with max_workers={}.".format(max_workers)
        )
        executor_id = _get_next_thread_executor_id()
        _thread_executor_kwargs = this_executor_kwargs
        _thread_executor = thread_executor = _ReusableThreadPoolExecutor(
            max_workers=max_workers, executor_id=executor_id,
            **this_executor_kwargs
        )
    else:
        if reuse == "auto":
            # Reuse only if the construction arguments are unchanged.
            reuse = this_executor_kwargs == _thread_executor_kwargs
        # _broken exists only for python >= 3.7
        broken = getattr(thread_executor, "_broken", False)
        if broken or thread_executor._shutdown or not reuse:
            if broken:
                reason = "broken"
            elif thread_executor._shutdown:
                reason = "shutdown"
            else:
                reason = "arguments have changed"
            mp.util.debug(
                "Creating a new thread_executor with max_workers={} as the "
                "previous instance cannot be reused ({}).".format(
                    max_workers, reason
                )
            )
            # Wait for pending work before discarding the old singleton.
            thread_executor.shutdown(wait=True)
            _thread_executor = thread_executor = _thread_executor_kwargs = None
            # Recursive call to build a new instance
            return get_reusable_thread_executor(
                max_workers=max_workers, **this_executor_kwargs
            )
        else:
            mp.util.debug(
                "Reusing existing thread_executor with "
                "max_workers={}.".format(thread_executor._max_workers)
            )
            # Adjust the pool size in place before handing it back.
            thread_executor._resize(max_workers)

    return thread_executor
Loading