
Fix flaky test_close_gracefully and test_lifetime #5677

Merged 6 commits on Jan 25, 2022
Changes from 5 commits
68 changes: 52 additions & 16 deletions distributed/tests/test_worker.py
@@ -1627,32 +1627,68 @@ async def test_worker_listens_on_same_interface_by_default(cleanup, Worker):

 @gen_cluster(client=True)
 async def test_close_gracefully(c, s, a, b):
-    futures = c.map(slowinc, range(200), delay=0.1)
+    futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])

-    while not b.data:
+    # Note: keys will appear in b.data several milliseconds before they switch to
+    # status=memory in s.tasks. It's important to sample the in-memory keys from the
+    # scheduler side, because those that the scheduler thinks are still processing won't
+    # be replicated by retire_workers().
+    while True:
+        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
+        if len(mem) >= 8:
+            break
         await asyncio.sleep(0.01)
-    mem = set(b.data)
-    proc = {ts for ts in b.tasks.values() if ts.state == "executing"}
-    assert proc
+
+    assert any(ts for ts in b.tasks.values() if ts.state == "executing")
Member:
I think there is a tiny time window possible where no task is in the executing state until the next ensure_computing is called.

Collaborator Author:
Fixed.
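A polling version of that assertion closes the window the reviewer describes. This is a sketch of the pattern only, presumably similar to the fix in the commit this 5-commit view omits, and it may differ from what was merged; wait_for_executing is a hypothetical name:

```python
import asyncio

async def wait_for_executing(worker, interval: float = 0.01) -> None:
    # Poll until at least one task on `worker` is in the "executing" state.
    # This closes the window where nothing is executing between one task
    # finishing and the next call to ensure_computing.
    while not any(ts.state == "executing" for ts in worker.tasks.values()):
        await asyncio.sleep(interval)
```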

     await b.close_gracefully()

     assert b.status == Status.closed
     assert b.address not in s.workers
-    assert mem.issubset(a.data.keys())
-    for ts in proc:
-        assert ts.state in ("executing", "memory")
+    # All tasks that were in memory in b have been copied over to a;
+    # they have not been recomputed
+    for key in mem:
+        assert_worker_story(
+            a.story(key),
+            [
+                (key, "put-in-memory"),
+                (key, "receive-from-scatter"),
Member:
Why is this a scatter event?

Collaborator Author:
Legacy (pre-AMM) Scheduler.gather_on_worker, which underlies both Scheduler.replicate and Scheduler.retire_workers.

+            ],
+            strict=True,
+        )
+        assert key in a.data
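To unpack the author's answer: retiring a worker copies its in-memory results to the surviving workers instead of recomputing them, and pre-AMM that copy goes through Scheduler.gather_on_worker, which is why the receiving worker records a receive-from-scatter event. A minimal sketch using the public client API (retire_and_verify is a hypothetical name; the test itself triggers the same path through Worker.close_gracefully):

```python
from distributed import Client

async def retire_and_verify(c: Client, s, address: str) -> None:
    # Retire one worker through the public API. Its in-memory results are
    # replicated to the remaining workers rather than recomputed, which on
    # the receiving side shows up as a "receive-from-scatter" story event.
    await c.retire_workers(workers=[address])
    assert address not in s.workers
```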


 @pytest.mark.slow
-@gen_cluster(client=True, nthreads=[])
-async def test_lifetime(c, s):
-    async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b:
-        futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address])
-        await asyncio.sleep(1.5)
-        assert b.status not in (Status.running, Status.paused)
-        await b.finished()
-        assert set(b.data) == set(a.data)  # successfully moved data over
+@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
+async def test_lifetime(c, s, a):
+    async with Worker(s.address, lifetime="2 seconds") as b:
+        futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
Collaborator Author (@crusaderky, Jan 21, 2022):
Fixed spelling error worker= -> workers=. All tasks were failing before!

Member:
Something like await next(as_completed(futures)) would protect us there and may remove the necessity for the sleep below. I'm fine without it.

Collaborator Author:
Revised.
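The suggestion as written is shorthand; with an asynchronous client, one way to spell it is distributed.wait with return_when="FIRST_COMPLETED". A sketch under that assumption (block_until_first_result is a hypothetical name, and this is not necessarily what the revision did):

```python
from distributed import wait

async def block_until_first_result(futures):
    # distributed.wait mirrors concurrent.futures.wait and also accepts
    # return_when="FIRST_COMPLETED"; it returns a (done, not_done) pair.
    done, not_done = await wait(futures, return_when="FIRST_COMPLETED")
    return done
```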

+        await asyncio.sleep(1)
+        assert not a.data
Collaborator Author:
After fixing the misspelling above, the test was occasionally failing this assertion with lifetime="1 seconds" and sleep(0.5).

+        # Note: keys will appear in b.data several milliseconds before they switch to
+        # status=memory in s.tasks. It's important to sample the in-memory keys from the
+        # scheduler side, because those that the scheduler thinks are still processing
+        # won't be replicated by retire_workers().
+        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
+        assert mem
+
+        while b.status != Status.closed:
+            await asyncio.sleep(0.01)
+
+    # All tasks that were in memory in b have been copied over to a;
+    # they have not been recomputed
+    for key in mem:
+        assert_worker_story(
+            a.story(key),
+            [
+                (key, "put-in-memory"),
+                (key, "receive-from-scatter"),
+            ],
+            strict=True,
+        )
+        assert key in a.data
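The scheduler-side sampling pattern embedded in both hunks above could be distilled into a shared helper. A sketch (wait_for_scheduler_memory is a hypothetical name; the PR inlines the loop in each test):

```python
import asyncio

async def wait_for_scheduler_memory(s, n: int, interval: float = 0.01) -> set:
    # Block until the scheduler has marked at least `n` task results as
    # state == "memory". Sampling s.tasks (scheduler side) instead of the
    # worker's .data avoids the race described in the in-diff notes: keys
    # the scheduler still considers processing won't be replicated by
    # retire_workers().
    while True:
        mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
        if len(mem) >= n:
            return mem
        await asyncio.sleep(interval)
```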


@gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"})
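The trailing context line belongs to the next test, which exercises lifetime_stagger: each worker jitters its lifetime deadline so a fleet does not retire all at once. A sketch of those semantics (staggered_lifetime is a hypothetical name; the exact formula in distributed may differ):

```python
import random
from dask.utils import parse_timedelta

def staggered_lifetime(lifetime: str, stagger: str) -> float:
    # parse_timedelta converts duration strings, e.g. "10s" -> 10.0 seconds.
    base = parse_timedelta(lifetime)
    jitter = parse_timedelta(stagger)
    # Jitter each worker's deadline by +/- stagger seconds so that workers
    # started together do not all close gracefully at the same moment.
    return base + random.uniform(-jitter, jitter)
```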