-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove report and safe from Worker.close #6363
Changes from 6 commits
e324e58
a48aff4
f887cf8
4e491e3
8475bfe
34859b0
ea103a8
1e260fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,9 @@ | |
|
||
from dask.system import CPU_COUNT | ||
|
||
from distributed import Client, Nanny, Worker, get_client | ||
from distributed import Client, LocalCluster, Nanny, Worker, get_client | ||
from distributed.compatibility import LINUX | ||
from distributed.core import Status | ||
from distributed.deploy.local import LocalCluster | ||
from distributed.deploy.utils_test import ClusterTest | ||
from distributed.metrics import time | ||
from distributed.system import MEMORY_LIMIT | ||
|
@@ -29,6 +28,7 @@ | |
clean, | ||
gen_test, | ||
inc, | ||
raises_with_cause, | ||
slowinc, | ||
tls_only_security, | ||
xfail_ssl_issue5601, | ||
|
@@ -1155,3 +1155,22 @@ async def test_connect_to_closed_cluster(): | |
# Raises during init without actually connecting since we're not | ||
# awaiting anything | ||
Client(cluster, asynchronous=True) | ||
|
||
|
||
class MyPlugin: | ||
def setup(self, worker=None): | ||
import my_nonexistent_library # noqa | ||
|
||
|
||
@gen_test( | ||
clean_kwargs={ | ||
# FIXME: This doesn't close the LoopRunner properly, leaving a thread around | ||
"threads": False | ||
} | ||
) | ||
async def test_localcluster_start_exception(): | ||
with raises_with_cause(RuntimeError, None, ImportError, None): | ||
async with LocalCluster( | ||
plugins={MyPlugin()}, | ||
): | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added a simplified version of the LocalCluster test reproducer. The subclassing suggested in #6363 (comment) raises an exception during init, which prevents us from going through proper cleanup by the context manager, i.e. resources are never closed. The interesting bit, however, is the plugin for the workers, which can simply be passed through as a kwarg. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. cc @pentschev There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FWIW, failure scenarios for worker startup in a SpecCluster are a bit messy, see #5919 |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3347,7 +3347,7 @@ def del_scheduler_file(): | |||||||||||||
setproctitle(f"dask-scheduler [{self.address}]") | ||||||||||||||
return self | ||||||||||||||
|
||||||||||||||
async def close(self, fast=False, close_workers=False): | ||||||||||||||
async def close(self): | ||||||||||||||
"""Send cleanup signal to all coroutines then wait until finished | ||||||||||||||
|
||||||||||||||
See Also | ||||||||||||||
|
@@ -3370,19 +3370,6 @@ async def close(self, fast=False, close_workers=False): | |||||||||||||
for preload in self.preloads: | ||||||||||||||
await preload.teardown() | ||||||||||||||
|
||||||||||||||
if close_workers: | ||||||||||||||
await self.broadcast(msg={"op": "close_gracefully"}, nanny=True) | ||||||||||||||
for worker in self.workers: | ||||||||||||||
# Report would require the worker to unregister with the | ||||||||||||||
# currently closing scheduler. This is not necessary and might | ||||||||||||||
# delay shutdown of the worker unnecessarily | ||||||||||||||
self.worker_send(worker, {"op": "close", "report": False}) | ||||||||||||||
for i in range(20): # wait a second for send signals to clear | ||||||||||||||
if self.workers: | ||||||||||||||
await asyncio.sleep(0.05) | ||||||||||||||
else: | ||||||||||||||
break | ||||||||||||||
|
||||||||||||||
await asyncio.gather( | ||||||||||||||
*[plugin.close() for plugin in list(self.plugins.values())] | ||||||||||||||
) | ||||||||||||||
|
@@ -3399,15 +3386,16 @@ async def close(self, fast=False, close_workers=False): | |||||||||||||
logger.info("Scheduler closing all comms") | ||||||||||||||
|
||||||||||||||
futures = [] | ||||||||||||||
for w, comm in list(self.stream_comms.items()): | ||||||||||||||
for _, comm in list(self.stream_comms.items()): | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of scope, but I'm curious why we don't just use distributed/distributed/scheduler.py Lines 4826 to 4831 in 33fc50c
When we fix up |
||||||||||||||
if not comm.closed(): | ||||||||||||||
comm.send({"op": "close", "report": False}) | ||||||||||||||
# This closes the Worker and ensures that if a Nanny is around, | ||||||||||||||
# it is closed as well | ||||||||||||||
comm.send({"op": "terminate"}) | ||||||||||||||
comm.send({"op": "close-stream"}) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This distributed/distributed/worker.py Lines 1543 to 1545 in 33fc50c
Doesn't hurt to leave it though if you want to play it safe. |
||||||||||||||
with suppress(AttributeError): | ||||||||||||||
futures.append(comm.close()) | ||||||||||||||
|
||||||||||||||
for future in futures: # TODO: do all at once | ||||||||||||||
await future | ||||||||||||||
await asyncio.gather(*futures) | ||||||||||||||
|
||||||||||||||
for comm in self.client_comms.values(): | ||||||||||||||
comm.abort() | ||||||||||||||
|
@@ -3431,8 +3419,8 @@ async def close_worker(self, worker: str, stimulus_id: str, safe: bool = False): | |||||||||||||
""" | ||||||||||||||
logger.info("Closing worker %s", worker) | ||||||||||||||
self.log_event(worker, {"action": "close-worker"}) | ||||||||||||||
# FIXME: This does not handle nannies | ||||||||||||||
self.worker_send(worker, {"op": "close", "report": False}) | ||||||||||||||
ws = self.workers[worker] | ||||||||||||||
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)}) | ||||||||||||||
Comment on lines
+3422
to
+3423
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
The worker can figure out what to do with the nanny on its own. I'd like to use the new semantics and interfaces properly. I don't think the naming of these is clear enough, but what we want to do here, semantically, is |
||||||||||||||
await self.remove_worker(address=worker, safe=safe, stimulus_id=stimulus_id) | ||||||||||||||
|
||||||||||||||
########### | ||||||||||||||
|
@@ -4183,7 +4171,7 @@ async def remove_worker(self, address, stimulus_id, safe=False, close=True): | |||||||||||||
logger.info("Remove worker %s", ws) | ||||||||||||||
if close: | ||||||||||||||
with suppress(AttributeError, CommClosedError): | ||||||||||||||
self.stream_comms[address].send({"op": "close", "report": False}) | ||||||||||||||
self.stream_comms[address].send({"op": "close"}) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Here at least, it doesn't (though this case should be changed anyway #6227): distributed/distributed/scheduler.py Lines 6127 to 6130 in 33fc50c
|
||||||||||||||
|
||||||||||||||
self.remove_resources(address) | ||||||||||||||
|
||||||||||||||
|
@@ -4744,7 +4732,7 @@ def handle_long_running( | |||||||||||||
ws.long_running.add(ts) | ||||||||||||||
self.check_idle_saturated(ws) | ||||||||||||||
|
||||||||||||||
def handle_worker_status_change( | ||||||||||||||
async def handle_worker_status_change( | ||||||||||||||
self, status: str, worker: str, stimulus_id: str | ||||||||||||||
) -> None: | ||||||||||||||
ws = self.workers.get(worker) | ||||||||||||||
|
@@ -4772,9 +4760,12 @@ def handle_worker_status_change( | |||||||||||||
worker_msgs: dict = {} | ||||||||||||||
self._transitions(recs, client_msgs, worker_msgs, stimulus_id) | ||||||||||||||
self.send_all(client_msgs, worker_msgs) | ||||||||||||||
|
||||||||||||||
else: | ||||||||||||||
self.running.discard(ws) | ||||||||||||||
elif ws.status == Status.paused: | ||||||||||||||
self.running.remove(ws) | ||||||||||||||
elif ws.status == Status.closing: | ||||||||||||||
await self.remove_worker( | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW once we're in this state, the worker is already in I am very certain that I added this code path to ensure tests pass. I believe it was connected to a test in deploy but I do not recall what the problem was exactly. |
||||||||||||||
address=ws.address, stimulus_id=stimulus_id, close=False | ||||||||||||||
) | ||||||||||||||
|
||||||||||||||
async def handle_worker(self, comm=None, worker=None, stimulus_id=None): | ||||||||||||||
""" | ||||||||||||||
|
@@ -5101,12 +5092,7 @@ async def restart(self, client=None, timeout=30): | |||||||||||||
] | ||||||||||||||
|
||||||||||||||
resps = All( | ||||||||||||||
[ | ||||||||||||||
nanny.restart( | ||||||||||||||
close=True, timeout=timeout * 0.8, executor_wait=False | ||||||||||||||
) | ||||||||||||||
for nanny in nannies | ||||||||||||||
] | ||||||||||||||
[nanny.restart(close=True, timeout=timeout * 0.8) for nanny in nannies] | ||||||||||||||
) | ||||||||||||||
try: | ||||||||||||||
resps = await asyncio.wait_for(resps, timeout) | ||||||||||||||
|
@@ -5999,6 +5985,7 @@ async def retire_workers( | |||||||||||||
prev_status = ws.status | ||||||||||||||
ws.status = Status.closing_gracefully | ||||||||||||||
self.running.discard(ws) | ||||||||||||||
# FIXME: We should send a message to the nanny first. | ||||||||||||||
self.stream_comms[ws.address].send( | ||||||||||||||
{ | ||||||||||||||
"op": "worker-status-change", | ||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@graingert, this leaves the thread of a loop runner alive. I didn't want to mess with this, but it may interest you.