Skip to content

Commit

Permalink
Pick from idle workers first
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 committed Jun 18, 2021
1 parent d810d2d commit 768d660
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2325,6 +2325,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
ws = decide_worker(
ts,
self._workers_dv.values(),
self._idle_dv.values(),
valid_workers,
partial(self.worker_objective, ts),
)
Expand Down Expand Up @@ -7459,14 +7460,19 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState):
@cfunc
@exceptval(check=False)
def decide_worker(
ts: TaskState, all_workers, valid_workers: set, objective
ts: TaskState,
all_workers: sortedcontainers.SortedValuesView,
idle_workers: sortedcontainers.SortedValuesView,
valid_workers: set,
objective,
) -> WorkerState:
"""
Decide which worker should take task *ts*.
We choose the worker that has the data on which *ts* depends.
We consider all workers which hold dependencies of *ts*,
plus a sample of 20 random workers (with preference for idle ones).
If several workers have dependencies then we choose the less-busy worker.
From those, we choose the worker where the *objective* function is minimized.
Optionally provide *valid_workers* of where jobs are allowed to occur
(if all workers are allowed to take the task, pass None instead).
Expand All @@ -7476,29 +7482,30 @@ def decide_worker(
of bytes sent between workers. This is determined by calling the
*objective* function.
"""
# TODO should it be a bounded fraction of `len(all_workers)`?
N_RANDOM_WORKERS: Py_ssize_t = 20

ws: WorkerState = None
wws: WorkerState
dts: TaskState
deps: set = ts._dependencies
random_workers_set: set = (
valid_workers if valid_workers is not None else all_workers
)
candidates: set
assert all([dts._who_has for dts in deps])
if ts._actor:
candidates = set(all_workers)
else:
if len(random_workers_set) <= N_RANDOM_WORKERS:
# Fastpath: every worker would end up in `candidates`, so no need to build the set from `who_has`.
candidates = random_workers_set
else:
candidates = {wws for dts in deps for wws in dts._who_has}
# TODO the ordering of this set is likely not actually random.
# Ideally it would be ordered by occupancy (but that's more work than we want to do).
# Can we at least ensure idle workers come first? And that it's reasonably random after that?
candidates.update(itertools.islice(random_workers_set, N_RANDOM_WORKERS))
candidates = {wws for dts in deps for wws in dts._who_has}
# Add some random workers to into `candidates`, starting with idle ones
# TODO shuffle to prevent hotspots?
candidates.update(idle_workers[:N_RANDOM_WORKERS])
if len(idle_workers) < N_RANDOM_WORKERS:
sample_from = (
list(valid_workers) if valid_workers is not None else all_workers
)
candidates.update(
random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from)))
# ^ NOTE: `min` because `random.sample` errors if `len(sample) < k`
)
if valid_workers is None:
if not candidates:
candidates = set(all_workers)
Expand All @@ -7508,7 +7515,7 @@ def decide_worker(
candidates = valid_workers
if not candidates:
if ts._loose_restrictions:
ws = decide_worker(ts, all_workers, None, objective)
ws = decide_worker(ts, all_workers, idle_workers, None, objective)
return ws

ncandidates: Py_ssize_t = len(candidates)
Expand Down

0 comments on commit 768d660

Please sign in to comment.