Close state machine and add-ins first in Worker.close #8066

Conversation
Unit Test Results. See the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. 20 files ±0, 20 suites ±0, 11h 29m 24s ⏱️ +17m 52s. For more details on these failures and errors, see this check. Results for commit 567bbcd. ± Comparison against base commit 84e1984. This pull request removes 2 tests.
♻️ This comment has been updated with latest results.

CI doesn't seem to be less happy than usual.
What I would do/try:
- I think everything in this PR is good until the preload teardown
- Close clients afterwards
- Stop periodic callbacks
- Stop the listener
- Close RPC
- Close the stream
- Shut down the thread pool executor
- ...

This way we're drawing a hard line for external communication on all channels.
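For concreteness, a rough sketch of that ordering; the helper names (_teardown_plugins_and_preloads, _close_clients, batched_stream, executor) are illustrative assumptions, not the actual Worker attributes:

# Rough ordering sketch only; the names below are placeholders, not the real API.
async def close(self):
    # 1. Tear down worker-internal machinery (state machine, plugins, preloads)
    #    while communication with the outside world is still available.
    await self._teardown_plugins_and_preloads()

    # 2. Close clients afterwards.
    await self._close_clients()

    # 3. Hard line for external communication on all channels:
    for pc in self.periodic_callbacks.values():
        pc.stop()
    self.stop()                        # stop the listener
    await self.rpc.close()             # close RPC
    await self.batched_stream.close()  # close the stream to the scheduler

    # 4. Only internal housekeeping afterwards, e.g. shutting down the
    #    thread pool executor.
    self.executor.shutdown(wait=False)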
    for plugin in self.plugins.values()
    if hasattr(plugin, "teardown")
]
await asyncio.gather(*(td for td in teardowns if isawaitable(td)))
Ohhh, this is bad. This was here before but we should fix this regardless
Suggested change:
-await asyncio.gather(*(td for td in teardowns if isawaitable(td)))
+for res in await asyncio.gather(*(td for td in teardowns if isawaitable(td)), return_exceptions=True):
+    if isinstance(res, Exception):
+        logger.error("Encountered exception during teardown of plugin %s", res)
or smth like that
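For reference, a self-contained toy illustrating that return_exceptions=True pattern (the teardown coroutines here are made up, not the actual Worker plugins):

import asyncio
import logging
from inspect import isawaitable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def ok_teardown():
    return None


async def failing_teardown():
    raise RuntimeError("plugin exploded")


async def main():
    # Mix of awaitable teardowns and a non-awaitable entry, as in the filter above.
    teardowns = [ok_teardown(), failing_teardown(), "not awaitable"]
    results = await asyncio.gather(
        *(td for td in teardowns if isawaitable(td)), return_exceptions=True
    )
    # Exceptions come back as results instead of cancelling the other teardowns.
    for res in results:
        if isinstance(res, Exception):
            logger.error("Encountered exception during teardown of plugin %s", res)


asyncio.run(main())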
ah, ok, just saw #8072
for preload in self.preloads:
    try:
        await preload.teardown()
    except Exception:
        logger.exception("Failed to tear down preload")
Preloads are an interesting case. They are processed right after ServerNode
is initialized (i.e. not started). Therefore, during startup, preloads don't have access to comms and there could be a case for delaying this even further.
I don't think this is super relevant, though
Agreed, this placement feels a bit shaky. I'd leave it here for now until it becomes relevant.
@@ -1597,8 +1597,6 @@ async def close(  # type: ignore

        await self.scheduler.close_rpc()
This is misleading. This is actually a no-op since Worker.scheduler is of type PooledRPCCall and not rpc:

await self.scheduler.close_rpc()
distributed/distributed/core.py, lines 1348 to 1382 in eb297b3:

class PooledRPCCall:
    """The result of ConnectionPool()('host:port')

    See Also:
        ConnectionPool
    """

    def __init__(self, addr, pool, serializers=None, deserializers=None):
        self.addr = addr
        self.pool = pool
        self.serializers = serializers
        self.deserializers = deserializers if deserializers is not None else serializers

    @property
    def address(self):
        return self.addr

    def __getattr__(self, key):
        async def send_recv_from_rpc(**kwargs):
            if self.serializers is not None and kwargs.get("serializers") is None:
                kwargs["serializers"] = self.serializers
            if self.deserializers is not None and kwargs.get("deserializers") is None:
                kwargs["deserializers"] = self.deserializers
            comm = await self.pool.connect(self.addr)
            prev_name, comm.name = comm.name, "ConnectionPool." + key
            try:
                return await send_recv(comm=comm, op=key, **kwargs)
            finally:
                self.pool.reuse(self.addr, comm)
                comm.name = prev_name

        return send_recv_from_rpc

    async def close_rpc(self):
        pass
@@ -1649,10 +1647,11 @@ def _close(executor, wait):

        await self.rpc.close()

        self.status = Status.closed
        setproctitle("dask worker [closed]")

        await ServerNode.close(self)

        self.__exit_stack.__exit__(None, None, None)
Just a side comment: the cleanest way would probably be to have everything be context-aware such that we can throw everything into the exit stack.
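A toy, self-contained sketch of that idea with contextlib.AsyncExitStack; the resource names are stand-ins, not the actual Worker components:

import asyncio
import contextlib


@contextlib.asynccontextmanager
async def resource(name):
    # Stand-in for a listener, RPC pool, batched stream, ...
    print(f"open {name}")
    try:
        yield name
    finally:
        print(f"close {name}")


async def main():
    # If every component were context-aware, close() could just unwind the
    # stack: teardown runs automatically in reverse order of acquisition.
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(resource("listener"))
        await stack.enter_async_context(resource("rpc"))
        await stack.enter_async_context(resource("batched stream"))
        print("worker running")
    # prints: close batched stream, close rpc, close listener


asyncio.run(main())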
for pc in self.periodic_callbacks.values():
    pc.stop()

await asyncio.gather(*(td for td in teardowns if isawaitable(td)))
self.stop()
My gut feeling tells me to close the clients before stopping
@@ -1649,10 +1647,11 @@ def _close(executor, wait):

        await self.rpc.close()
My gut feeling would tell me to put the listener stop, RPC close and stream close together. This would basically mark the point where we're severing all outside communication and will only do some internal housekeeping afterwards.
As it is right now, these events are still spread out a lot, aren't they?
Generally speaking, I'm happy if CI is happy, but the above order feels more sensible to me. Either way, this already looks like an improvement.
@fjetter: I'm working on a follow-up PR that would provide a full ordering. The point of this PR is to break it down into smaller pieces and make P2P happy with the initial step.
I'm fine with the code changes. Haven't cross-referenced the failing tests to see if something new came in. The shuffle problems seem to be resolved, so that's a win.

I don't recall.
Partially closes #8062
pre-commit run --all-files