Fix scheduler transition error on memory->erred #8549

Merged · 26 commits · Mar 8, 2024

Changes from 12 commits

40 changes: 32 additions & 8 deletions distributed/scheduler.py
@@ -1964,6 +1964,7 @@
)

v = a_recs.get(key, finish)
# The inner rec has higher priority? Is that always desired?

Collaborator:
I don't get this comment?

Member Author:
This is a general comment about the two-step transitions. The recommendations created by the first step are executed before the second step, which may create weird state (as it did in this case).
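
For context, here is a minimal, self-contained sketch of the two-step shape under discussion; all names are illustrative stand-ins, not the scheduler's actual transition functions. It mirrors the `v = a_recs.get(key, finish)` line above, where the inner recommendation produced by the first hop overrides the requested target:

# Toy model of a two-step transition: the first hop sends the task to
# "released", the second hop continues towards the requested `finish`.
def to_released(tasks, key):
    tasks[key] = "released"
    # The first hop may itself recommend a different target for `key`,
    # e.g. "forgotten" when nobody wants the result anymore.
    return {} if tasks.get("someone_wants_" + key) else {key: "forgotten"}

def released_to_finish(tasks, key, finish):
    tasks[key] = finish
    return {}

def transition_two_step(tasks, key, finish):
    a_recs = to_released(tasks, key)
    v = a_recs.get(key, finish)          # the inner rec wins over `finish`
    b_recs = released_to_finish(tasks, key, v)
    return {**a_recs, **b_recs}

tasks = {"x": "memory"}
recs = transition_two_step(tasks, "x", "erred")
print(tasks["x"], recs)                  # forgotten {'x': 'forgotten'}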

func = self._TRANSITIONS_TABLE["released", v]
b_recs, b_cmsgs, b_wmsgs = func(self, key, stimulus_id)

@@ -2082,7 +2083,11 @@
assert not ts.who_has
assert not ts.processing_on
for dts in ts.dependencies:
assert dts.state not in {"forgotten", "erred"}
assert dts.state not in {"forgotten", "erred"}, (
ts,
dts,
self.transition_log,
)

if ts.has_lost_dependencies:
return {key: "forgotten"}, {}, {}
@@ -2480,7 +2485,9 @@
recommendations[key] = "forgotten"
elif ts.has_lost_dependencies:
recommendations[key] = "forgotten"
elif ts.who_wants or ts.waiters:
elif (ts.who_wants or ts.waiters) and not any(
dts.state in ("erred",) for dts in ts.dependencies
):
recommendations[key] = "waiting"

for dts in ts.waiters or ():
@@ -2505,14 +2512,13 @@
assert ts.exception_blame
assert not ts.who_has
assert not ts.waiting_on
assert not ts.waiters

Member Author:
This assertion does not work in two-step transitions.


failing_ts = ts.exception_blame
assert failing_ts

for dts in ts.dependents:
dts.exception_blame = failing_ts
if not dts.who_has:
dts.exception_blame = failing_ts
recommendations[dts.key] = "erred"

report_msg = {
@@ -2547,6 +2553,9 @@

for dts in ts.dependents:
if dts.state == "erred":
# Does this make sense?
# This goes via released
# dts -> released -> waiting

Collaborator:
Agree this makes no sense to me either. Is there a unit test anywhere to shed light on it?

Member Author:
I haven't investigated this any further.

recommendations[dts.key] = "waiting"

w_msg = {
@@ -2621,8 +2630,8 @@
self,
key: Key,
stimulus_id: str,
worker: str | None = None,

Collaborator:
How can a task be processing without a worker? Is it when the worker it was processing on died and it caused the task to increase its suspicious count too much? If so, it may be a good idea to note it in a comment?

Member Author:
It appears that this has been superseded by other changes in this PR. I'll remove it and see if CI complains.

*,
worker: str,
cause: Key | None = None,
exception: Serialized | None = None,
traceback: Serialized | None = None,
@@ -2675,7 +2684,8 @@

if not ts.erred_on:
ts.erred_on = set()
ts.erred_on.add(worker)
if worker:
ts.erred_on.add(worker)

Collaborator:
I think the type in the function declaration should change to worker: str | None?

Member Author:
My bad, I didn't clean this up properly.

if exception is not None:
ts.exception = exception
ts.exception_text = exception_text
@@ -2699,8 +2709,9 @@
)

for dts in ts.dependents:
dts.exception_blame = failing_ts
recommendations[dts.key] = "erred"
if not dts.who_has:

Collaborator:
isn't this the same as saying for dts in ts.waiters?

Member Author:
Good point, looking at the code I think it should mean the same. Let's see if CI complains.
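
A rough illustration of the invariant assumed in that reply (hypothetical toy tasks, not the scheduler's TaskState): a dependent with no copy of its result in memory is still a waiter of `ts`, so filtering dependents on `not dts.who_has` should pick out the same tasks as iterating `ts.waiters`:

from dataclasses import dataclass, field

@dataclass(eq=False)
class ToyTask:
    key: str
    who_has: set = field(default_factory=set)     # workers holding this task's result
    dependents: set = field(default_factory=set)  # tasks consuming this result
    waiters: set = field(default_factory=set)     # dependents still waiting on it

ts = ToyTask("g")
h1 = ToyTask("h1")                        # result not in memory anywhere
h2 = ToyTask("h2", who_has={"worker-a"})  # result already held by a worker
ts.dependents = {h1, h2}
# Assumed invariant: waiters are exactly the dependents without data in memory.
ts.waiters = {d for d in ts.dependents if not d.who_has}

via_dependents = {d.key for d in ts.dependents if not d.who_has}
via_waiters = {d.key for d in ts.waiters}
assert via_dependents == via_waiters == {"h1"}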

dts.exception_blame = failing_ts
recommendations[dts.key] = "erred"

for dts in ts.dependencies:
if dts.waiters:
@@ -5038,6 +5049,19 @@
"stimulus_id": stimulus_id,
}
]
elif ts.state == "erred":

Collaborator:
Isn't this already covered by the next paragraph elif ts.run_id != run_id?

Member Author:
No, AFAIR, we only issue a new run_id when the task is sent to a worker again.
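
To make that distinction concrete, here is a small hedged sketch (assumed message shapes, not the actual scheduler handler) of why the run_id comparison alone would not reject this message: since the run_id only changes when the scheduler sends the task to a worker again, a late "task-finished" from the original run still carries the current run_id even though the task has already erred:

# Hypothetical, simplified view of the stimulus handling; plain dicts stand
# in for TaskState and the worker's message.
scheduler_task = {"key": "f", "state": "erred", "run_id": 7}
worker_msg = {"key": "f", "op": "task-finished", "run_id": 7}

if scheduler_task["state"] == "erred":
    # Branch added in this PR: the task already erred, so tell the worker
    # to drop its copy instead of attempting a memory -> erred transition.
    reply = {"op": "free-keys", "keys": [worker_msg["key"]]}
elif scheduler_task["run_id"] != worker_msg["run_id"]:
    # This check alone would not fire here, because no new run was started.
    reply = {"op": "free-keys", "keys": [worker_msg["key"]]}
else:
    reply = None  # accept the completion

print(reply)  # {'op': 'free-keys', 'keys': ['f']}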

Member Author (@hendrikmakait, Mar 5, 2024):
I've added another test for this. I don't like the way this handles context managers; if you have suggestions on how to simplify it, I'm all ears.

logger.debug(
"Received already erred task, worker: %s" ", key: %s",
worker,
key,
)
worker_msgs[worker] = [
{
"op": "free-keys",
"keys": [key],
"stimulus_id": stimulus_id,
}
]
elif ts.run_id != run_id:
if not ts.processing_on or ts.processing_on.address != worker:
logger.debug(
90 changes: 90 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -4890,3 +4890,93 @@ async def test_resubmit_different_task_same_key_warns_only_once(

async with Worker(s.address):
assert await c.gather(zs) == [2, 3, 4] # Kept old ys


def block(x, in_event, block_event):
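# Signal that the task has started running, then block until the test
# releases it; used to control task timing from the test body.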
in_event.set()
block_event.wait()
return x


@gen_cluster(
client=True,
nthreads=[("", 1, {"resources": {"a": 1}})],
config={"distributed.scheduler.allowed-failures": 1},
)
async def test_fan_out_pattern_deadlock(c, s, a):
"""Regression test for https://github.com/dask/distributed/issues/8548

This test heavily uses resources to force scheduling decisions.
"""
in_ancestor = Event()
block_ancestor = Event()
in_on_a_descendant = Event()
in_on_b_descendant = Event()
block_on_a_descendant = Event()
block_on_b_descendant = Event()
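# Graph under test: f -> g, then fan-out g -> {h1, h2}; 'f' and 'g' are
# pinned to the ephemeral worker via resource "b", 'h2' to worker 'a'
# via resource "a".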

# Input task to 'g' that we can fail
with dask.annotate(resources={"b": 1}):
f = delayed(block)(1, in_ancestor, block_ancestor, dask_key_name="f")
g = delayed(inc)(f, dask_key_name="g")

# Fan-out from 'g' and run h1 and h2 on different workers
h1 = delayed(block)(
g, in_on_b_descendant, block_on_b_descendant, dask_key_name="h1"
)
with dask.annotate(resources={"a": 1}):
h2 = delayed(block)(
g, in_on_a_descendant, block_on_a_descendant, dask_key_name="h2"
)
del g

f, h1, h2 = c.compute([f, h1, h2])
with captured_logger("distributed.scheduler", level=logging.ERROR) as logger:
async with Worker(s.address, nthreads=1, resources={"b": 1}) as b:
await block_ancestor.set()
await asyncio.gather(
in_on_a_descendant.wait(),
in_on_b_descendant.wait(),
)
await in_ancestor.clear()

# Make sure that the scheduler knows that both workers hold 'g' in memory
while len(s.tasks["g"].who_has) < 2:
await asyncio.sleep(0.1)
# Remove worker 'b' while it's processing h1
await s.remove_worker(b.address, stimulus_id="remove_b")
await block_on_b_descendant.set()
await b.close()
await block_ancestor.clear()

# Repeatedly remove new instances of the 'b' worker while it processes 'f'
# to trigger a transition of 'f' to 'erred'
async with Worker(s.address, nthreads=1, resources={"b": 1}) as b:
await in_ancestor.wait()
await in_ancestor.clear()
await s.remove_worker(b.address, stimulus_id="remove_b")
await block_ancestor.set()
await b.close()
await block_ancestor.clear()

async with Worker(s.address, nthreads=1, resources={"b": 1}) as b:
await in_ancestor.wait()
await in_ancestor.clear()
await s.remove_worker(b.address, stimulus_id="remove_b")
await block_ancestor.set()
await b.close()

await block_on_a_descendant.set()
await h2

with pytest.raises(KilledWorker, match="Attempted to run task 'h1'"):
await h1

del h1, h2
# Make sure that h2 gets forgotten on worker 'a'
await async_poll_for(lambda: not a.state.tasks, timeout=5)
# Ensure that no other errors including transition failures were logged
assert (
logger.getvalue()
== "Task h1 marked as failed because 2 workers died while trying to run it\nTask f marked as failed because 2 workers died while trying to run it\n"
)