Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up on removing report and safe from Worker.close #6423

Merged
merged 12 commits into from
May 24, 2022
3 changes: 3 additions & 0 deletions distributed/deploy/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,9 @@ def setup(self, worker=None):
async def test_localcluster_start_exception():
with raises_with_cause(RuntimeError, None, ImportError, "my_nonexistent_library"):
async with LocalCluster(
n_workers=1,
threads_per_worker=1,
processes=True,
plugins={MyPlugin()},
):
return
20 changes: 9 additions & 11 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3387,11 +3387,13 @@ async def close(self):

futures = []
for _, comm in list(self.stream_comms.items()):
# FIXME use `self.remove_worker()` instead after https://github.com/dask/distributed/issues/6390
if not comm.closed():
# This closes the Worker and ensures that if a Nanny is around,
# it is closed as well
comm.send({"op": "terminate"})
comm.send({"op": "close"})
comm.send({"op": "close-stream"})
# ^ TODO remove? `Worker.close` will close the stream anyway.
with suppress(AttributeError):
futures.append(comm.close())

Expand Down Expand Up @@ -3419,8 +3421,7 @@ async def close_worker(self, worker: str, stimulus_id: str, safe: bool = False):
"""
logger.info("Closing worker %s", worker)
self.log_event(worker, {"action": "close-worker"})
ws = self.workers[worker]
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)})
self.worker_send(worker, {"op": "close"}) # TODO redundant with `remove_worker`
await self.remove_worker(address=worker, safe=safe, stimulus_id=stimulus_id)

###########
Expand Down Expand Up @@ -4732,7 +4733,7 @@ def handle_long_running(
ws.long_running.add(ts)
self.check_idle_saturated(ws)

async def handle_worker_status_change(
def handle_worker_status_change(
self, status: str, worker: str, stimulus_id: str
) -> None:
ws = self.workers.get(worker)
Expand Down Expand Up @@ -4760,12 +4761,8 @@ async def handle_worker_status_change(
worker_msgs: dict = {}
self._transitions(recs, client_msgs, worker_msgs, stimulus_id)
self.send_all(client_msgs, worker_msgs)
elif ws.status == Status.paused:
self.running.remove(ws)
elif ws.status == Status.closing:
await self.remove_worker(
address=ws.address, stimulus_id=stimulus_id, close=False
)
else:
self.running.discard(ws)
Comment on lines +4764 to +4765
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolves https://github.com/dask/distributed/pull/6363/files#r878475185

This is equivalent to main before the PR:

def handle_worker_status_change(
self, status: str, worker: str, stimulus_id: str
) -> None:
ws = self.workers.get(worker)
if not ws:
return
prev_status = ws.status
ws.status = Status.lookup[status] # type: ignore
if ws.status == prev_status:
return
self.log_event(
ws.address,
{
"action": "worker-status-change",
"prev-status": prev_status.name,
"status": status,
},
)
if ws.status == Status.running:
self.running.add(ws)
recs = self.bulk_schedule_after_adding_worker(ws)
if recs:
client_msgs: dict = {}
worker_msgs: dict = {}
self._transitions(recs, client_msgs, worker_msgs, stimulus_id)
self.send_all(client_msgs, worker_msgs)
else:
self.running.discard(ws)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is required for some tests. https://github.com/dask/distributed/pull/6363/files#r879719934

I agree that an explicit test would be better, but I expect something to break. If CI is happy, I'm happy, though.


async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
"""
Expand Down Expand Up @@ -5986,7 +5983,8 @@ async def retire_workers(
prev_status = ws.status
ws.status = Status.closing_gracefully
self.running.discard(ws)
# FIXME: We should send a message to the nanny first.
# FIXME: We should send a message to the nanny first;
# eventually workers won't be able to close their own nannies.
self.stream_comms[ws.address].send(
{
"op": "worker-status-change",
Expand Down
16 changes: 16 additions & 0 deletions distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,19 @@ async def test_restart_memory(c, s, n):

while not s.workers:
await asyncio.sleep(0.1)


@gen_cluster(Worker=Nanny, nthreads=[("", 1)])
async def test_scheduler_crash_doesnt_restart(s, a):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails on main

# Simulate a scheduler crash by disconnecting it first
# (`s.close()` would tell workers to cleanly shut down)
bcomm = next(iter(s.stream_comms.values()))
bcomm.abort()
await s.close()

while a.status != Status.closing_gracefully:
await asyncio.sleep(0.01)

await a.finished()
assert a.status == Status.closed
assert a.process is None
23 changes: 15 additions & 8 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ async def _force_close(self):
2. If it doesn't, log and kill the process
"""
try:
await asyncio.wait_for(self.close(executor_wait=False), 30)
await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30)
except (Exception, BaseException): # <-- include BaseException here or not??
# Worker is in a very broken state if closing fails. We need to shut down immediately,
# to ensure things don't get even worse and this worker potentially deadlocks the cluster.
Expand Down Expand Up @@ -785,7 +785,7 @@ def __init__(
"get_data": self.get_data,
"update_data": self.update_data,
"free_keys": self.handle_free_keys,
"terminate": self.terminate,
"terminate": self.close,
"ping": pingpong,
"upload_file": self.upload_file,
"call_stack": self.get_call_stack,
Expand All @@ -807,7 +807,6 @@ def __init__(

stream_handlers = {
"close": self.close,
"terminate": self.terminate,
"cancel-compute": self.handle_cancel_compute,
"acquire-replicas": self.handle_acquire_replicas,
"compute-task": self.handle_compute_task,
Expand Down Expand Up @@ -1440,24 +1439,32 @@ async def start_unsafe(self):
self.start_periodic_callbacks()
return self

async def terminate(self, **kwargs):
return await self.close(nanny=True, **kwargs)

Comment on lines -1443 to -1445
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted to remove Worker.terminate since it won't be used now that Worker.close also closes the Nanny by default. I think we will want two different methods eventually, but I found the naming of terminate vs close very confusing—perhaps it should be something like terminate vs maybe_restart?

If others would prefer to see terminate stick around, I'm happy to remove this commit and leave it.

@log_errors
async def close(
self,
timeout=30,
executor_wait=True,
nanny=False,
nanny=True,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

):
# FIXME: The worker should not be allowed to close the nanny. Ownership
# is the other way round. If an external caller wants to close
# nanny+worker, the nanny must be notified first. ==> Remove kwarg
# nanny, see also Scheduler.retire_workers
if self.status in (Status.closed, Status.closing):
if self.status in (Status.closed, Status.closing, Status.failed):
await self.finished()
return

if self.status == Status.init:
# If the worker is still in startup/init and is started by a nanny,
# this means the nanny itself is not up yet. If the Nanny isn't up
# yet, its server will not accept any incoming RPC requests and
# will block until the startup is finished.
# Therefore, this worker trying to communicate with the Nanny during
# startup is not possible and we cannot close it.
# In this case, the Nanny will automatically close after inspecting
# the worker status
nanny = False
Comment on lines +1453 to +1466
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added another commit to be more explicit about the statuses and added a more thorough explanation of why this is happening.

cc @gjoseph92


disable_gc_diagnosis()

try:
Expand Down