Refactor occupancy #7075
Conversation
Unit Test Results: See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ±0, 15 suites ±0, 6h 34m 22s ⏱️ +11m 6s. For more details on these failures, see this check. Results for commit f8f11fe. ± Comparison against base commit 68e5a6a. ♻️ This comment has been updated with latest results.
@crusaderky @gjoseph92 would either of you have time to review this? @hendrikmakait mentioned he'd like to see this included in the release tomorrow
Overall, very happy to see this change. This metric feels pretty sensible to me, and I very much like having it always be correct.
Mostly nits, but a couple more significant comments too.
    @property
    def scheduler(self):
        assert self.scheduler_ref
        s = self.scheduler_ref()
I get the reason for this pattern of the weakref to the scheduler, but I don't love it. It just feels a little odd, and it also means every access to self.scheduler has to resolve a few references.
How awkward would it be if all the TaskState methods that need to do something to the scheduler state took the scheduler as an argument? That would also make it explicit that they mutate the scheduler.
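To illustrate the trade-off, here is a minimal sketch of both variants. Only the scheduler property mirrors the diff above; release and remove_task are hypothetical names used purely for illustration.

```python
import weakref


class Scheduler:
    """Stand-in for the real scheduler; remove_task is a hypothetical method."""

    def remove_task(self, ts):
        print(f"removing {ts!r} from scheduler state")


class TaskState:
    # Current pattern from the diff above: hold a weakref back to the scheduler
    # and resolve it on every attribute access through a property.
    def __init__(self, scheduler: Scheduler):
        self.scheduler_ref = weakref.ref(scheduler)

    @property
    def scheduler(self) -> Scheduler:
        assert self.scheduler_ref
        s = self.scheduler_ref()
        assert s is not None
        return s

    def release(self):
        # The mutation of scheduler state is hidden behind the property.
        self.scheduler.remove_task(self)


class TaskStateExplicit:
    # Suggested alternative: methods that touch scheduler state take the
    # scheduler as an argument, making the mutation visible at call sites.
    def release(self, scheduler: Scheduler):
        scheduler.remove_task(self)
```

With the explicit variant, a call like ts.release(scheduler) documents at the call site that scheduler state is being mutated, at the cost of threading the scheduler through every caller.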
The more I look at this, the more I consider moving these functions (or at least the parts that require access to a SchedulerState) back up the hierarchy into the scheduler state. Since we need access to the SchedulerState as a whole, it seems that it would be the more appropriate root object to handle those operations. This would also avoid problems like acquiring resources when using Scheduler._add_to_processing but not doing so in WorkerState.add_to_processing. It's not clear which of these methods should be used by others such as WorkStealing, and I'm increasingly convinced that it should not be the WorkerState-based ones.
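A rough sketch of what pulling these operations up could look like, assuming a single entry point on SchedulerState. The method names mirror the diff further down in this PR, but the exact signature is an assumption, not the PR's implementation.

```python
class SchedulerState:
    """Sketch only; the real class lives in distributed/scheduler.py."""

    def _set_duration_estimate(self, ts, ws): ...
    def acquire_resources(self, ts, ws): ...

    def add_to_processing(self, ts, ws):
        # One entry point that owns the whole transition, so every caller
        # (Scheduler, WorkStealing, ...) goes through the same steps and
        # cannot accidentally skip acquiring resources.
        self._set_duration_estimate(ts, ws)
        ws.add_to_processing(ts)  # WorkerState only updates its own bookkeeping
        ts.processing_on = ws
        ts.state = "processing"
        self.acquire_resources(ts, ws)
```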
This is a bit of an elaborate change, so I think it might make sense to extract it into an individual PR to avoid blocking this one and littering it with technically unrelated refactoring changes.
@@ -1361,6 +1356,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
    assert msgs in (expect1, expect2, expect3)


@pytest.mark.skip("executing heartbeats not considered yet")
This also seems important to fix before a release. I have a feeling that the main use case where people actually depend on stealing right now is submitting a bunch of very, very slow tasks and then scaling up the cluster.
See #7030 (comment) for @fjetter's thoughts on this.
distributed/tests/test_steal.py
Outdated
s._reevaluate_occupancy_worker(ws)
# Re-evaluate idle/saturated classification to avoid outdated classifications due to
# the initialization order of workers. On a real cluster, this would get constantly
# updated by tasks completing (except for stragglers).
Hm, as I mentioned in the comment above, there are definitely use cases right now where people submit very, very slow tasks (~hours) and expect them to rebalance to new workers, even before any tasks have completed.
Adding this re-evaluation to every test case might hide things that could cause actual problems in that scenario?
I've adjusted the comment. The idle/saturated classification happens whenever a task completes or is added.
In your example, if we assume tasks are scheduled roughly round-robin, all workers should end up as saturated in the beginning. Since new workers would be idle, stealing should happen.
The problem in this particular test implementation is that we do not schedule tasks round-robin but one worker after another. Thus, if we first schedule all specified tasks on a worker that's supposed to be idle in the grand scheme of the test, it would be classified as saturated for lack of other, even more saturated workers. The other workers only get filled up after this one, which requires us to reclassify once after the test setup is complete.
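Roughly what happens in the test setup, as a simplified sketch: the per-worker task counts and the submit_task_to helper are hypothetical stand-ins, only _reevaluate_occupancy_worker appears in the diff above.

```python
# Tasks are assigned to one worker after another rather than round-robin, so
# the idle/saturated classification computed while the first worker is being
# filled can be stale by the time the last worker has been set up.
for ws, ntasks in zip(s.workers.values(), ntasks_per_worker):  # illustrative setup
    for _ in range(ntasks):
        submit_task_to(ws)  # hypothetical helper standing in for the test's setup code

# A single re-evaluation pass after setup restores consistent classifications;
# on a real cluster this happens continuously as tasks complete or are added.
for ws in s.workers.values():
    s._reevaluate_occupancy_worker(ws)
```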
Co-authored-by: Gabe Joseph <[email protected]>
assert ws in state.running, state.running
assert (o := state.workers.get(ws.address)) is ws, (ws, o)

state._set_duration_estimate(ts, ws)
ws.add_to_processing(ts)
ts.processing_on = ws
ts.state = "processing"
state.acquire_resources(ts, ws)
IIUC, we never acquire(d) resources in stealing. Is that on purpose or an oversight?
See #5937 (comment); I think this is a bug
Let's address this in another PR
A/B test: https://github.com/coiled/coiled-runtime/actions/runs/3159631751 Results
# Reproducer from https://github.com/dask/distributed/issues/6573
@gen_cluster(
@@ -487,7 +481,7 @@ def balance(self) -> None:
            )

        if log:
-            self.log(log)
+            self.log(("request", log))
Driveby: Add an identifier to the logged bulk event.
Co-authored-by: fjetter <[email protected]>
Co-authored-by: Gabe Joseph <[email protected]>
This is an implementation of the suggestion in #7027
Supersedes
pre-commit run --all-files