Ensure proper exception is raised if wrapped future cannot be initial… #7748
base: main
Conversation
Force-pushed from 4b3b636 to bf114c5.
Unit Test Results. See the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. 26 files (+1), 26 suites (+1), 14h 27m 51s ⏱️ (+1h 23m 19s). For more details on these failures, see this check. Results for commit 66718c1. ± Comparison against base commit 1ccc312. ♻️ This comment has been updated with latest results.
distributed/client.py (outdated)
Critical error encountered during Future initialization. This
typically occurs when passing around Future objects without
handling the Client lifecycle explicitly. If you encounter this,
please ensure to initialize a Client object yourself before
interacting with the Future object
This seems like an improvement and I don't want to add much friction, but in case the perspective of a naive user is relevant:
If I'd written the code in the test below, and then I read "please ensure to initialize a Client object yourself before interacting with the Future object" then I think my reaction would be:
"But I did initialize a client object before interacting with the future! The very beginning of my test initializes a client, and then the client is explicitly right there in client.submit
and client.map
whenever I interact with the future."
I'm sure I'm wrong in some sense, and maybe I see a little bit of why but not sure I understand the actual situation well enough to suggest language that would have worked on me.
Changed it to
Critical error encountered during Future initialization.
This typically occurs when interacting with Future objects
directly inside of a Task without initializing a Client
explicitly inside of this Task first.
If you encounter this, please ensure to initialize a Client
object yourself before interacting with the Future object
directly. See also
https://distributed.dask.org/en/stable/api.html#distributed.worker_client
Is this easier to understand for a non-expert?
The link also shows an example, so I am hoping this will clear things up even if users do not fully grasp the details of the exception message. That said, I'm happy to take further suggestions on the concrete phrasing.
Yeah, I think the new text and the link are pretty clear about how to fix it.
(Still very confusing though! At this point as a user I'm thinking "wait what?! client.submit(function, arg) without making a client inside the function is super common across all the dask usage, docs, tutorials, etc. that I've seen. Why am I being told now that it's bad?")
> client.submit(function, arg) without making a client inside the function is super common across all the dask usage, docs, tutorials, etc. that I've seen. Why am I being told now that it's bad?
f = client.submit(foo)
g = client.submit(bar, f)

is fine and very, very common. We'll resolve this for the user, and the user will only interact with the result of f, not the actual Future object.

f = client.submit(foo)
g = client.submit(bar, OpaqueWrapper(f))

will give the user the actual Future object and not the result of that Future object. This is where it gets tricky.
I don't think this difference can be explained in an exception message. It's also relatively rare.
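To make the tricky case concrete, here is a minimal runnable sketch (OpaqueWrapper is the hypothetical wrapper from the comment above, not a distributed API; the values are arbitrary):

from distributed import Client, worker_client

class OpaqueWrapper:
    # dask scans collections such as lists, tuples, and dicts in task
    # arguments for Futures, but not arbitrary object attributes, so a
    # Future hidden in here is shipped to the task as-is, unresolved.
    def __init__(self, future):
        self.future = future

def bar(wrapped):
    # The task holds an actual Future, not its result, so it needs a
    # client of its own to resolve it.
    with worker_client():
        return wrapped.future.result() + 1

if __name__ == "__main__":
    with Client(n_workers=2) as client:
        f = client.submit(lambda: 41)
        g = client.submit(bar, OpaqueWrapper(f))
        print(g.result())  # 42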
Maybe a concrete code snippet in the error message would help make this clearer?
For example, instead of:
dask_future = client.submit(...)

def func():
    x = dask_future.result()

f = client.submit(func)
Try:
dask_future = client.submit(...)

def func():
    # Explicitly initialize client within task before retrieving future's result
    with distributed.worker_client() as c:
        x = dask_future.result()

f = client.submit(func)
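Worth noting about the suggested snippet: distributed.worker_client() registers itself as the current client for the duration of the with block, which is why dask_future.result() can resolve even though c is never referenced; by default it also secedes the task from the worker's thread pool, so the blocking result() call does not deadlock the worker.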
It's interesting that our own test suite is apparently encountering this issue. Will need to investigate more.
Force-pushed from bf114c5 to 9ef148f.
OK, it turns out this was a thread-safety issue in worker client creation. I'll leave the RuntimeError in, since this condition is still harmful, but I'm not sure we're actually hitting it any longer.
Test cases still hit the RuntimeError, but I cannot reproduce locally (yet). Will need to take a closer look tomorrow.
distributed/tests/test_client.py (outdated)
@pytest.mark.slow()
def test_cancelled_error_wrapped_future(loop):
    # See https://github.com/dask/distributed/issues/7746
    with LocalCluster(loop=loop, dashboard_address=":0", n_workers=1) as cluster:
Suggested change:
- with LocalCluster(loop=loop, dashboard_address=":0", n_workers=1) as cluster:
+ with LocalCluster(loop=loop, dashboard_address=":0", n_workers=1, threads_per_worker=2) as cluster:
The number of threads used here would vary based on the machine, right? I could only get #7746 to reproduce reliably with >=2 threads and a small sleep in func_to_map.
if self._client:
    return self._client
else:
    return self._get_client()
It seems you've moved the locking into _get_client instead. That makes sense, but if this is called in multiple threads at once while self._client is None, all threads will call _get_client. Is the idea that default_client() will raise a ValueError every time, so the subsequent ones will reach the if not self._client line and bail out?
Adding an if self._client: return self._client just after acquiring the lock in _get_client might make this easier to read and reason about; then you know at most one call to _get_client will ever take any actions to try constructing a client.
Yes, this code is still a bit of a mess; this was mostly a quick attempt to see whether it changes anything. CI is still not happy, so I need to dig deeper. In the meantime I added the shortcut for when the client already exists, as you suggested.
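A minimal sketch of the double-checked locking being suggested, assuming a class-level lock and a hypothetical _initialize_client helper (the real code in distributed/client.py differs in detail):

import threading

class Future:
    _lock = threading.Lock()  # guards client creation across threads

    def __init__(self):
        self._client = None

    @property
    def client(self):
        # Fast path: once the client is set, no locking is needed.
        if self._client:
            return self._client
        return self._get_client()

    def _get_client(self):
        with self._lock:
            # Double-checked locking: another thread may have created
            # the client while we waited on the lock, so check again.
            if self._client:
                return self._client
            self._client = self._initialize_client()  # hypothetical helper
            return self._client

With this shape, at most one thread ever attempts to construct a client; every other thread either takes the fast path or bails out at the re-check inside the lock.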
distributed/tests/test_client.py

    return 1

def func_to_map(x, x0):
    if use_worker_client:
Suggested change:
- if use_worker_client:
+ sleep(0.1)
+ if use_worker_client:

This seemed to be necessary to reproduce #7746.
It did reproduce for me with the simpler configuration, but I'm happy to add a sleep here.
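Piecing the visible fragments together, the reproducer under discussion might look roughly like this (a hedged reconstruction, not the actual test from this PR; the wrapper class, timings, and assertion are assumptions):

from time import sleep

import pytest
from distributed import Client, LocalCluster, worker_client
from distributed.utils_test import loop  # noqa: F401 (pytest fixture)

class OpaqueWrapper:
    # Assumed helper: hides the Future from dask's argument scanning so
    # the task receives the Future itself rather than its result.
    def __init__(self, future):
        self.future = future

@pytest.mark.slow()
def test_cancelled_error_wrapped_future(loop):
    # See https://github.com/dask/distributed/issues/7746
    with LocalCluster(
        loop=loop, dashboard_address=":0", n_workers=1, threads_per_worker=2
    ) as cluster:
        with Client(cluster) as client:
            x0 = client.submit(lambda: 1)

            def func_to_map(x, wrapped):
                sleep(0.1)  # widen the race window between worker threads
                with worker_client():
                    return x + wrapped.future.result()

            futures = client.map(func_to_map, range(10), wrapped=OpaqueWrapper(x0))
            assert client.gather(futures) == [x + 1 for x in range(10)]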
Force-pushed from bd26590 to 66718c1.
This ensures there is a proper, actionable exception message for cases like #7746.
I'm also looking into whether this behavior can be salvaged. Generally speaking, an explicit client is always better than an implicitly created one, but the implicit one is of course nice from a UX perspective.
However, the 🪄 of implicitly creating clients can cause otherwise spurious failures; see #7498 for the umbrella issue.