Skip to content

Commit

Permalink
static is_rootish based on cogroups
Browse files Browse the repository at this point in the history
Closes dask#7274
  • Loading branch information
gjoseph92 committed Dec 5, 2022
1 parent 5c3d81d commit ba846ad
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
15 changes: 6 additions & 9 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2837,18 +2837,15 @@ def is_rootish(self, ts: TaskState) -> bool:
"""
Whether ``ts`` is a root or root-like task.
Root-ish tasks are part of a group that's much larger than the cluster,
and have few or no dependencies.
The task has a co-group, but no dependencies come from it.
(This also means tasks with 0 dependencies are root-ish.)
"""
if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
return False
tg = ts.group
# TODO short-circuit to True if `not ts.dependencies`?
return (
len(tg) > self.total_nthreads * 2
and len(tg.dependencies) < 5
and sum(map(len, tg.dependencies)) < 5
)
if not (cg := ts.cogroup):
return len(ts.dependencies) == 0

return not any(dts.cogroup is cg for dts in ts.dependencies)

def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None:
"""Update the status of the idle and saturated state
Expand Down
8 changes: 6 additions & 2 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ async def test_queued_remove_add_worker(c, s, a, b):
event = Event()
fs = c.map(lambda i: event.wait(), range(10))

await async_wait_for(lambda: len(s.queued) == 6, timeout=5)
await async_wait_for(
lambda: len(s.queued) == 6, timeout=5, fail_func=lambda: print(len(s.queued))
)
await s.remove_worker(a.address, stimulus_id="fake")
assert len(s.queued) == 8

Expand Down Expand Up @@ -4164,7 +4166,9 @@ async def test_transition_waiting_memory(c, s, a, b):
reason="Nothing will be classified as root-ish",
),
),
False,
pytest.param(
False, marks=pytest.mark.xfail(reason="FIXME tasks are always root-ish now")
),
],
)
@gen_cluster(client=True, nthreads=[("", 1)])
Expand Down

0 comments on commit ba846ad

Please sign in to comment.