diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 3aae49f346b..6bbf1460855 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -17,7 +17,6 @@ Worker, wait, ) -from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.metrics import time from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc @@ -196,6 +195,7 @@ async def test_adapt_quickly(): processes=False, silence_logs=False, dashboard_address=":0", + threads_per_worker=1, ) as cluster, Client(cluster, asynchronous=True) as client: adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10) future = client.submit(slowinc, 1, delay=0.100) @@ -294,32 +294,44 @@ def test_basic_no_loop(cleanup): assert loop is None or not loop.asyncio_loop.is_running() -@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) -@pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky") -@gen_test() -async def test_target_duration(): - with dask.config.set( - { - "distributed.scheduler.default-task-durations": {"slowinc": 1}, - # adaptive target for queued tasks doesn't yet consider default or learned task durations - "distributed.scheduler.worker-saturation": float("inf"), - } - ): - async with LocalCluster( - n_workers=0, - asynchronous=True, - processes=False, - silence_logs=False, - dashboard_address=":0", - ) as cluster: - adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s") - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(2) - futures = client.map(slowinc, range(100), delay=0.3) - await wait(futures) - - assert adapt.log[0][1] == {"status": "up", "n": 2} - assert adapt.log[1][1] == {"status": "up", "n": 20} +@pytest.mark.parametrize("target_duration", [5, 1]) +def test_target_duration(target_duration): + @gen_test() + async def _test(): + with dask.config.set( + { + "distributed.scheduler.default-task-durations": {"slowinc": 1}, + # adaptive target for queued tasks doesn't yet consider default or learned task durations + "distributed.scheduler.worker-saturation": float("inf"), + } + ): + async with LocalCluster( + n_workers=0, + asynchronous=True, + processes=False, + silence_logs=False, + dashboard_address=":0", + ) as cluster: + adapt = cluster.adapt( + interval="20ms", minimum=2, target_duration=target_duration + ) + # FIXME: LocalCluster is starting workers with CPU_COUNT threads + # each + # The default target duration is set to 1s + max_scaleup = 5 + n_tasks = target_duration * dask.system.CPU_COUNT * max_scaleup + + async with Client(cluster, asynchronous=True) as client: + await client.wait_for_workers(2) + futures = client.map(slowinc, range(n_tasks), delay=0.3) + await wait(futures) + scaleup_recs = [ + msg[1]["n"] for msg in adapt.log if msg[1].get("status") == "up" + ] + + assert 2 <= min(scaleup_recs) < max(scaleup_recs) <= max_scaleup + + _test() @gen_test() @@ -507,3 +519,22 @@ async def test_scale_up_large_tasks(c, s, saturation): await asyncio.sleep(0.001) assert s.adaptive_target() == 200 + + +@gen_cluster( + client=True, + nthreads=[("", 5)], + config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}}, +) +async def test_respect_average_nthreads(c, s, w): + futures = c.map(slowinc, range(10)) + while not s.tasks: + await asyncio.sleep(0.001) + + assert s.adaptive_target() == 2 + + more_futures = c.map(slowinc, range(200)) + while len(s.tasks) != 200: + await asyncio.sleep(0.001) + + assert s.adaptive_target() == 40 diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4649dc205d8..9bdd2337b0e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -8074,9 +8074,7 @@ def adaptive_target(self, target_duration=None): if len(self.queued) + len(self.unrunnable) > 100: queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100 - cpu = math.ceil( - (self.total_occupancy + queued_occupancy) / target_duration - ) # TODO: threads per worker + cpu = math.ceil((self.total_occupancy + queued_occupancy) / target_duration) # Avoid a few long tasks from asking for many cores tasks_ready = len(self.queued) + len(self.unrunnable) @@ -8088,6 +8086,11 @@ def adaptive_target(self, target_duration=None): else: cpu = min(tasks_ready, cpu) + # Divide by average nthreads per worker + if self.workers: + nthreads = sum(ws.nthreads for ws in self.workers.values()) + cpu = math.ceil(cpu / nthreads * len(self.workers)) + if (self.unrunnable or self.queued) and not self.workers: cpu = max(1, cpu)