Actor: don't hold key references on workers #4937
Changes from 3 commits
```diff
@@ -466,7 +466,7 @@ def f(block, ps=None):
 
 
 @pytest.mark.flaky(reruns=10, reruns_delay=5)
-@gen_cluster(client=True)
+@gen_cluster(client=True, timeout=10)
 async def test_compute(c, s, a, b):
     @dask.delayed
     def f(n, counter):
@@ -485,10 +485,8 @@ def check(counter, blanks):
     result = await c.compute(final, actors=counter)
     assert result == 0 + 1 + 2 + 3 + 4
 
-    start = time()
     while a.data or b.data:
         await asyncio.sleep(0.01)
-        assert time() < start + 30
 
 
 def test_compute_sync(client):
@@ -515,7 +513,7 @@ def check(dask_worker):
     start = time()
     while any(client.run(check).values()):
         sleep(0.01)
-        assert time() < start + 30
+        assert time() < start + 10
 
 
 @gen_cluster(
@@ -566,6 +564,61 @@ async def wait(self):
     await c.gather(futures)
 
 
+@gen_cluster(client=True, client_kwargs=dict(set_as_default=False))
+# ^ NOTE: without `set_as_default=False`, `get_client()` within worker would return
+# the same client instance the test is using (because it's all one process).
+# Even with this, both workers will share the same client instance.
+async def test_worker_actor_handle_is_weakref(c, s, a, b):
+    counter = c.submit(Counter, actor=True, workers=[a.address])
+
+    await c.submit(lambda _: None, counter, workers=[b.address])
+
+    del counter
+
+    start = time()
+    while a.actors or b.data:
+        await asyncio.sleep(0.1)
+        assert time() < start + 10
```
A review thread on the explicit timeout in the new test:

Just a style note about the test: this explicit timeout is usually not necessary. `gen_cluster` ships with a timeout. I believe this pattern originated before `gen_cluster` had its own timeout.

Thanks for the historical context, that's good to know. I'll take these out, though I will also make the …

It is typically that large because on CI the runtimes vary a lot. For local testing I usually also adapt this. FWIW, there is a constant in … Note, though, I believe the other tests are not protected by a timeout by default. We only have the global pytest-timeout last-resort timeout (lines 53 to 54 in 7d0f010). Bottom line: you may keep your timeout, of course, I just wanted to fill in some info.

I would not advise reducing the timeout below 30s. It's very easy to set something that looks reasonably comfortable on your laptop, only to find out later that CI can get 10x slower.
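For reference, the two patterns being contrasted are sketched below: an explicit deadline asserted inside the test body versus an unbounded wait loop guarded only by `gen_cluster`'s own timeout. This is an illustration, not code from this PR; the test names are made up.

```python
import asyncio
from time import time

from distributed.utils_test import gen_cluster


# Pattern 1: explicit deadline inside the test body; the assert fails the test
# if cleanup never happens within 30 seconds.
@gen_cluster(client=True)
async def test_explicit_deadline(c, s, a, b):
    start = time()
    while a.data or b.data:
        await asyncio.sleep(0.01)
        assert time() < start + 30


# Pattern 2: loop indefinitely and let gen_cluster's own timeout kill the test
# if it hangs.
@gen_cluster(client=True, timeout=30)
async def test_rely_on_gen_cluster_timeout(c, s, a, b):
    while a.data or b.data:
        await asyncio.sleep(0.01)
```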
```diff
+
+
+def test_worker_actor_handle_is_weakref_sync(client):
+    workers = list(client.run(lambda: None))
+    counter = client.submit(Counter, actor=True, workers=[workers[0]])
+
+    client.submit(lambda _: None, counter, workers=[workers[1]]).result()
+
+    del counter
+
+    def check(dask_worker):
+        return len(dask_worker.data) + len(dask_worker.actors)
+
+    start = time()
+    while any(client.run(check).values()):
+        sleep(0.01)
+        assert time() < start + 10
+
+
+def test_worker_actor_handle_is_weakref_from_compute_sync(client):
+    workers = list(client.run(lambda: None))
+
+    with dask.annotate(workers=workers[0]):
+        counter = dask.delayed(Counter)()
+    with dask.annotate(workers=workers[1]):
+        intermediate = dask.delayed(lambda c: None)(counter)
+    with dask.annotate(workers=workers[0]):
+        final = dask.delayed(lambda x, c: x)(intermediate, counter)
+
+    final.compute(actors=counter, optimize_graph=False)
+
+    def worker_tasks_running(dask_worker):
+        return len(dask_worker.data) + len(dask_worker.actors)
+
+    start = time()
+    while any(client.run(worker_tasks_running).values()):
+        sleep(0.01)
+        assert time() < start + 10
+
+
 def test_one_thread_deadlock():
     with cluster(nworkers=2) as (cl, w):
         client = Client(cl["address"])
```
Why is the `inform` not always necessary?
This is the key change in this PR. See #4936 (particularly the "complex case") for a description of the situation this is solving. Basically, when an Actor handle got serialized and transferred to another worker, that worker took out a lease on the Actor's key. That prevented the key from ever being released: the scheduler saw that some client (that worker) wanted it, so it never told the worker to release the key. In the end, the Actor would keep running even when nothing depended on it.
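To make that scenario concrete, here is a standalone sketch of the leak, essentially the same situation the new `test_worker_actor_handle_is_weakref*` tests above exercise. The `LocalCluster` setup and the `Counter` class definition here are illustrative, not taken from this PR.

```python
import time

from distributed import Client, LocalCluster


class Counter:
    n = 0

    def increment(self):
        self.n += 1
        return self.n


if __name__ == "__main__":
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster, Client(cluster) as client:
        a, b = list(client.scheduler_info()["workers"])

        # Create the actor on worker A, then ship its handle to a task on worker B.
        counter = client.submit(Counter, actor=True, workers=[a])
        client.submit(lambda _: None, counter, workers=[b]).result()

        # Drop the last client-side reference to the actor's key...
        del counter

        # ...yet without this PR, worker B's deserialized handle keeps a Future
        # on the key, so the scheduler never tells the workers to release it and
        # the actor lingers in Worker.actors / Worker.data.
        def leftovers(dask_worker):
            return len(dask_worker.data) + len(dask_worker.actors)

        time.sleep(2)  # give the scheduler time to propagate any releases
        print(client.run(leftovers))  # nonzero counts indicate the leak
```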
But also, as I mentioned in the description, I don't think this is a good implementation. I just wanted to post the problem and a quick fix, and mostly hear from @mrocklin what the `worker=` kwarg to `Actor` was supposed to be used for (since I can't find it used anywhere).

I've been thinking of something along the lines sketched below, where we remove the `worker=` argument and make `self._client` and `self._worker` mutually exclusive. This way, whenever we're running on a worker, we don't create a Future.

If the Actor handle was sent to another worker because a task depends on that Actor's key, holding the Future is unnecessary: the fact that a task depends on the key means the scheduler won't release it yet. The only problematic case would be a task whose function happens to connect to an Actor internally but has no explicit dependency on that Actor. In that case, if the client released the Actor's key while the task was running, the Actor could in theory be cancelled while the task still needed it, since the task doesn't itself hold a Future to the key (because `get_worker()` succeeded).
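A rough sketch of that idea, for illustration only: this is not the actual `Actor` implementation or the snippet from the comment, and the class and attribute names merely follow the discussion above.

```python
from distributed import Future
from distributed.worker import get_client, get_worker


class ActorHandleSketch:
    """Illustrative only: `_client` and `_worker` are mutually exclusive, so a
    handle deserialized inside a worker never holds a Future (and therefore
    never registers a client-side lease) on the actor's key."""

    def __init__(self, cls, address, key):
        self._cls = cls
        self._address = address
        self._key = key
        self._future = None
        try:
            # Running inside a worker: task dependencies already keep the
            # actor's key alive on the scheduler, so no Future is needed.
            self._worker = get_worker()
            self._client = None
        except ValueError:
            # Running on a client: hold a Future so the scheduler keeps the
            # actor alive for as long as this handle exists.
            self._worker = None
            self._client = get_client()
            self._future = Future(key, self._client)
```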
That sounds a bit like the worker where the actor is deserialized is not doing a good job of cleaning up after itself.

- Worker A: owner of the actor; serializes the handle.
- Worker B: interested in the actor's result; deserializes the handle.

In particular, once the key/actor is removed on Worker B (think `release_key`), the future should be released. Once the future is released, the scheduler should trigger the proper forget lifecycle for the actor.

Inspecting the actor class, we do not have a dedicated release mechanism. Therefore, the future is coupled to the lifetime of the client it is attached to. The client will probably live as long as the worker does, and so will only be cleaned up once the worker closes. Even worse, in same-process worker situations where clients may be shared between workers, the future may even outlive the worker that caused the reference.

I argue that something along those lines should do the trick; however, it also deadlocks and doesn't release. The deadlock seems to connect to the recent worker state machine issues and loosely connects to #4918. I'm not entirely sure the fixes over there would resolve it, though. To get the tests unstuck I needed to add one more patch.

FWIW, I think your approach is fine. For ordinary data, workers are not able to hold references to remote data and block their release; why should it be any different for actors?
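The worker-side cleanup described above ("once the key is released, release the future") might look roughly like the following hypothetical hook. `_release_actor_future` is an invented name, and this is not the patch referred to in the comment.

```python
from distributed.actor import Actor


def _release_actor_future(dask_worker, key):
    """Hypothetical hook for when `key` is released on this worker: if the
    stored value is a deserialized Actor handle, release the Future it holds
    so the scheduler's who_wants entry for the actor's key goes away."""
    value = dask_worker.data.pop(key, None)
    if isinstance(value, Actor):
        future = getattr(value, "_future", None)
        if future is not None:
            future.release()
            value._future = None
```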
The issue is that there may be nothing to cause the key/actor to be removed on Worker B, so `release_key` will never run.

Let's call the key `actor-abcde`. Worker B holds a future for `actor-abcde`. When everything that depends on `actor-abcde` completes, `_propagate_forgotten` runs on the scheduler. It sees that `actor-abcde` has no dependents, but it also sees that `TaskState('actor-abcde').who_wants` is not empty (distributed/scheduler.py, line 7315 in bb991d1).

Who wants it? `actor-abcde`'s own client. Therefore, the scheduler doesn't recommend transitioning `actor-abcde` to forgotten, so it will never tell the workers to `release_key('actor-abcde')`, and the reference preventing that key from being released is itself never released.
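In pseudocode, the scheduler behaviour being described is roughly the following. This is a simplification for illustration, not the actual code at that line.

```python
def maybe_recommend_forgotten(ts, recommendations):
    # `ts` stands in for the scheduler-side TaskState of the actor's key.
    if ts.dependents:
        return  # some task still depends on the actor's key
    if ts.who_wants:
        # Worker B's client holds a Future on the key, so the key stays in
        # who_wants forever and is never recommended for forgetting; as a
        # result, release_key is never sent to the workers.
        return
    recommendations[ts.key] = "forgotten"
```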
Good point. That makes me feel comfortable going with the approach I showed above.

Is there a test showing that an actor is not currently cleaned up?