Skip to content

Commit

Permalink
Unblock event loop while waiting for ThreadpoolExecutor to shut down (#…
Browse files Browse the repository at this point in the history
…6091)

This reinstates #5883
which was reverted in #5961 / #5932

I could confirm the flakyness of `test_missing_data_errant_worker` after this change and am reasonably certain this is caused by #5910 which causes a closing worker to be restarted such that, even after `Worker.close` is done, the worker still appears to be partially up. 

The only reason I can see why this change promotes this behaviour is that if we no longer block the event loop while the threadpool is closing, this opens a much larger window for incoming requests to come in and being processed while close is running.

Closes #6239
  • Loading branch information
fjetter authored Apr 29, 2022
1 parent be45ba2 commit 70e1fca
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
34 changes: 34 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3550,3 +3550,37 @@ async def test_broken_comm(c, s, a, b):
)
s = df.shuffle("id", shuffle="tasks")
await c.compute(s.size)


@gen_cluster(nthreads=[])
async def test_do_not_block_event_loop_during_shutdown(s):
loop = asyncio.get_running_loop()
called_handler = threading.Event()
block_handler = threading.Event()

w = await Worker(s.address)
executor = w.executors["default"]

# The block wait must be smaller than the test timeout and smaller than the
# default value for timeout in `Worker.close``
async def block():
def fn():
called_handler.set()
assert block_handler.wait(20)

await loop.run_in_executor(executor, fn)

async def set_future():
while True:
try:
await loop.run_in_executor(executor, sleep, 0.1)
except RuntimeError: # executor has started shutting down
block_handler.set()
return

async def close():
called_handler.wait()
# executor_wait is True by default but we want to be explicit here
await w.close(executor_wait=True)

await asyncio.gather(block(), close(), set_future())
20 changes: 14 additions & 6 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from distributed.comm import connect, get_address_host
from distributed.comm.addressing import address_from_user_args, parse_address
from distributed.comm.utils import OFFLOAD_THRESHOLD
from distributed.compatibility import randbytes
from distributed.compatibility import randbytes, to_thread
from distributed.core import (
CommClosedError,
ConnectionPool,
Expand Down Expand Up @@ -1589,11 +1589,19 @@ async def close(
for executor in self.executors.values():
if executor is utils._offload_executor:
continue # Never shutdown the offload executor
if isinstance(executor, ThreadPoolExecutor):
executor._work_queue.queue.clear()
executor.shutdown(wait=executor_wait, timeout=timeout)
else:
executor.shutdown(wait=executor_wait)

def _close():
if isinstance(executor, ThreadPoolExecutor):
executor._work_queue.queue.clear()
executor.shutdown(wait=executor_wait, timeout=timeout)
else:
executor.shutdown(wait=executor_wait)

# Waiting for the shutdown can block the event loop causing
# weird deadlocks particularly if the task that is executing in
# the thread is waiting for a server reply, e.g. when using
# worker clients, semaphores, etc.
await to_thread(_close)

self.stop()
await self.rpc.close()
Expand Down

0 comments on commit 70e1fca

Please sign in to comment.