Test retire workers deadlock #6240

Merged: 8 commits, Jun 16, 2022
106 changes: 104 additions & 2 deletions distributed/tests/test_active_memory_manager.py
@@ -9,13 +9,20 @@

import pytest

-from distributed import Nanny, wait
+from distributed import Event, Nanny, Scheduler, Worker, wait
from distributed.active_memory_manager import (
    ActiveMemoryManagerExtension,
    ActiveMemoryManagerPolicy,
    RetireWorker,
)
from distributed.core import Status
-from distributed.utils_test import captured_logger, gen_cluster, inc, slowinc
+from distributed.utils_test import (
+    assert_story,
+    captured_logger,
+    gen_cluster,
+    inc,
+    slowinc,
+)

NO_AMM_START = {"distributed.scheduler.active-memory-manager.start": False}

@@ -903,6 +910,101 @@ async def test_RetireWorker_all_recipients_are_paused(c, s, a, b):
    assert await c.submit(inc, 1) == 2


@gen_cluster(
    client=True,
    config={
        "distributed.scheduler.active-memory-manager.start": True,  # to avoid one-off AMM instance
        "distributed.scheduler.active-memory-manager.policies": [],
    },
    timeout=15,
)
async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(
    c, s: Scheduler, a: Worker, b: Worker
):
"""
If all keys have been moved off a worker, but then new keys arrive (due to task completion or `gather_dep`)
before the worker has actually closed, make sure we still retire it (instead of hanging forever).

This test is timing-sensitive. If it runs too slowly, it *should* `pytest.skip` itself.

See https://github.com/dask/distributed/issues/6223 for motivation.
"""
    ws_a = s.workers[a.address]
    ws_b = s.workers[b.address]
    event = Event()

    # Put 200 keys on the worker, so `_track_retire_worker` will sleep for 0.5s
    xs = c.map(lambda x: x, range(200), workers=[a.address])
    await wait(xs)

    # Put an extra task on the worker, which we will allow to complete once the `xs`
    # have been replicated.
    extra = c.submit(
        lambda: event.wait("2s"),
        workers=[a.address],
        allow_other_workers=True,
        key="extra",
    )

    while (
        extra.key not in a.state.tasks or a.state.tasks[extra.key].state != "executing"
    ):
        await asyncio.sleep(0.01)

    t = asyncio.create_task(c.retire_workers([a.address]))

    # Wait for all `xs` to be replicated.
    while len(ws_b.has_what) != len(xs):
        await asyncio.sleep(0)

    # `_track_retire_worker` _should_ now be sleeping for 0.5s, because there were
    # >=200 keys on A. In this test, everything from the beginning of the transfers
    # needs to happen within 0.5s.

    # Simulate the policy running again. Because the default 2s AMM interval is longer
    # than the 0.5s wait, the race we're about to trigger is unlikely in practice,
    # but the timings can still line up (especially with a custom AMM interval).
    amm: ActiveMemoryManagerExtension = s.extensions["amm"]
    assert len(amm.policies) == 1
    policy = next(iter(amm.policies))
    assert isinstance(policy, RetireWorker)

    amm.run_once()
Collaborator:

Unnecessary. Dropping is not a precondition for done; it's just to reduce memory pressure in case of spilled keys.
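For context, `done` here can be read roughly as "every key on the retiring worker has a replica somewhere else". The sketch below is an editor's paraphrase of that check, not the actual `RetireWorker.done` source; the attribute and method names are assumptions:

```python
# Rough paraphrase of the RetireWorker.done() check (illustrative only; the
# attribute names below are assumptions, not copied from distributed's source).
def done(self) -> bool:
    # `self.manager` is assumed to be the ActiveMemoryManagerExtension and
    # `self.address` the worker being retired.
    ws = self.manager.scheduler.workers.get(self.address)
    if ws is None:
        return True  # worker already left the cluster
    # Dropping the local copies is not required: it suffices that every key
    # held by the retiring worker also lives on at least one other worker.
    return all(len(ts.who_has) > 1 for ts in ws.has_what)
```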

Collaborator (author):

Calling run_once is necessary, because what we want to test is:

  • _track_retire_worker is sleeping
  • the policy runs and removes itself, because all keys have been replicated
  • another key appears on the retiring worker
  • _track_retire_worker wakes up

By running it once here, the policy removes itself.
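To make that sequence concrete, here is a simplified sketch of the scheduler-side polling loop the bullets describe. `_track_retire_worker` is a real internal coroutine, but the body below is an illustrative reconstruction under stated assumptions (the sleep intervals and the `close_worker` callback), not the actual implementation:

```python
import asyncio

# Illustrative reconstruction of the loop that waits for a RetireWorker
# policy to finish; details are assumptions, not distributed's source.
async def track_retire_worker_sketch(ws, policy, close_worker) -> None:
    while not policy.done():
        # The real loop polls less frequently when the worker holds many
        # keys (0.5s vs. 0.01s in this test's setup). During that longer
        # sleep the policy can run, see that everything is replicated, and
        # remove itself from the AMM...
        await asyncio.sleep(0.5 if len(ws.has_what) >= 200 else 0.01)
    # ...and a brand-new key (`extra` in the test) may land on the worker
    # before this coroutine wakes up. The behaviour under test: once the
    # policy is done, retire the worker anyway rather than waiting for the
    # newcomer to be replicated, which would deadlock.
    await close_worker(ws.address)  # hypothetical shutdown callback
```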


    # The policy has removed itself, because all `xs` have been replicated.
    assert not amm.policies
    assert policy.done(), {ts.key: ts.who_has for ts in ws_a.has_what}

    # But what if a new key arrives now while `_track_retire_worker` is still (maybe)
    # sleeping? Let `extra` complete and wait for it to hit the scheduler.
    await event.set()
    await wait(extra)

    if a.address not in s.workers:
        # It took more than 0.5s to get here, and the scheduler closed our worker. Dang.
        pytest.skip(
            "Timing didn't work out: `_track_retire_worker` finished before `extra` completed."
        )

    # `retire_workers` doesn't hang
    await t
    assert a.address not in s.workers
    assert not amm.policies

    # `extra` was not transferred from `a` to `b`. Instead, it was recomputed on `b`.
    story = b.state.story(extra.key)
    assert_story(
        story,
        [
            (extra.key, "compute-task", "released"),
            (extra.key, "released", "waiting", "waiting", {"extra": "ready"}),
            (extra.key, "waiting", "ready", "ready", {"extra": "executing"}),
        ],
    )

    # `extra` completes successfully and is fetched from the other worker.
    await extra.result()


# FIXME can't drop runtime of this test below 10s; see distributed#5585
@pytest.mark.slow
@gen_cluster(