
Cache root-ish-ness for consistency #7262

Draft: gjoseph92 wants to merge 12 commits into main
Conversation

gjoseph92 (Collaborator)

This is a way of avoiding the consistency issues in #7259 with less thinking. If root-ish-ness can't change, things are simpler.

I don't love having to do this. But hopefully this will be determined statically (and likely cached) anyway: #6922.

  • Tests added / passed
  • Passes pre-commit run --all-files

It's possible for tasks to not be root-ish when they go into no-worker, but to be root-ish when they come out.
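For context, here is a minimal sketch (a reconstruction, not this PR's exact code) of what a cached is_rootish could look like on the scheduler. It assumes the size-vs-cluster heuristic distributed used at the time, plus the _rootish attribute on TaskState that the diff below references:

def is_rootish(self, ts) -> bool:
    # Return the decision pinned while the task sits in `queued`/`no-worker`
    if ts._rootish is not None:
        return ts._rootish
    # Tasks with restrictions are never root-ish
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    tg = ts.group
    # A group much larger than the cluster, with few dependencies, is root-ish
    return (
        len(tg) > self.total_nthreads * 2
        and len(tg.dependencies) < 5
        and sum(map(len, tg.dependencies)) < 5
    )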
@@ -7052,6 +7083,8 @@ def get_metadata(self, keys: list[str], default=no_default):
     def set_restrictions(self, worker: dict[str, Collection[str] | str]):
         for key, restrictions in worker.items():
             ts = self.tasks[key]
+            if ts._rootish is not None:
gjoseph92 (Collaborator, Author)

I don't like that set_restrictions is a public API at all. Doesn't seem like something you should be able to do post-hoc.

gjoseph92 (Collaborator, Author)

This fails test_reschedule_concurrent_requests_deadlock, which sets restrictions on a processing task.

@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 3,
    config={
        "distributed.scheduler.work-stealing-interval": 1_000_000,
    },
)
async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
    # https://github.com/dask/distributed/issues/5370
    steal = s.extensions["stealing"]
    w0 = workers[0]
    ev = Event()
    futs1 = c.map(
        lambda _, ev: ev.wait(),
        range(10),
        ev=ev,
        key=[f"f1-{ix}" for ix in range(10)],
        workers=[w0.address],
        allow_other_workers=True,
    )
    while not w0.active_keys:
        await asyncio.sleep(0.01)
    # ready is a heap but we don't need last, just not the next
    victim_key = list(w0.active_keys)[0]
    victim_ts = s.tasks[victim_key]
    wsA = victim_ts.processing_on
    other_workers = [ws for ws in s.workers.values() if ws != wsA]
    wsB = other_workers[0]
    wsC = other_workers[1]
    steal.move_task_request(victim_ts, wsA, wsB)
    s.set_restrictions(worker={victim_key: [wsB.address]})
    s._reschedule(victim_key, stimulus_id="test")
    assert wsB == victim_ts.processing_on
    # move_task_request is not responsible for respecting worker restrictions
    steal.move_task_request(victim_ts, wsB, wsC)
    # Let tasks finish
    await ev.set()
    await c.gather(futs1)
    assert victim_ts.who_has != {wsC}
    msgs = steal.story(victim_ts)
    msgs = [msg[:-1] for msg in msgs]  # Remove random IDs
    # There are three possible outcomes
    expect1 = [
        ("stale-response", victim_key, "executing", wsA.address),
        ("already-computing", victim_key, "executing", wsB.address, wsC.address),
    ]
    expect2 = [
        ("already-computing", victim_key, "executing", wsB.address, wsC.address),
        ("already-aborted", victim_key, "executing", wsA.address),
    ]
    # This outcome appears only in ~2% of the runs
    expect3 = [
        ("already-computing", victim_key, "executing", wsB.address, wsC.address),
        ("already-aborted", victim_key, "memory", wsA.address),
    ]
    assert msgs in (expect1, expect2, expect3)

Member

> I don't like that set_restrictions is a public API at all. Doesn't seem like something you should be able to do post-hoc.

We can change it. The first step is a deprecation warning.
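A minimal sketch (an assumption, not a change in this PR) of what that first deprecation step could look like:

import warnings

def set_restrictions(self, worker):
    # Hypothetical first step: warn, then keep the existing behavior unchanged.
    warnings.warn(
        "Scheduler.set_restrictions is deprecated; pass worker restrictions "
        "when submitting tasks instead.",  # message text is illustrative
        DeprecationWarning,
        stacklevel=2,
    )
    ...  # existing restriction handling, unchanged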

github-actions bot (Contributor) commented Nov 5, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

  15 files ±0        15 suites ±0     6h 43m 28s ⏱️ +13m 3s
  3 171 tests +2     3 084 ✔️ ±0      83 💤 -1       4 failed +3
  23 464 runs +16    22 543 ✔️ +2     903 💤 ±0      18 failed +14

For more details on these failures, see this check.

Results for commit 6f188bf. ± Comparison against base commit 4b00be1.

♻️ This comment has been updated with latest results.

fjetter (Member) commented Nov 7, 2022

I don't like caching this. I don't consider it user-friendly to pin the behavior of our decision logic to the time a computation is dispatched. I think this will be a hard-to-debug problem if users run into weird scheduling just because their cluster was still scaling up.

mrocklin (Member) commented Nov 7, 2022

I'm curious, what are some situations where rootish-ness can change?

gjoseph92 (Collaborator, Author)

Broadly, it changes when either the size of the TaskGroup changes (add/cancel tasks) or the cluster size changes. Some specific examples:

  • With client.submit in a for-loop, the first nthreads * 2 tasks will be non-root-ish, the rest will be root-ish. But when the loop is done, is_rootish on every task is True, even the ones we originally scheduled as though they weren't root-ish.
  • You submit 100 tasks to a 40-thread cluster with queuing on, so they are all root-ish and get queued. They are very slow tasks. You scale up the cluster beyond 50 threads while some tasks are still queued. Now, when tasks come out of the queue, they are no longer root-ish.
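To make the second example concrete, here is a small numeric sketch. It assumes the root-ish threshold is roughly "task group larger than total_nthreads * 2" (the real heuristic also checks the number of dependencies):

group_size = 100  # tasks submitted to the cluster

total_nthreads = 40
print(group_size > total_nthreads * 2)  # True: 100 > 80, tasks are root-ish and get queued

total_nthreads = 56  # cluster scales up past 50 threads while tasks are still queued
print(group_size > total_nthreads * 2)  # False: 100 <= 112, dequeued tasks are no longer root-ish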

gjoseph92 (Collaborator, Author)

We're going to take a different approach; see #7259 (comment). Rather than trying to make is_rootish static, we'll accept that it's dynamic and switch behavior and state when it changes.

gjoseph92 (Collaborator, Author)

> I think this will be a hard-to-debug problem if users run into weird scheduling just because their cluster was still scaling up.

I think we're only worried about not queuing tasks when we should. That is, is_rootish gets cached as False when it later should be True. Queueing tasks when we "shouldn't" doesn't seem like a big deal to me; as we've seen, this makes little performance difference. If queuing is default scheduling behavior, or you've turned it on explicitly, it shouldn't be surprising when things get queued.

is_rootish getting cached as False when it later should be True can only happen when:

  • the TaskGroup grows
  • the cluster shrinks

So it won't happen when the cluster is scaling up. Just when submitting tasks before scaling down. And in that case:

  1. Tasks are non-root-ish, so they all get assigned to workers at once.
  2. Workers leave. Their processing tasks are released, which resets the _rootish cache.
  3. Tasks are rescheduled, now seen as root-ish (the cluster shrank), and queued as expected.

So that case is fine. The only remaining case is the TaskGroup growing. That happens most commonly through client.submit in a for-loop:

  1. first nthreads * 2 tasks are submitted immediately
  2. rest are queued

...and that's the same behavior as if we weren't caching. Having the group split like this isn't ideal, but there's nothing we can do about it, and caching doesn't change things.
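For reference, the submit-in-a-loop pattern being discussed (ordinary client API usage; inc and the cluster size are stand-ins):

from distributed import Client

def inc(x):
    return x + 1

client = Client()  # say, a cluster with 40 threads total
futures = [client.submit(inc, i) for i in range(1_000)]
# While the loop runs, the growing TaskGroup crosses the root-ish threshold:
# the first ~2 * nthreads tasks were scheduled as non-root-ish,
# the rest are treated as root-ish and queued.
results = client.gather(futures)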

gjoseph92 reopened this Nov 8, 2022
gjoseph92 (Collaborator, Author) commented Nov 8, 2022

Also, to clarify: the definition of caching in this approach is that is_rootish(ts) is guaranteed to return the same value only while the task is in queued or no-worker. Upon leaving those states, the cache is invalidated.

We don't need to cache root-ish-ness for the lifetime of a task; that's overkill. I only care that is_rootish is the same when a task gets put into the queued or unrunnable sets as when it's removed from those sets.

(To be clear, I haven't fully implemented that here yet, but that's the intent.) Edit: now implemented.
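A minimal sketch of that cache lifetime, with hypothetical helper names (only the _rootish attribute itself appears in the diff above):

def pin_rootish(scheduler, ts):
    # Called when ``ts`` transitions into "queued" or "no-worker":
    # freeze the decision that put it there.
    ts._rootish = scheduler.is_rootish(ts)

def unpin_rootish(ts):
    # Called when ``ts`` leaves "queued" or "no-worker":
    # the next scheduling pass re-evaluates root-ish-ness from scratch.
    ts._rootish = None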
