Fix scheduler transition error on memory->erred #8549

Merged
merged 26 commits on Mar 8, 2024

Changes from 2 commits
27 changes: 19 additions & 8 deletions distributed/scheduler.py
@@ -1964,6 +1964,7 @@
)

v = a_recs.get(key, finish)
# The inner rec has higher priority? Is that always desired?
Collaborator:
I don't get this comment?

Member Author:
This is a general comment about the two-step transitions. The recommendations created by the first step are executed before the second step, which may create weird state (as it did in this case).
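
To make the two-step mechanism concrete, here is a minimal sketch (not the actual Scheduler._transition code; `transitions` stands in for self._TRANSITIONS_TABLE and `two_step_transition` is a made-up name): when no direct (start, finish) transition exists, the scheduler goes via "released", and the recommendation produced by that first step can override the finish state the caller asked for.

def two_step_transition(transitions, key, start, finish, stimulus_id):
    # Step 1: transition start -> "released".
    func = transitions[start, "released"]
    a_recs, a_cmsgs, a_wmsgs = func(key, stimulus_id)

    # The first step may recommend a different target state for `key` than
    # the caller requested; that inner recommendation takes priority.
    v = a_recs.get(key, finish)

    # Step 2: transition "released" -> v (not necessarily the original finish).
    func = transitions["released", v]
    b_recs, b_cmsgs, b_wmsgs = func(key, stimulus_id)

    # The scheduler state already reflects step 1 before step 2 runs, which is
    # the "weird state" referred to in the comment above.
    return {**a_recs, **b_recs}, a_cmsgs + b_cmsgs, a_wmsgs + b_wmsgs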

func = self._TRANSITIONS_TABLE["released", v]
b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)

@@ -2082,7 +2083,11 @@
assert not ts.who_has
assert not ts.processing_on
for dts in ts.dependencies:
assert dts.state not in {"forgotten", "erred"}
assert dts.state not in {"forgotten", "erred"}, (
ts,
dts,
self.transition_log,
)

if ts.has_lost_dependencies:
return {key: "forgotten"}, {}, {}
@@ -2480,7 +2485,9 @@
recommendations[key] = "forgotten"
elif ts.has_lost_dependencies:
recommendations[key] = "forgotten"
elif ts.who_wants or ts.waiters:
elif (ts.who_wants or ts.waiters) and not any(
dts.state in ("erred",) for dts in ts.dependencies
):
recommendations[key] = "waiting"

for dts in ts.waiters or ():
@@ -2505,14 +2512,13 @@
assert ts.exception_blame
assert not ts.who_has
assert not ts.waiting_on
assert not ts.waiters
Member Author:
This assertion does not work in two-step transitions.

failing_ts = ts.exception_blame
assert failing_ts

for dts in ts.dependents:
dts.exception_blame = failing_ts
if not dts.who_has:
dts.exception_blame = failing_ts

Codecov / codecov/patch: added line distributed/scheduler.py#L2521 was not covered by tests.
recommendations[dts.key] = "erred"

report_msg = {
@@ -2547,6 +2553,9 @@

for dts in ts.dependents:
if dts.state == "erred":
# Does this make sense?
# This goes via released
# dts -> released -> waiting
Collaborator:
Agree, this makes no sense to me either. Is there a unit test anywhere to shed light on it?

Member Author:
I haven't investigated this any further.

recommendations[dts.key] = "waiting"

w_msg = {
@@ -2621,8 +2630,8 @@
self,
key: Key,
stimulus_id: str,
worker: str | None = None,
Collaborator:
How can a task be processing without a worker? Is it when the worker it was processing on died and it caused the task to increase its suspicious count too much? If so, it may be a good idea to note it in a comment?

Member Author:
It appears that this has been superseded by other changes in this PR. I'll remove it and see if CI complains.
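
As a toy illustration of the scenario asked about above (this is not the scheduler's implementation; ToyTask, on_worker_death, and ALLOWED_FAILURES are made-up names, with the constant standing in for the distributed.scheduler.allowed-failures setting used in the test below): when the worker processing a task dies, the task's suspicious count grows, and once it exceeds the limit the task errs even though no worker is currently running it.

ALLOWED_FAILURES = 1  # analogous to distributed.scheduler.allowed-failures

class ToyTask:
    def __init__(self, key):
        self.key = key
        self.suspicious = 0       # times a worker died while running this task
        self.state = "processing"

def on_worker_death(task):
    # The worker running the task is gone, so there is no specific worker left
    # to blame if the task ends up erring.
    task.suspicious += 1
    if task.suspicious > ALLOWED_FAILURES:
        task.state = "erred"      # surfaces to the client as KilledWorker
    else:
        task.state = "released"   # rescheduled on another worker

t = ToyTask("f")
on_worker_death(t)  # first death: task is rescheduled
t.state = "processing"
on_worker_death(t)  # second death exceeds the limit: task errs without a worker
assert t.state == "erred"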

*,
worker: str,
cause: Key | None = None,
exception: Serialized | None = None,
traceback: Serialized | None = None,
@@ -2675,7 +2684,8 @@

if not ts.erred_on:
ts.erred_on = set()
ts.erred_on.add(worker)
if worker:
ts.erred_on.add(worker)
Collaborator:
I think the type in the function declaration should change to worker: str | None?

Member Author:
My bad, I didn't clean this up properly.

if exception is not None:
ts.exception = exception
ts.exception_text = exception_text
@@ -2699,8 +2709,9 @@
)

for dts in ts.dependents:
dts.exception_blame = failing_ts
recommendations[dts.key] = "erred"
if dts.who_has:
dts.exception_blame = failing_ts
recommendations[dts.key] = "erred"

Codecov / codecov/patch: added lines distributed/scheduler.py#L2713-L2714 were not covered by tests.

for dts in ts.dependencies:
if dts.waiters:
61 changes: 61 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -4890,3 +4890,64 @@ async def test_resubmit_different_task_same_key_warns_only_once(

async with Worker(s.address):
assert await c.gather(zs) == [2, 3, 4] # Kept old ys


def block(x, in_event, block_event):
in_event.set()
block_event.wait()
return x


@gen_cluster(
client=True,
nthreads=[("", 1, {"resources": {"a": 1}}), ("", 1, {"resources": {"b": 1}})],
config={"distributed.scheduler.allowed-failures": 1},
Worker=Nanny,
)
async def test_fan_out_pattern_deadlock(c, s, a, b):
"""Regression test for https://github.com/dask/distributed/issues/8548

Conceptually, this test simulates a fan-out workload in which the worker that processed
the single input task of the fan-out dies once a fan-out task has been processed, but
before the scheduler recognizes that the input task has successfully been sent to another
worker that is now processing a fan-out task. Workers then continue to die while the
input tasks are being reprocessed.

This test heavily uses resources to force scheduling decisions.
"""
in_ancestor = Event()
block_ancestor = Event()
in_on_a_descendant = Event()
in_on_b_descendant = Event()
block_descendants = Event()
await block_ancestor.set()
f = c.submit(block, 1, in_ancestor, block_ancestor, key="f", resources={"a": 1})
g = c.submit(inc, f, key="g", resources={"a": 1})
h1 = c.submit(
block, g, in_on_a_descendant, block_descendants, key="h1", resources={"a": 1}
)
h2 = c.submit(
block, g, in_on_b_descendant, block_descendants, key="h2", resources={"b": 1}
)

await asyncio.gather(
wait_for_state("g", "memory", s),
in_on_a_descendant.wait(),
in_on_b_descendant.wait(),
)
await asyncio.gather(in_ancestor.clear(), block_ancestor.clear())
await a.process.process.kill()

await in_ancestor.wait()
await in_ancestor.clear()
await a.process.process.kill()

await in_ancestor.wait()
await in_ancestor.clear()
await a.process.process.kill()

await block_descendants.set()
# await block_ancestor.set()
await h2
with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"):
await h1