[NOMRG] DBG add more logs+skip non-failing tests #280

Closed · wants to merge 42 commits

Changes from all commits (42 commits)
1348a58  DBG add more logs+skip non-failing tests (tomMoral, Jan 7, 2021)
14ea3a7  CI trigger (tomMoral, Jan 7, 2021)
870289f  Only run test_crash_races for faster debugging (ogrisel, Jan 7, 2021)
0ed4c93  trigger (ogrisel, Jan 7, 2021)
b1d171a  DBG more logs (tomMoral, Jan 7, 2021)
49a2753  CI trigger (tomMoral, Jan 7, 2021)
4091240  run all the tests of the TestExecutorDeadLock class (ogrisel, Jan 7, 2021)
641dd0e  New trigger (ogrisel, Jan 7, 2021)
1dde47f  DBG more logs (tomMoral, Jan 7, 2021)
5ad9ca7  DBG more logs (tomMoral, Jan 7, 2021)
5a54cbb  try psutil==5.7.3 (ogrisel, Jan 7, 2021)
4c3033d  Disable psutil in tests (ogrisel, Jan 7, 2021)
8de729d  Disable psutil-based memory leak detector (ogrisel, Jan 7, 2021)
423fa30  Did not mean to push this: psutil is required for the tests (ogrisel, Jan 7, 2021)
6355d54  log more debug info (ogrisel, Jan 7, 2021)
475d476  Trigger CI (ogrisel, Jan 7, 2021)
fd09506  Check process exitcode (tomMoral, Jan 7, 2021)
3f2f2aa  More debug info (ogrisel, Jan 7, 2021)
78ffccf  DBG exit code for all processes (tomMoral, Jan 7, 2021)
e87bbcf  DBG add info in debug log (tomMoral, Jan 7, 2021)
706a81c  Trigger (ogrisel, Jan 7, 2021)
b7de791  Log terminate process reason (ogrisel, Jan 7, 2021)
8663330  DBG more debug logs (tomMoral, Jan 7, 2021)
9a9843b  DBG Cleaner debug message (tomMoral, Jan 7, 2021)
e2c26a9  Try to use Python 3.8.5 (ogrisel, Jan 7, 2021)
977c2eb  revert (ogrisel, Jan 7, 2021)
d1fbe26  Try windows + Python 3.9 (ogrisel, Jan 7, 2021)
e9e5d1c  trigger ci (ogrisel, Jan 7, 2021)
8cd9982  trigger ci (ogrisel, Jan 7, 2021)
0f1b553  trigger ci (ogrisel, Jan 7, 2021)
3427002  DBG monkey patch Queue.get (tomMoral, Jan 7, 2021)
8f91e21  DBG add logs in the right branch (tomMoral, Jan 7, 2021)
b840e91  DBG confirm fault in unpickle (tomMoral, Jan 7, 2021)
407fbc0  just to be sure (ogrisel, Jan 7, 2021)
11c8fa5  pickletools.dis (ogrisel, Jan 7, 2021)
8fee8f5  pickletools.dis out=buffer (ogrisel, Jan 7, 2021)
d5a687e  Try to use _pickle.loads (Python implementation) (ogrisel, Jan 7, 2021)
265f29a  explicit trigger of test function before unpickling (ogrisel, Jan 7, 2021)
d70febd  CLN cosmit (tomMoral, Jan 7, 2021)
d46ce1c  DBG run multiple time tox (tomMoral, Jan 7, 2021)
d0acd11  Try to lazy load lib with ctypes to decouple tests as much as possible (ogrisel, Jan 7, 2021)
cae115a  indent error (ogrisel, Jan 7, 2021)
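
Most of these commits sprinkle temporary mp.util.debug() calls through loky to trace the deadlock. As a minimal sketch (not part of this PR; it assumes loky's public get_reusable_executor API and the standard multiprocessing logging helper), the extra messages can be surfaced when reproducing locally:

import logging
import multiprocessing as mp

from loky import get_reusable_executor

# Route multiprocessing/loky util.debug() messages to stderr so the log lines
# added in these commits become visible in the parent process.
mp.util.log_to_stderr(logging.DEBUG)

executor = get_reusable_executor(max_workers=2)
print(executor.submit(sum, [1, 2, 3]).result())  # debug output interleaves with the result

This only configures logging in the parent process; whether worker-side messages show up also depends on how loky propagates the logging level to its child processes.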
118 changes: 61 additions & 57 deletions .azure_pipeline.yml
@@ -3,75 +3,79 @@ jobs:
strategy:
matrix:

windows-py39:
imageName: "vs2017-win2016"
python.version: "3.9"
tox.env: py39
windows-py38:
imageName: "vs2017-win2016"
python.version: "3.8"
tox.env: py38
windows-py35:
imageName: "vs2017-win2016"
python.version: "3.5"
tox.env: py35
windows-py27:
imageName: "vs2017-win2016"
python.version: "2.7"
tox.env: py27
# windows-py35:
# imageName: "vs2017-win2016"
# python.version: "3.5"
# tox.env: py35
# windows-py27:
# imageName: "vs2017-win2016"
# python.version: "2.7"
# tox.env: py27

macos-py38:
imageName: "macos-10.14"
python.version: "3.8"
tox.env: py38
macos-py35:
imageName: "macos-10.14"
python.version: "3.5"
tox.env: py35
macos-py27-high-memory:
imageName: "macos-10.14"
python.version: "2.7"
tox.env: py27
RUN_MEMORY: "true"
# macos-py38:
# imageName: "macos-10.14"
# python.version: "3.8"
# tox.env: py38
# macos-py35:
# imageName: "macos-10.14"
# python.version: "3.5"
# tox.env: py35
# macos-py27-high-memory:
# imageName: "macos-10.14"
# python.version: "2.7"
# tox.env: py27
# RUN_MEMORY: "true"

linux-pypy3:
imageName: "ubuntu-16.04"
python.version: "pypy3"
tox.env: pypy3
LOKY_MAX_CPU_COUNT: "2"
# linux-pypy3:
# imageName: "ubuntu-16.04"
# python.version: "pypy3"
# tox.env: pypy3
# LOKY_MAX_CPU_COUNT: "2"

linux-python-nightly:
imageName: "ubuntu-16.04"
python.version: "3.9"
tox.env: py39
linux-py38-joblib-tests:
imageName: "ubuntu-16.04"
python.version: "3.8"
tox.env: "py38"
joblib.tests: "true"
linux-py38-high-memory:
imageName: "ubuntu-16.04"
python.version: "3.8"
tox.env: py38
RUN_MEMORY: "true"
linux-py37:
imageName: "ubuntu-16.04"
python.version: "3.7"
tox.env: py37
linux-py36:
imageName: "ubuntu-16.04"
python.version: "3.6"
tox.env: py36
linux-py35:
imageName: "ubuntu-16.04"
python.version: "3.5"
tox.env: py35
linux-py27:
imageName: "ubuntu-16.04"
python.version: "2.7"
tox.env: py27
# linux-python-nightly:
# imageName: "ubuntu-16.04"
# python.version: "3.9"
# tox.env: py39
# linux-py38-joblib-tests:
# imageName: "ubuntu-16.04"
# python.version: "3.8"
# tox.env: "py38"
# joblib.tests: "true"
# linux-py38-high-memory:
# imageName: "ubuntu-16.04"
# python.version: "3.8"
# tox.env: py38
# RUN_MEMORY: "true"
# linux-py37:
# imageName: "ubuntu-16.04"
# python.version: "3.7"
# tox.env: py37
# linux-py36:
# imageName: "ubuntu-16.04"
# python.version: "3.6"
# tox.env: py36
# linux-py35:
# imageName: "ubuntu-16.04"
# python.version: "3.5"
# tox.env: py35
# linux-py27:
# imageName: "ubuntu-16.04"
# python.version: "2.7"
# tox.env: py27

pool:
vmImage: $(imageName)
variables:
JUNITXML: 'test-data.xml'
PYTEST_ARGS: '-vl --timeout=60 --maxfail=5'
PYTEST_ARGS: '-vl --timeout=60 --maxfail=5 tests/test_reusable_executor.py::TestExecutorDeadLock'
steps:
- task: UsePythonVersion@0
inputs:
7 changes: 6 additions & 1 deletion continuous_integration/runtests.sh
@@ -35,5 +35,10 @@ else
if [ "$RUN_MEMORY" != "true" ]; then
PYTEST_ARGS="$PYTEST_ARGS --skip-high-memory"
fi
tox -v -e "${TOX_ENV}" -- ${PYTEST_ARGS} --junitxml="${JUNITXML}"
for i in {1..100}; do
echo -e "\n\nRunning $i\n\n"
tox -v -e "${TOX_ENV}" -- ${PYTEST_ARGS} --junitxml="${JUNITXML}"
res=$?
test $res -ne 0 && exit $res
done
fi
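
The hunk above makes CI re-run the whole tox environment up to 100 times and stop at the first failure. A rough local equivalent (hypothetical, not from this PR) that stresses only the targeted test class, assuming pytest and pytest-timeout are installed in the active environment:

import subprocess
import sys

# Same pytest options as PYTEST_ARGS in .azure_pipeline.yml, restricted to the
# deadlock tests; each run uses a fresh interpreter, like the tox loop above.
CMD = [sys.executable, "-m", "pytest", "-vl", "--timeout=60", "--maxfail=5",
       "tests/test_reusable_executor.py::TestExecutorDeadLock"]

for i in range(1, 101):
    print(f"\n\nRunning iteration {i}\n\n", flush=True)
    returncode = subprocess.run(CMD).returncode
    if returncode != 0:
        sys.exit(returncode)  # stop at the first failing iteration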
58 changes: 57 additions & 1 deletion loky/backend/queues.py
@@ -10,21 +10,26 @@
# pickling process. (overload Queue._feed/SimpleQueue.put)
#
import os
import pickle
import sys
import time
import errno
import weakref
import threading

from multiprocessing import util
from multiprocessing import connection
from multiprocessing.queues import Full, Empty
from multiprocessing.synchronize import SEM_VALUE_MAX
from multiprocessing.queues import Full
from multiprocessing.queues import _sentinel, Queue as mp_Queue
from multiprocessing.queues import SimpleQueue as mp_SimpleQueue
from multiprocessing import context

from .reduction import loads, dumps
from .context import assert_spawning, get_context

_ForkingPickler = context.reduction.ForkingPickler


__all__ = ['Queue', 'SimpleQueue', 'Full']

@@ -61,6 +66,57 @@ def __init__(self, maxsize=0, reducers=None, ctx=None):

self._reducers = reducers

def get(self, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
util.debug('acquiring lock (no timeout)')
with self._rlock:
util.debug('lock acquired')
res = self._recv_bytes()
util.debug('receiving bytes')
self._sem.release()
util.debug('releasing sem')
else:
if block:
deadline = time.monotonic() + timeout
util.debug('acquiring lock')
if not self._rlock.acquire(block, timeout):
util.debug('timeout in get!')
raise Empty
util.debug('lock acquired')
try:
util.debug('calling poll')
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
util.debug('empty poll')
raise Empty
elif not self._poll():
util.debug('empty poll')
raise Empty

util.debug('receiving bytes')
res = self._recv_bytes()
util.debug('releasing sem')
self._sem.release()
util.debug('get done')
finally:
util.debug('releasing lock')
self._rlock.release()

# util.debug("importing tests.test_reusable_executor")
# from tests.test_reusable_executor import sleep_then_check_pids_exist
# util.debug("imported tests.test_reusable_executor")

# unserialize the data after having released the lock
# import pickletools, io
# buffer = io.StringIO()
# pickletools.dis(res, out=buffer)
# util.debug("payload:\n" + buffer.getvalue())
util.debug('unpickling task')
return _ForkingPickler.loads(res)

# Use custom queue set/get state to be able to reduce the custom reducers
def __getstate__(self):
assert_spawning(self)
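
The commented-out block in the patched Queue.get corresponds to the pickletools.dis commits (11c8fa5, 8fee8f5): disassemble the received pickle stream before executing it, to see which opcode makes the worker crash during unpickling. A standalone sketch of the same inspection on an arbitrary payload (illustrative only, not part of the diff):

import io
import pickle
import pickletools

# Stand-in for the raw bytes a worker receives from the call queue.
payload = pickle.dumps(("work_id", sum, [1, 2, 3]))

buffer = io.StringIO()
pickletools.dis(payload, out=buffer)   # dump the opcodes without executing them
print("payload:\n" + buffer.getvalue())

result = pickle.loads(payload)         # only unpickle once the stream looks sane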
42 changes: 33 additions & 9 deletions loky/process_executor.py
@@ -108,14 +108,18 @@


try:
from psutil import Process
_USE_PSUTIL = True
# from psutil import Process
# _USE_PSUTIL = True
_USE_PSUTIL = False

def _get_memory_usage(pid, force_gc=False):
mp.util.debug(f'psutil used to check memory size (force_gc={force_gc}')
if force_gc:
gc.collect()

return Process(pid).memory_info().rss
mem_size = Process(pid).memory_info().rss
mp.util.debug(f'psutil return memory size: {mem_size}')
return mem_size

except ImportError:
_USE_PSUTIL = False
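
For reference, the psutil-based leak check that this hunk disables boils down to reading the worker's resident set size; a minimal standalone version (assuming psutil is installed) looks roughly like:

import gc
import os
import psutil

def get_memory_usage(force_gc=False):
    # In loky this runs inside the worker process, so collecting garbage first
    # makes the resident set size reflect live objects only.
    if force_gc:
        gc.collect()
    return psutil.Process(os.getpid()).memory_info().rss  # RSS in bytes

print(f"current RSS: {get_memory_usage(force_gc=True) / 1e6:.1f} MB")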
@@ -401,7 +405,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
mp.util.debug('Worker started with timeout=%s' % timeout)
while True:
try:
mp.util.debug('Worker waiting for a task')
call_item = call_queue.get(block=True, timeout=timeout)
mp.util.debug('Worker got a task')
if call_item is None:
mp.util.info("Shutting down worker on sentinel")
except queue.Empty:
@@ -414,27 +420,36 @@
mp.util.info("Could not acquire processes_management_lock")
continue
except BaseException:
mp.util.debug('Exception getting a task')
previous_tb = traceback.format_exc()
mp.util.debug(f'Exception getting a task:\n{previous_tb}')
try:
result_queue.put(_RemoteTraceback(previous_tb))
except BaseException:
# If we cannot format correctly the exception, at least print
# the traceback.
print(previous_tb)
mp.util.debug('Exiting with code 1')
sys.exit(1)
if call_item is None:
mp.util.debug('Received a sentinel - exiting')
# Notify queue management thread about clean worker shutdown
result_queue.put(pid)
with worker_exit_lock:
mp.util.debug('Exited cleanly')
return
try:
r = call_item()
except BaseException as e:
mp.util.debug('Exception calling a task')
exc = _ExceptionWithTraceback(e)
result_queue.put(_ResultItem(call_item.work_id, exception=exc))
mp.util.debug('Exception sent back to main process')
else:
mp.util.debug('Sending back result.')
_sendback_result(result_queue, call_item.work_id, result=r)
del r
mp.util.debug('Result has been sent back.')

# Free the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
@@ -467,6 +482,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs,
mp.util.info("Memory leak detected: shutting down worker")
result_queue.put(pid)
with worker_exit_lock:
mp.util.debug('Exited due to memory leak')
return
else:
# if psutil is not installed, trigger gc.collect events
@@ -645,14 +661,20 @@ def wait_result_broken_or_wakeup(self):
# unstable, therefore they are not appended in the exception
# message.
exit_codes = "\nThe exit codes of the workers are {}".format(
get_exitcodes_terminated_worker(self.processes))
get_exitcodes_terminated_worker(self.processes)
)
bpe = TerminatedWorkerError(
"A worker process managed by the executor was unexpectedly "
"terminated. This could be caused by a segmentation fault "
"while calling the function or by an excessive memory usage "
"causing the Operating System to kill the worker.\n"
"{}".format(exit_codes)
)
mp.util.debug('A worker unexpectedly terminated. Workers that '
'might have caused the breakage:'
+ str([(p.name, p.exitcode)
for p in list(self.processes.values())
if p is not None and p.sentinel in ready]))

self.thread_wakeup.clear()

@@ -733,7 +755,7 @@ def terminate_broken(self, bpe):

# Terminate remaining workers forcibly: the queues or their
# locks may be in a dirty state and block forever.
self.kill_workers()
self.kill_workers(reason="broken executor")

# clean up resources
self.join_executor_internals()
@@ -753,15 +775,16 @@ def flag_executor_shutting_down(self):
del work_item

# Kill the remaining worker forcibly to no waste time joining them
self.kill_workers()
self.kill_workers(reason="executor shutting down")

def kill_workers(self):
def kill_workers(self, reason=""):
# Terminate the remaining workers using SIGKILL. This function also
# terminates descendant workers of the children in case there is some
# nested parallelism.
while self.processes:
_, p = self.processes.popitem()
mp.util.debug('terminate process {}'.format(p.name))
mp.util.debug('terminate process {}, reason: {}'
.format(p.name, reason))
try:
recursive_terminate(p)
except ProcessLookupError: # pragma: no cover
@@ -1152,7 +1175,8 @@ def map(self, fn, *iterables, **kwargs):

results = super(ProcessPoolExecutor, self).map(
partial(_process_chunk, fn), _get_chunks(chunksize, *iterables),
timeout=timeout)
timeout=timeout
)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True, kill_workers=False):