From 7215427df8db03f8a3bd17f1fa94d602515f3c6a Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 10 Nov 2022 12:05:54 +0100 Subject: [PATCH] Review comments --- distributed/scheduler.py | 15 ++++++++------- distributed/tests/test_scheduler.py | 5 ++++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d582e18897..e6200b17aa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1509,7 +1509,7 @@ class SchedulerState: idle: dict[str, WorkerState] #: Similar to `idle` #: Definition based on assigned tasks - idle_task_count: dict[str, WorkerState] + idle_task_count: set[WorkerState] #: Workers that are fully utilized. May include non-running workers. saturated: set[WorkerState] total_nthreads: int @@ -1616,7 +1616,7 @@ def __init__( self.extensions = {} self.host_info = host_info self.idle = SortedDict() - self.idle_task_count = dict() + self.idle_task_count = set() self.n_tasks = 0 self.resources = resources self.saturated = set() @@ -2158,7 +2158,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: # Just pick the least busy worker. # NOTE: this will lead to worst-case scheduling with regards to co-assignment. ws = min( - self.idle_task_count.values(), + self.idle_task_count, key=lambda ws: len(ws.processing) / ws.nthreads, ) if self.validate: @@ -3082,10 +3082,11 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): if 0.4 < pending > 1.9 * (self.total_occupancy / self.total_nthreads): saturated.add(ws) - self.idle_task_count.pop(ws.address, None) if not _worker_full(ws, self.WORKER_SATURATION): if ws.status == Status.running: - self.idle_task_count[ws.address] = ws + self.idle_task_count.add(ws) + else: + self.idle_task_count.discard(ws) def is_unoccupied( self, ws: WorkerState, occupancy: float, nprocessing: int @@ -4751,7 +4752,7 @@ async def remove_worker( del self.stream_comms[address] del self.aliases[ws.name] self.idle.pop(ws.address, None) - self.idle_task_count.pop(ws.address, None) + self.idle_task_count.discard(ws) self.saturated.discard(ws) del self.workers[address] ws.status = Status.closed @@ -5339,7 +5340,7 @@ def handle_worker_status_change( else: self.running.discard(ws) self.idle.pop(ws.address, None) - self.idle_task_count.pop(ws.address, None) + self.idle_task_count.discard(ws) async def handle_request_refresh_who_has( self, keys: Iterable[str], worker: str, stimulus_id: str diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 48ad99db77..e5028f8746 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -398,6 +398,7 @@ async def test_queued_paused_new_worker(c, s, a, b): await asyncio.sleep(0.01) assert not s.idle + assert not s.idle_task_count assert not s.running async with Worker(s.address, nthreads=2) as w: @@ -446,6 +447,7 @@ async def test_queued_paused_unpaused(c, s, a, b, queue): assert not s.running assert not s.idle + assert not s.idle_task_count # un-pause a.status = Status.running @@ -455,7 +457,8 @@ async def test_queued_paused_unpaused(c, s, a, b, queue): if queue: assert not s.idle # workers should have been (or already were) filled - # If queuing is disabled, all workers might already be saturated when they un-pause. + # If queuing is disabled, all workers might already be saturated when they un-pause. + assert not s.idle_task_count await wait(final)