
Ensure client session is quiet after cluster.close() or client.shutdown() #7429

Merged
jrbourbeau merged 5 commits into dask:main from quiet-shutdown-close on Jan 12, 2023

Conversation

@jrbourbeau (Member) commented Dec 21, 2022

This is a follow-up on #7428. Frequently I see users do the following in a notebook

cluster = ...  # Provision some cluster with dask-kubernetes, dask-cloudprovider, coiled, etc.
client = Client(cluster)
...  # do work
cluster.close() 

And then after a few seconds an error is printed (highlighted in red) in their notebook

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client

The same type of thing can happen when using client.shutdown() instead of cluster.close().

This PR adds logic to determine whether the scheduler/cluster is closing or has already been closed and, if so, to skip attempting to reconnect the client to the scheduler, send heartbeats, etc. The goal is to avoid scary errors in the user's client session when they're shutting things down in an expected way.

cc @ncclementi @shughes-uk @dchudz who have run into this before

EDIT: it looks like we have some intentional tests around reconnecting if the connection between the client and scheduler is temporarily lost. This makes sense, as we want to be resilient to transient network blips. I've restricted the "don't reconnect" logic to apply only when there's a cluster manager associated with the client and the cluster is closing/closed (which seems safe to me).
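
Roughly, the new guard boils down to a check like the one sketched below (a simplified illustration, not the exact diff in this PR; should_skip_reconnect is a hypothetical helper named here only for illustration):

from distributed.core import Status


def should_skip_reconnect(client) -> bool:
    # Hypothetical helper sketching the guard described above: only skip
    # reconnecting when a cluster manager is attached to the client and that
    # cluster is already closing or closed.
    cluster = getattr(client, "cluster", None)
    return cluster is not None and cluster.status in (Status.closing, Status.closed)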

with suppress(CommClosedError):
    await self.scheduler.terminate()

await self._close()
@jrbourbeau (Member Author) commented:

Note: closing the client is an extra cleanup step (i.e. not needed to make the user's client session quiet). However, it seemed strange that we didn't close it when the cluster was closed. Happy to roll back if it causes issues or folks would rather keep it open for some reason.

@jrbourbeau jrbourbeau self-assigned this Dec 21, 2022
@jrbourbeau jrbourbeau changed the title Ensure client session is quiet after cluster.close() and client.shutdown() Ensure client session is quiet after cluster.close() or client.shutdown() Dec 21, 2022
@github-actions (Contributor) commented:

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

22 files ±0    22 suites ±0    10h 4m 52s ⏱️ -18m 54s
3 279 tests +2    3 191 ✔️ +2    86 💤 +1    1 ❌ -2    1 🔥 +1
36 001 runs +23    34 481 ✔️ +23    1 518 💤 +2    1 ❌ -3    1 🔥 +1

For more details on these failures and errors, see this check.

Results for commit a8740c0. ± Comparison against base commit 35c07cb.

@dchudz (Contributor) commented Dec 22, 2022

Seems sensible to me!

@mrocklin (Member) left a comment:

Everything here seems fine to me. I'm less aware of this code these days so it might be good to have someone else (@crusaderky ?) take a brief look. However, if you're seeing good benefits to this I'm also happy to just trust and merge.

# Don't send heartbeat if scheduler comm or cluster are already closed
if (self.scheduler_comm and not self.scheduler_comm.comm.closed()) or (
    self.cluster and self.cluster.status not in (Status.closed, Status.closing)
):
Member

I'm slightly confused by this. If scheduler_comm.comm.closed() should we send a heartbeat if the cluster is not closing?

If the client closes but the cluster is still alive then we should probably stop?

Member

In particular, I think that this case might be rare but relevant

cluster = Cluster()

client1 = Client(cluster)
client2 = Client(cluster)

client1.close()

@jrbourbeau (Member Author) replied:

If scheduler_comm.comm.closed() should we send a heartbeat if the cluster is not closing?

I don't think we want to attempt to send a heartbeat if the scheduler_comm is closed. This if-statement should include that case.

If the client closes but the cluster is still alive then we should probably stop?

This should already be the case. If the client closes, then the scheduler_comm will also be closed:

In [1]: from distributed import LocalCluster, Client

In [2]: cluster = LocalCluster()
   ...:
   ...: client1 = Client(cluster)
   ...: client2 = Client(cluster)
   ...:
   ...: client1.close()

In [3]: client1.scheduler_comm.comm.closed()
Out[3]: True

In [4]: client2.scheduler_comm.comm.closed()
Out[4]: False

# Don't attempt to reconnect if the cluster is already closed.
# Instead, close down the client.
await self._close()
return
Member

Seems sensible to me

@jrbourbeau jrbourbeau mentioned this pull request Dec 22, 2022
@jrbourbeau (Member Author) left a comment:

Everything here seems fine to me. I'm less aware of this code these days so it might be good to have someone else (@crusaderky ?) take a brief look. However, if you're seeing good benefits to this I'm also happy to just trust and merge.

I'm fairly confident in the changes here, and am seeing good benefits locally, but am also totally fine to wait for folks to review (possibly after coming back from the holidays).

@mrocklin (Member) commented Jan 3, 2023

@fjetter I've left this one to you and your team. Pinging so that it rises up in your queue.

@fjetter (Member) commented Jan 5, 2023

@graingert can you review this please?

@jrbourbeau (Member Author) commented:

Thanks for reviewing @graingert!

@jrbourbeau jrbourbeau merged commit aca9a5e into dask:main Jan 12, 2023
@jrbourbeau jrbourbeau deleted the quiet-shutdown-close branch January 12, 2023 17:18
@charlesbluca (Member) commented:

Not immediately sure why, but it looks like this PR started causing dask-cuda's CI to hang (specifically test_proxy.py::test_communicating_proxy_objects).

Apologies for the timing relative to release, wish I could've found this a few hours ago 😅

cc @pentschev @ajschmidt8

@jrbourbeau (Member Author) commented:

Ah, thanks for surfacing this, @charlesbluca. Do you have a traceback I could look at? It looks like CI for the default branch is passing over in dask-cuda.

@charlesbluca (Member) commented:

Yeah, here's an example of a hanging run; unfortunately there doesn't seem to be much contextualizing info there 😕

https://github.com/rapidsai/dask-cuda/actions/runs/3914939838/jobs/6692645588

@jrbourbeau (Member Author) commented:

Hmm yeah, unfortunately there doesn't appear to be much to work off of in the build output. Just to double check, are we sure this is the PR that leads to the hanging behavior (i.e. has git bisect or something pointed us here)?

@pentschev (Member) commented:

I started taking a look at this as well and can confirm @charlesbluca's findings. So far I can reproduce the hang with Distributed main, but not after reverting this commit. However, after reverting I see CommClosedError/CancelledError in previous tests, as shown below, so it's clear that this PR also fixed other existing issues, and we need to find a proper solution for both.

ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/comm/tcp.py", line 511, in connect
    convert_stream_closed_error(self, e)
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f54d58b9c40>: ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/utils.py", line 741, in wrapper
    return await func(*args, **kwargs)
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/client.py", line 1301, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/client.py", line 1331, in _ensure_connected
    comm = await connect(
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/site-packages/distributed/comm/core.py", line 315, in connect
    await asyncio.sleep(backoff)
  File "/datasets/pentschev/miniconda3/envs/rn-230116/lib/python3.9/asyncio/tasks.py", line 652, in sleep
    return await future
asyncio.exceptions.CancelledError

Haven't dug in deeper yet, will try to do more of that throughout the day.

@pentschev (Member) commented:

After digging a bit more I found out that the hang is the result of two threads concurrently trying to acquire the UCX spinlock and the GIL. UCX-Py is currently not thread-safe, which can cause this sort of problem. Normally we expect ALL communication to occur only on the Distributed communications thread, and although I have not yet been able to determine the exact cause of this issue, I believe something changed in the order of task execution that causes the client thread to perform some communication (probably when calling distributed.comm.ucx.UCX.close()). I will continue digging and try to come up with a solution on the UCX comms side first.
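
As a generic illustration of that failure mode (plain threading.Lock objects standing in for the UCX spinlock and the GIL; this is not UCX-Py code), two threads that acquire the same pair of locks in opposite orders can each end up holding one lock while waiting for the other:

import threading

# Toy stand-ins for the UCX spinlock and the GIL (assumption: plain locks).
spinlock = threading.Lock()
gil = threading.Lock()

def comm_thread_work():
    # Communication thread: takes the spinlock first, then needs the "GIL".
    with spinlock:
        with gil:
            pass

def client_thread_work():
    # Client/main thread: takes the "GIL" first, then needs the spinlock.
    # If this runs concurrently with comm_thread_work, each thread can end up
    # holding one lock while waiting for the other, i.e. a deadlock.
    with gil:
        with spinlock:
            pass

# Run sequentially here so this example itself cannot hang; the hang in the
# dask-cuda tests comes from the two acquisition orders running concurrently.
comm_thread_work()
client_thread_work()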

@pentschev (Member) commented:

It seems rapidsai/dask-cuda#1084 resolves the issue in the Dask-CUDA tests. In the description of that PR I wrote what I believe to be the cause; cross-posting here for completeness:

After #7429 was merged, some of those tests started hanging, and I could confirm there were two threads concurrently attempting to take the UCX spinlock and the GIL, which led to a deadlock. UCX-Py is currently not thread-safe, and can indeed cause problems like this if two or more threads call communication routines that require the UCX spinlock. My theory is that the synchronous cluster causes communication to happen on the main thread (in this case, the pytest thread) when attempting to shut down the cluster, instead of only within the Distributed communication thread, which is likely why the tests hang.

Asynchronous Distributed clusters do not seem to cause any communication from the main thread, only from the communication thread as expected, so making the tests asynchronous suffices to resolve such issues. In practice, it's unlikely that people will use sync Distributed clusters from the same process (as pytest does), so this is improbable in real use cases.
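
For reference, the asynchronous pattern referred to above looks roughly like the following minimal sketch (using LocalCluster purely as a stand-in for the clusters used in the Dask-CUDA tests); with asynchronous=True, communication stays on the event loop rather than on a separate client/main thread:

import asyncio

from distributed import Client, LocalCluster


async def main():
    # With asynchronous=True, scheduler/worker communication happens on this
    # event loop instead of a background thread owned by a synchronous client.
    async with LocalCluster(asynchronous=True, processes=False, n_workers=1) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            future = client.submit(sum, [1, 2, 3])
            result = await future
            print(result)  # 6


asyncio.run(main())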

Thanks @charlesbluca for tracking this down, and @jrbourbeau for the fix and for taking the time to respond to our comments!

rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Jan 16, 2023

Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #1084