Remove timeout and executor_wait from Worker.close #7320
base: main
Conversation
There are a couple of related test failures, mostly due to changes in timing: e.g. the scheduler likely doesn't know that the worker is dead immediately after exiting the async with block of a worker now. Previously this was mostly "coincidence".
Unit Test Results. See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 20 files ±0, 20 suites ±0, 11h 3m 54s ⏱️ -54m 58s. For more details on these failures, see this check. Results for commit b779ea7. ± Comparison against base commit 9255987. This comment has been updated with latest results.
It turns out quite a few tests rely on the rather brittle timing assumption that the worker has already been removed from the scheduler once the worker is closed. This is, of course, never guaranteed and depends on network / event loop busyness.
distributed/worker.py
Outdated
def _close(executor, wait):
    if isinstance(executor, ThreadPoolExecutor):
        executor._work_queue.queue.clear()
        executor.shutdown(wait=wait, timeout=timeout)
    else:
        executor.shutdown(wait=wait)

# Waiting for the shutdown can block the event loop causing
# weird deadlocks particularly if the task that is executing in
# the thread is waiting for a server reply, e.g. when using
# worker clients, semaphores, etc.
if is_python_shutting_down():
    # If we're shutting down there is no need to wait for daemon
    # threads to finish
    _close(executor=executor, wait=False)
else:
    try:
        await to_thread(_close, executor=executor, wait=executor_wait)
    except RuntimeError:  # Are we shutting down the process?
        logger.error(
            "Could not close executor %r by dispatching to thread. Trying synchronously.",
            executor,
            exc_info=True,
        )
        _close(
            executor=executor, wait=executor_wait
        )  # Just run it directly
This dispatch to a threadpool executor makes all the "close, then assert that the worker is gone" tests fail because the worker is closing just a tiny bit faster now. In fact, just skipping one tick (sleep(0)) is sufficient to make all other tests pass.
pending_to_cancel = self._async_instructions
while pending_to_cancel:
    for task in self._async_instructions:
        task.cancel()
    # async tasks can handle cancellation and could take an arbitrary amount
    # of time to terminate
    _, pending = await asyncio.wait(self._async_instructions, timeout=0.1)
    pending_to_cancel.update(pending)
I don't really know how I feel about this. I think it's entirely unnecessary since we're not handling CancelledErrors. I think just cancelling is fine. There is one artificial test which provokes this but I'd be fine deleting that test.
@graingert curious to get your thoughts about this
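For concreteness, a minimal sketch of the "just cancel and don't wait indefinitely" alternative discussed here; the helper name and the 0.1s grace period are illustrative, not existing distributed code:

import asyncio

async def cancel_async_instructions(tasks: set[asyncio.Task], grace: float = 0.1) -> None:
    # Request cancellation once for every in-flight instruction task.
    for task in tasks:
        task.cancel()
    # Give well-behaved tasks a short, bounded window to unwind; anything that
    # swallows CancelledError is abandoned rather than retried.
    if tasks:
        await asyncio.wait(tasks, timeout=grace)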
For context, the set of tasks that can be in async_instructions is very limited. Each needs to belong to an Instruction, which is implemented here:
distributed/distributed/worker_state_machine.py
Lines 3629 to 3667 in 69b74f9
for inst in instructions:
    task: asyncio.Task | None = None
    if isinstance(inst, SendMessageToScheduler):
        self.batched_send(inst.to_dict())
    elif isinstance(inst, GatherDep):
        assert inst.to_gather
        keys_str = ", ".join(peekn(27, inst.to_gather)[0])
        if len(keys_str) > 80:
            keys_str = keys_str[:77] + "..."
        task = asyncio.create_task(
            self.gather_dep(
                inst.worker,
                inst.to_gather,
                total_nbytes=inst.total_nbytes,
                stimulus_id=inst.stimulus_id,
            ),
            name=f"gather_dep({inst.worker}, {{{keys_str}}})",
        )
    elif isinstance(inst, Execute):
        task = asyncio.create_task(
            self.execute(inst.key, stimulus_id=inst.stimulus_id),
            name=f"execute({inst.key})",
        )
    elif isinstance(inst, RetryBusyWorkerLater):
        task = asyncio.create_task(
            self.retry_busy_worker_later(inst.worker),
            name=f"retry_busy_worker_later({inst.worker})",
        )
    else:
        raise TypeError(inst)  # pragma: nocover
    if task is not None:
        self._async_instructions.add(task)
        task.add_done_callback(self._handle_stimulus_from_task)
On a side note, I think we can/should just use our AsyncTaskGroup for this, but I would prefer doing that in another PR.
The real-life use case impacted by this change is a user with an async client to a database running in an Execute task, which catches CancelledError to perform a clean shutdown but does not implement its own timeout when the server for whatever reason fails to answer. This PR will cause Worker.close() to get stuck forever in this case.
I'm OK with shifting the burden onto the user. If you catch CancelledError without implementing your own timeout, you're shooting yourself in the foot and we should not go out of our way to deal with it gracefully.
Worth noting, however, that this behaviour is inconsistent with sync tasks, which now run forever. I think we should just blindly cancel and simply not wait beyond 0.1s.
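For illustration, a hedged sketch of the user-side pattern implied here: if task code catches CancelledError to clean up, it should bound that cleanup itself so Worker.close() cannot hang on it. The db client and the 5 second timeout are hypothetical, not part of distributed:

import asyncio

async def udf_with_bounded_cleanup(db):
    # Hypothetical user task; db is some async database client.
    try:
        await db.run_query("...")
    except asyncio.CancelledError:
        # Bound the cleanup instead of relying on the worker to wait forever.
        try:
            await asyncio.wait_for(db.close(), timeout=5)
        except (asyncio.TimeoutError, OSError):
            pass  # give up on the graceful shutdown if the server never answers
        raise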
- I think our current implementation of async user functions is leaky in the sense that I don't want their coroutines to be executed on our event loop. Instead, we should still dispatch them to a threadpool, and in this threadpool they have their own event loop.
- With my changes nothing would be stuck, but I'd abort their graceful shutdown by cancelling it as well.

Worth noting, however, that this behaviour is inconsistent with sync tasks, which now run forever.

They always wait forever. There is no way to cancel/abort/interrupt a running thread. See #4694 for a discussion and #4726 for a prototype.
I think our current implementation of async user functions is leaky in the sense that I don't want their coroutines to be executed on our event loop. Instead, we should still dispatch them to a threadpool, and in this threadpool they have their own event loop.
Yes, I had this exact problem in my previous job, where I built an application on top of dask. The application was CPU-heavy, interleaved with async db calls. I started my own event loop in a thread and it worked quite well.
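A rough sketch of that pattern, assuming nothing about dask internals: run a private event loop in a dedicated daemon thread and hand coroutines to it from synchronous, CPU-heavy task code. The class name is made up:

import asyncio
import threading

class LoopInThread:
    # Owns a private event loop running in a daemon thread.
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        # Submit a coroutine from any thread and block until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    def close(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()

A task would then call something like loop_thread.run(db.fetch(...)) between CPU-bound sections (db.fetch being a stand-in for whatever async call the application needs).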
If I understand correctly:
With this PR, Worker.close() leaks threads for all currently executing tasks, with the expectation that they will eventually terminate.
If there's a Nanny wrapping around the worker:
Tasks will now be killed halfway through. If they're tasks that write to a database without transactions, or to disk, this PR adds a use case[1] where the tasks will potentially corrupt the database or the disk.
[1] Current use cases where a task can be killed halfway through, leaving database/disk in incoherent state:
- the task takes longer than the close timeout to complete
- the worker process passes the terminate threshold
- segmentation fault
- OS or hardware level death of the whole worker VM - e.g. due to swapping out program memory
What are the production use cases for calling Worker.close() while tasks are running on the worker?
- Manual worker shutdown or Client.restart() - I don't care about this, as we're likely in a dev phase so messing with the db/disk is probably not a big deal
- Dynamic scale down should NOT shut down a worker with tasks on it - please correct me if I'm wrong
- other?
If there's no Nanny wrapping around the worker:
We just leaked a thread, potentially for many seconds or forever, and the user will never realize until things start going wrong in the interpreter.
Again, what are the use cases where a user will want to start a Worker without a Nanny in a process that is not dedicated to it?
- unit tests using @gen_cluster. This implements its own wait for straggling threads.
- other unit tests. This will potentially get messy.
- any production use cases?
@@ -789,8 +789,12 @@ async def kill(
    If the worker process does not terminate within ``timeout`` seconds,
    even after being killed, `asyncio.TimeoutError` is raised.
    """
    if executor_wait is not None:
        warnings.warn(
Suggested change:
-        warnings.warn(
+        warnings.warn(  # pragma: nocover
@@ -65,6 +66,8 @@ def remove_worker(self, worker, scheduler):
    await b
    await a.close()
    await b.close()
    while any(w.address in s.workers for w in [a, b]):
        await asyncio.sleep(0.01)
Does this cover specifically the use case where a user task or third-party WorkerPlugin catches CancelledError? If so, do we need this logic in our unit tests? We have a lot of unit tests that blindly assume that, once Worker.close() returns or async with Worker exits, the worker is no longer registered on the Scheduler.
We have a lot of unit tests that blindly assume that, once Worker.close() returns or async with Worker exits, the worker is no longer registered on the Scheduler.
This assumption was always wrong; it was always timing dependent. If you check out this PR and add another asyncio.sleep(0) towards the end of Worker.close, all of these "while any worker still on scheduler: sleep" loops will be unnecessary.
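For illustration, a small test-side helper capturing the polling pattern used in the updated tests; the name is made up and this is not an existing distributed utility:

import asyncio

async def wait_until_deregistered(scheduler, *workers, interval=0.01):
    # Poll until the scheduler has actually removed the given workers, instead
    # of assuming that Worker.close() returning implies deregistration.
    while any(w.address in scheduler.workers for w in workers):
        await asyncio.sleep(interval)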
distributed/tests/test_nanny.py
Outdated
def block_forever():
    event.set()
    import time
Could you move the imports to the top of the module?
if not executor_wait:
    logger.info("Not waiting on executor to close")
if executor_wait is not None:
    warnings.warn(
Suggested change:
-        warnings.warn(
+        warnings.warn(  # pragma: nocover
        FutureWarning,
    )
if timeout is not None:
    warnings.warn(
Suggested change:
-        warnings.warn(
+        warnings.warn(  # pragma: nocover
I'd like to point out distributed/distributed/threadpoolexecutor.py, Line 107 in 2311d4d.
As mentioned a couple of times now, this is and always was true.
Given a user-submitted async function, e.g.

async def udf(...):
    ...

client.submit(udf)

we will cancel this task without mercy. My proposal is to move this off the worker event loop, which will be better for everyone. This way, async tasks have the same behavior as all other tasks.
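As a rough, non-authoritative sketch of what "move this off the worker event loop" could look like: run the submitted coroutine with its own event loop inside an executor thread, so cancelling the wrapper task on the worker's loop leaves the user's coroutine running in its thread, just like a sync task. Names and signatures here are illustrative:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def _run_coro_in_thread(async_fn, *args):
    # Each invocation gets its own event loop, isolated from the worker's loop.
    return asyncio.run(async_fn(*args))

async def run_user_coroutine(executor: ThreadPoolExecutor, async_fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, _run_coro_in_thread, async_fn, *args)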
I think there are plenty, ranging from a restart up to a very simple shutdown. User tasks can always be on the threadpool. The important thing is:
Both requirements are still given, but 2.) could even be relaxed if there is a nanny (which would allow us to use the default Python 3.10 ThreadPoolExecutor). The only difference between this PR and main is that the Worker closes faster.
If you inspect ...
I opened a PR to move any submitted coros to a thread in #7339.
Force-pushed from f76d180 to b779ea7.
Closes #7318
As you can see, complexity reduces quite a bit. At least locally, no tests broke. I could see some resource leakage warnings being raised in CI, though. Let's see. I don't think any user would be impacted.
I'm still curious about everyone's thoughts.