Follow-up on removing report and safe from Worker.close (#6423)
Co-authored-by: Florian Jetter <[email protected]>
gjoseph92 and fjetter authored May 24, 2022
1 parent 60f0886 commit 30a872f
Showing 4 changed files with 43 additions and 19 deletions.
3 changes: 3 additions & 0 deletions distributed/deploy/tests/test_local.py
@@ -1172,6 +1172,9 @@ def setup(self, worker=None):
async def test_localcluster_start_exception():
with raises_with_cause(RuntimeError, None, ImportError, "my_nonexistent_library"):
async with LocalCluster(
n_workers=1,
threads_per_worker=1,
processes=True,
plugins={MyPlugin()},
):
return
20 changes: 9 additions & 11 deletions distributed/scheduler.py
@@ -3387,11 +3387,13 @@ async def close(self):

futures = []
for _, comm in list(self.stream_comms.items()):
# FIXME use `self.remove_worker()` instead after https://github.com/dask/distributed/issues/6390
if not comm.closed():
# This closes the Worker and ensures that if a Nanny is around,
# it is closed as well
comm.send({"op": "terminate"})
comm.send({"op": "close"})
comm.send({"op": "close-stream"})
# ^ TODO remove? `Worker.close` will close the stream anyway.
with suppress(AttributeError):
futures.append(comm.close())
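
For orientation, a minimal illustrative sketch of how a batched-stream message such as the {"op": "close"} sent above is routed to the coroutines registered in Worker.stream_handlers (see the worker.py hunks further down). The helper name is hypothetical; the real dispatch lives in distributed's Server.handle_stream:

async def dispatch_stream_message(worker, msg: dict) -> None:
    # Hypothetical helper, illustration only: look the message up by its
    # "op" key and invoke the registered stream handler, awaiting it if
    # the handler returns a coroutine.
    op = msg.pop("op")
    handler = worker.stream_handlers[op]
    result = handler(**msg)
    if hasattr(result, "__await__"):
        await result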

@@ -3419,8 +3421,7 @@ async def close_worker(self, worker: str, stimulus_id: str, safe: bool = False):
"""
logger.info("Closing worker %s", worker)
self.log_event(worker, {"action": "close-worker"})
ws = self.workers[worker]
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)})
self.worker_send(worker, {"op": "close"}) # TODO redundant with `remove_worker`
await self.remove_worker(address=worker, safe=safe, stimulus_id=stimulus_id)

###########
@@ -4732,7 +4733,7 @@ def handle_long_running(
ws.long_running.add(ts)
self.check_idle_saturated(ws)

async def handle_worker_status_change(
def handle_worker_status_change(
self, status: str, worker: str, stimulus_id: str
) -> None:
ws = self.workers.get(worker)
@@ -4760,12 +4761,8 @@ async def handle_worker_status_change(
worker_msgs: dict = {}
self._transitions(recs, client_msgs, worker_msgs, stimulus_id)
self.send_all(client_msgs, worker_msgs)
elif ws.status == Status.paused:
self.running.remove(ws)
elif ws.status == Status.closing:
await self.remove_worker(
address=ws.address, stimulus_id=stimulus_id, close=False
)
else:
self.running.discard(ws)

async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
"""
@@ -5986,7 +5983,8 @@ async def retire_workers(
prev_status = ws.status
ws.status = Status.closing_gracefully
self.running.discard(ws)
# FIXME: We should send a message to the nanny first.
# FIXME: We should send a message to the nanny first;
# eventually workers won't be able to close their own nannies.
self.stream_comms[ws.address].send(
{
"op": "worker-status-change",
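
A hedged usage sketch of the retirement path above from the client side (assuming an asynchronous Client and the close_workers keyword as in contemporary releases): the worker is moved to closing_gracefully, its data is migrated off, and it is then shut down.

from distributed import Client

async def retire_one(client: Client, addr: str) -> None:
    # Drives Scheduler.retire_workers shown above for a single worker.
    await client.retire_workers(workers=[addr], close_workers=True)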
16 changes: 16 additions & 0 deletions distributed/tests/test_nanny.py
@@ -568,3 +568,19 @@ async def test_restart_memory(c, s, n):

while not s.workers:
await asyncio.sleep(0.1)


@gen_cluster(Worker=Nanny, nthreads=[("", 1)])
async def test_scheduler_crash_doesnt_restart(s, a):
# Simulate a scheduler crash by disconnecting it first
# (`s.close()` would tell workers to cleanly shut down)
bcomm = next(iter(s.stream_comms.values()))
bcomm.abort()
await s.close()

while a.status != Status.closing_gracefully:
await asyncio.sleep(0.01)

await a.finished()
assert a.status == Status.closed
assert a.process is None
23 changes: 15 additions & 8 deletions distributed/worker.py
Expand Up @@ -221,7 +221,7 @@ async def _force_close(self):
2. If it doesn't, log and kill the process
"""
try:
await asyncio.wait_for(self.close(executor_wait=False), 30)
await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30)
except (Exception, BaseException): # <-- include BaseException here or not??
# Worker is in a very broken state if closing fails. We need to shut down immediately,
# to ensure things don't get even worse and this worker potentially deadlocks the cluster.
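
The change above preserves the general shape of _force_close: a time-bounded clean shutdown with a hard fallback. A generic, self-contained sketch of that pattern (not the Nanny's actual escalation path):

import asyncio
import logging
import os
import signal

async def close_with_deadline(server, timeout: float = 30.0) -> None:
    # Try a clean close; if it hangs or raises, escalate so the process
    # cannot linger in a half-closed state.
    try:
        await asyncio.wait_for(server.close(), timeout)
    except BaseException:
        logging.critical("Clean shutdown failed or timed out; terminating process")
        os.kill(os.getpid(), signal.SIGTERM)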
@@ -785,7 +785,7 @@ def __init__(
"get_data": self.get_data,
"update_data": self.update_data,
"free_keys": self.handle_free_keys,
"terminate": self.terminate,
"terminate": self.close,
"ping": pingpong,
"upload_file": self.upload_file,
"call_stack": self.get_call_stack,
@@ -807,7 +807,6 @@ def __init__(

stream_handlers = {
"close": self.close,
"terminate": self.terminate,
"cancel-compute": self.handle_cancel_compute,
"acquire-replicas": self.handle_acquire_replicas,
"compute-task": self.handle_compute_task,
@@ -1440,24 +1439,32 @@ async def start_unsafe(self):
self.start_periodic_callbacks()
return self

async def terminate(self, **kwargs):
return await self.close(nanny=True, **kwargs)

@log_errors
async def close(
self,
timeout=30,
executor_wait=True,
nanny=False,
nanny=True,
):
# FIXME: The worker should not be allowed to close the nanny. Ownership
# is the other way round. If an external caller wants to close
# nanny+worker, the nanny must be notified first. ==> Remove kwarg
# nanny, see also Scheduler.retire_workers
if self.status in (Status.closed, Status.closing):
if self.status in (Status.closed, Status.closing, Status.failed):
await self.finished()
return

if self.status == Status.init:
# If the worker is still in startup/init and was started by a nanny,
# the nanny itself is not up yet. While the Nanny is not up, its server
# will not accept any incoming RPC requests and will block until
# startup has finished, so the worker cannot reach the Nanny to close
# it from here.
# In this case, the Nanny will close itself automatically after
# inspecting the worker status.
nanny = False

disable_gc_diagnosis()

try:
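
In short, the default flip above means external callers no longer need a separate terminate method. A hedged usage sketch, given a Worker instance named worker:

async def shut_down(worker) -> None:
    # New default: closing the worker also takes down an attached Nanny.
    await worker.close()

async def shut_down_worker_only(worker) -> None:
    # The Nanny's own shutdown path opts out to avoid closing itself
    # (see the _force_close hunk above).
    await worker.close(nanny=False)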
