diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 112fa581e0d..4fd1346aa3f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2325,6 +2325,7 @@ def decide_worker(self, ts: TaskState) -> WorkerState: ws = decide_worker( ts, self._workers_dv.values(), + self._idle_dv.values(), valid_workers, partial(self.worker_objective, ts), ) @@ -7459,14 +7460,19 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState): @cfunc @exceptval(check=False) def decide_worker( - ts: TaskState, all_workers, valid_workers: set, objective + ts: TaskState, + all_workers: sortedcontainers.SortedValuesView, + idle_workers: sortedcontainers.SortedValuesView, + valid_workers: set, + objective, ) -> WorkerState: """ Decide which worker should take task *ts*. - We choose the worker that has the data on which *ts* depends. + We consider all workers which hold dependencies of *ts*, + plus a sample of 20 random workers (with preference for idle ones). - If several workers have dependencies then we choose the less-busy worker. + From those, we choose the worker where the *objective* function is minimized. Optionally provide *valid_workers* of where jobs are allowed to occur (if all workers are allowed to take the task, pass None instead). @@ -7476,29 +7482,30 @@ def decide_worker( of bytes sent between workers. This is determined by calling the *objective* function. """ + # TODO should it be a bounded fraction of `len(all_workers)`? N_RANDOM_WORKERS: Py_ssize_t = 20 ws: WorkerState = None wws: WorkerState dts: TaskState deps: set = ts._dependencies - random_workers_set: set = ( - valid_workers if valid_workers is not None else all_workers - ) candidates: set assert all([dts._who_has for dts in deps]) if ts._actor: candidates = set(all_workers) else: - if len(random_workers_set) <= N_RANDOM_WORKERS: - # Fastpath: every worker would end up in `candidates`, so no need to build the set from `who_has`. - candidates = random_workers_set - else: - candidates = {wws for dts in deps for wws in dts._who_has} - # TODO the ordering of this set is likely not actually random. - # Ideally it would be ordered by occupancy (but that's more work than we want to do). - # Can we at least ensure idle workers come first? And that it's reasonably random after that? - candidates.update(itertools.islice(random_workers_set, N_RANDOM_WORKERS)) + candidates = {wws for dts in deps for wws in dts._who_has} + # Add some random workers to into `candidates`, starting with idle ones + # TODO shuffle to prevent hotspots? + candidates.update(idle_workers[:N_RANDOM_WORKERS]) + if len(idle_workers) < N_RANDOM_WORKERS: + sample_from = ( + list(valid_workers) if valid_workers is not None else all_workers + ) + candidates.update( + random.sample(sample_from, min(N_RANDOM_WORKERS, len(sample_from))) + # ^ NOTE: `min` because `random.sample` errors if `len(sample) < k` + ) if valid_workers is None: if not candidates: candidates = set(all_workers) @@ -7508,7 +7515,7 @@ def decide_worker( candidates = valid_workers if not candidates: if ts._loose_restrictions: - ws = decide_worker(ts, all_workers, None, objective) + ws = decide_worker(ts, all_workers, idle_workers, None, objective) return ws ncandidates: Py_ssize_t = len(candidates)