Future deserialization without available client #7580
Conversation
thread_state.execution_state = execution_state
thread_state.key = key
thread_state.actor = True

result = function(*args, **kwargs)
State was actually leaking all over the place, making get_worker in our test suite extremely unreliable.
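Below is a minimal sketch, not the PR's actual code, of how per-task thread-local state can be cleaned up after a task finishes so it does not leak into later `get_worker` calls on the same thread; the `run_task` helper and the cleanup shape are assumptions for illustration.

```python
import threading

# Stand-in for distributed's per-thread task context.
thread_state = threading.local()

def run_task(function, execution_state, key, *args, **kwargs):
    # Record per-task context on the current thread (mirrors the diff above).
    thread_state.execution_state = execution_state
    thread_state.key = key
    thread_state.actor = True
    try:
        return function(*args, **kwargs)
    finally:
        # Reset the thread-local state so unrelated code running later on this
        # thread no longer sees stale task context.
        del thread_state.execution_state
        del thread_state.key
        del thread_state.actor
```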
try:
    return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING)
except StopIteration:
    raise ValueError("No workers found")
I get the appeal of this, but it can throw off some of our test logic: tests would suggest that certain functionality works when in reality it doesn't.
i.e. due to our async tests, we are likely to get a worker just because an instance already exists in the same process. However, in reality there is no valid worker in the context, i.e. if the test example were executed on a "real" cluster, it would fail.
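A hedged sketch of the stricter lookup this points towards: only return the worker attached to the calling thread's task context, instead of falling back to an arbitrary running Worker in the process. The attribute layout of `thread_state` is an assumption for illustration.

```python
import threading

# Stand-in for distributed's per-thread task context (illustrative only).
thread_state = threading.local()

def get_worker_strict():
    """Return the worker executing the current task, or raise."""
    try:
        # Assumed layout: the task runner stored a dict that holds the worker.
        return thread_state.execution_state["worker"]
    except AttributeError:
        raise ValueError(
            "No worker found: not running inside a task on a worker thread"
        ) from None
```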
@gen_cluster(client=True)
async def test_timeout_wake_waiter(c, s, a, b):
The change to get_worker causes these things to break. On main, this test should actually not be allowed to pass since there is no worker and no client in the context of the multi lock. However, due to the "pick first worker in the process" logic, it would pick a random worker and implicitly create a worker client. The Worker doesn't even have to be alive for this; as long as it isn't GCed yet, it would work.
In most cases this is not a big deal, but implicit, surprising client creations can cause things like #7498.
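A hedged illustration (not code from this PR) of how such an implicit client can appear: a distributed primitive used without an explicit client has to resolve one itself via get_client(), which on main could latch onto an arbitrary in-process Worker and silently spin up a worker client for it. Assumes it runs in a process that already hosts workers, e.g. inside an async test.

```python
from distributed import Lock

def acquire_without_explicit_client():
    # No client is passed and none exists in this thread's task context, so
    # the primitive resolves one itself; on main that could mean picking a
    # random live Worker in the process and creating a worker client for it.
    lock = Lock("resource")
    lock.acquire(timeout=1)
    lock.release()
```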
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files ±0  26 suites ±0  11h 44m 40s ⏱️ −41m 0s

For more details on these failures, see this check.

Results for commit f7786ee. ± Comparison against base commit 89d5ad4.

This pull request removes 1 and adds 3 tests. Note that renamed tests count towards both.
This pull request skips 2 and un-skips 1 tests.

♻️ This comment has been updated with latest results.
Interesting. I seem to hit a related recursion error in
Force-pushed b19041a to 7780ad7
OK, so I'm not entirely sure if the profiling thing is indeed related, but apparently with this change we're more likely to generate a very deeply nested profile message. The profile collection works out nicely, but as soon as one tries to submit this profile message, we hit a recursion error during serialization. We can't truly protect ourselves from this during serialization since the serialization code does not know what is OK to cut out and what isn't. The only way I see we can deal with this is to be more conservative with the cutoff in profiling. Local testing found ~250 stacks to be the magical limit.
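As context, a hedged sketch of the kind of depth cap being described: bound how many frames a profile snapshot folds together so the resulting nested message stays shallow enough for recursive serializers. The limit of 250 comes from the "~250 stacks" observation above, and the helper is illustrative, not distributed's actual profile code.

```python
import sys

MAX_PROFILE_DEPTH = 250  # assumption: taken from the "~250 stacks" observation

def collect_stack(frame, depth=MAX_PROFILE_DEPTH):
    """Walk outwards from `frame`, recording at most `depth` frames."""
    frames = []
    while frame is not None and depth > 0:
        frames.append(
            (frame.f_code.co_filename, frame.f_code.co_name, frame.f_lineno)
        )
        frame = frame.f_back
        depth -= 1
    return frames

# Example: start from the currently executing frame.
snapshot = collect_stack(sys._getframe())
```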
if depth <= 0:
    return None
this check caused us to not collect any information in these cases. I think it's still valuable to get a snapshot through, even if it's not the lowest frame. Moving the depth check further down achieves this (see test_profile.py)
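A hedged sketch of the "move the depth check further down" idea: record the current frame first and only stop recursing into callers once the budget is exhausted, so deep stacks get a truncated snapshot instead of nothing at all. The `state` dict shape is illustrative, not distributed.profile's real format.

```python
def process(frame, state, depth):
    # Record this frame unconditionally so a snapshot survives even at the cutoff.
    ident = (frame.f_code.co_filename, frame.f_code.co_name)
    node = state.setdefault(ident, {"count": 0, "children": {}})
    node["count"] += 1
    if depth <= 0:  # checked after recording, not before
        return node
    if frame.f_back is not None:
        process(frame.f_back, node["children"], depth - 1)
    return node
```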
Force-pushed 80bb9ff to a1f1f56
Force-pushed 68f48cf to f7786ee
The code generally looks good to me. I have a few minor nits, and a few edge cases are untested. I suggest marking them with pragma: nocover if we don't want to test these.
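A small illustration of that suggestion, assuming coverage.py's standard exclusion pragma (`# pragma: no cover`): the marked line is excluded from the coverage report instead of being exercised by a dedicated test. The function here is purely illustrative.

```python
def pick_running_worker(workers):
    for w in workers:
        if w.status == "running":
            return w
    # Edge case we choose not to test; exclude it from coverage reporting.
    raise ValueError("No worker found")  # pragma: no cover
```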
distributed/worker.py (Outdated)
    return first(w for w in Worker._instances if w.status in WORKER_ANY_RUNNING)
except StopIteration:
-   raise ValueError("No workers found")
+   raise ValueError("No workers found") from None
nit:

- raise ValueError("No workers found") from None
+ raise ValueError("No worker found") from None
distributed/tests/test_utils_test.py (Outdated)
with pytest.raises(AssertionError):
    with ensure_no_new_clients():
        async with Client(s.address, asynchronous=True):
            with ensure_no_new_clients():
                pass
If I understand the intention of this correctly, I'd split it up into two distinct checks:
-with pytest.raises(AssertionError):
-    with ensure_no_new_clients():
-        async with Client(s.address, asynchronous=True):
-            with ensure_no_new_clients():
-                pass
+with pytest.raises(AssertionError):
+    with ensure_no_new_clients():
+        async with Client(s.address, asynchronous=True):
+            pass
+async with Client(s.address, asynchronous=True):
+    with ensure_no_new_clients():
+        pass
# @pytest.mark.skipif(
#     pa is not None,
#     reason="We don't have a CI job that is installing a very old pyarrow version",
# )
-# @pytest.mark.skipif(
-#     pa is not None,
-#     reason="We don't have a CI job that is installing a very old pyarrow version",
-# )
It's not clear to me why this change was made to this test. My impression is that what's currently on main is still what we want. I've reverted back to what's on main (confirmed the test passes locally when pyarrow isn't installed). Happy to submit a follow-up PR if needed.
@@ -86,8 +86,9 @@ async def test_minimal_version(c, s, a, b):
         dtypes={"x": float, "y": float},
         freq="10 s",
     )
-    with pytest.raises(RuntimeError, match="requires pyarrow"):
-        await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p"))
+    # with pytest.raises(RuntimeError, match="requires pyarrow"):
-    # with pytest.raises(RuntimeError, match="requires pyarrow"):
Noting that the test failures are unrelated to the changes in this PR (xref #7688).
In dask/distributed#7580 `get_worker` was modified to return the worker of a task, thus it cannot be used by `client.run`, and we must now use `dask_worker` as the first argument to `client.run` to obtain the worker.
In dask/distributed#7580 `get_worker` was modified to return the worker of a task, thus it cannot be used by `client.run`, and we must now use `dask_worker` as the first argument to `client.run` to obtain the worker.
Authors: Peter Andreas Entschev (https://github.com/pentschev)
Approvers: Mads R. B. Kristensen (https://github.com/madsbk), Lawrence Mitchell (https://github.com/wence-)
URL: #1141
In dask/distributed#7580 `get_worker` was modified to return the worker of a task, thus it cannot be used by `client.run`, and we must now use `dask_worker` as the first argument to `client.run` to obtain the worker.
Authors: Peter Andreas Entschev (https://github.com/pentschev)
Approvers: Corey J. Nolet (https://github.com/cjnolet), AJ Schmidt (https://github.com/ajschmidt8)
URL: #1365
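A short, hedged example of the migration these downstream changes describe: instead of calling `get_worker()` inside a function passed to `client.run`, accept the special `dask_worker` argument, which `Client.run` fills in with the local Worker instance. The cluster setup below is illustrative.

```python
from distributed import Client, LocalCluster

def worker_address(dask_worker):
    # `dask_worker` is injected by Client.run with the Worker handling this call.
    return dask_worker.address

if __name__ == "__main__":
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        # Maps each worker's address to the function's return value.
        print(client.run(worker_address))
```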
Since this PR, the following code fails surprisingly:
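(The exact snippet isn't shown above; below is a hedged reconstruction of the pattern described in the follow-up comments: `get_worker()` invoked via `client.run`, i.e. outside any task.)

```python
from distributed import Client, get_worker

client = Client(processes=False)  # in-process cluster, for illustration
# The lambda runs on the worker but outside a task, so after this PR
# get_worker() raises a ValueError instead of returning the local worker.
client.run(lambda: get_worker().address)
```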
This got noticed because it was causing fsspec's dask implementation to hang in tests.
I did this as a workaround, since my filesystem only needs to know whether this is a process housing a worker, not whether it happens to be running in a task.
Thanks @martindurant, this looks related to #7696. There's a good conversation in that issue, but #7696 (comment) is the specific comment that relates to the snippet you posted.
Thanks for the pointer. From fsspec's point of view, the question is "am I on a worker machine", independent of serialisation or task state, so I'll use the simple approach.
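A hedged sketch of what that "simple approach" could look like for the "is this a process housing a worker?" question, independent of task state. It is an illustration, not fsspec's actual code, and it peeks at the private `Worker._instances` attribute purely to make the idea concrete.

```python
from distributed import get_worker
from distributed.worker import Worker

def process_has_worker() -> bool:
    # Running inside a task? Then there is certainly a worker.
    try:
        get_worker()
        return True
    except ValueError:
        pass
    # Otherwise fall back to "does any Worker object live in this process?".
    # Worker._instances is private and not a stable API; used here only to
    # illustrate the distinction between "in a task" and "on a worker machine".
    return len(Worker._instances) > 0
```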
This is similar to
This grew a bit more complicated because I stumbled over #7498 again, had a deeper look, and tried to preserve the "pass futures in collections" feature, but it is fundamentally flawed. While I could bypass most of the "accidental client creations" in this PR (which is good), the fundamental flaw remains: a future can be released before it is deserialized, and that is hard to avoid without a more fundamental approach.
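For reference, a hedged example of the "pass futures in collections" pattern being referred to, under the assumption that an in-process cluster is acceptable for illustration:

```python
from distributed import Client

if __name__ == "__main__":
    with Client(processes=False) as client:
        a = client.submit(lambda: 1)
        # The future `a` travels nested inside a plain list; it is resolved to
        # its result before `sum` runs. If `a` were released before that
        # deserialization happened, its data could already be gone, which is
        # the flaw described above.
        b = client.submit(sum, [a, 2])
        print(b.result())  # 3
```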
Two notable changes
This is again a supporting PR for #7564