Respect average nthreads in adaptive (#8041)
Co-authored-by: fjetter <[email protected]>
Co-authored-by: Hendrik Makait <[email protected]>
3 people authored Jul 28, 2023
1 parent 9d9702e commit a7f7764
Showing 2 changed files with 64 additions and 30 deletions.
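In short: Scheduler.adaptive_target estimates how many CPU threads are needed to finish the queued and executing work within the target duration, but adaptive deployments interpret the returned number as a worker count. The two were previously conflated (note the removed "# TODO: threads per worker" below), so clusters whose workers run more than one thread each scaled up too aggressively. This commit converts the thread estimate into a worker count by dividing by the average nthreads per worker. A minimal standalone sketch of the new arithmetic follows; target_workers and its parameters are illustrative names, not part of distributed's API:

import math

def target_workers(occupancy, target_duration, n_tasks, worker_nthreads):
    # Threads needed to clear the current occupancy within target_duration
    cpu = math.ceil(occupancy / target_duration)
    # Keep a few long-running tasks from requesting many cores
    cpu = min(n_tasks, cpu)
    # New in this commit: convert a thread count into a worker count by
    # dividing by the average nthreads per worker
    if worker_nthreads:
        avg_nthreads = sum(worker_nthreads) / len(worker_nthreads)
        cpu = math.ceil(cpu / avg_nthreads)
    return cpu

# One 5-thread worker, 10 tasks of ~1000s each, 5s target duration:
print(target_workers(10 * 1000, 5, n_tasks=10, worker_nthreads=[5]))  # 2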
85 changes: 58 additions & 27 deletions distributed/deploy/tests/test_adaptive.py
@@ -17,7 +17,6 @@
     Worker,
     wait,
 )
-from distributed.compatibility import LINUX, MACOS, WINDOWS
 from distributed.metrics import time
 from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc

@@ -196,6 +195,7 @@ async def test_adapt_quickly():
         processes=False,
         silence_logs=False,
         dashboard_address=":0",
+        threads_per_worker=1,
     ) as cluster, Client(cluster, asynchronous=True) as client:
         adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
         future = client.submit(slowinc, 1, delay=0.100)
@@ -294,32 +294,44 @@ def test_basic_no_loop(cleanup):
     assert loop is None or not loop.asyncio_loop.is_running()


-@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5)
-@pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky")
-@gen_test()
-async def test_target_duration():
-    with dask.config.set(
-        {
-            "distributed.scheduler.default-task-durations": {"slowinc": 1},
-            # adaptive target for queued tasks doesn't yet consider default or learned task durations
-            "distributed.scheduler.worker-saturation": float("inf"),
-        }
-    ):
-        async with LocalCluster(
-            n_workers=0,
-            asynchronous=True,
-            processes=False,
-            silence_logs=False,
-            dashboard_address=":0",
-        ) as cluster:
-            adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s")
-            async with Client(cluster, asynchronous=True) as client:
-                await client.wait_for_workers(2)
-                futures = client.map(slowinc, range(100), delay=0.3)
-                await wait(futures)
-
-    assert adapt.log[0][1] == {"status": "up", "n": 2}
-    assert adapt.log[1][1] == {"status": "up", "n": 20}
+@pytest.mark.parametrize("target_duration", [5, 1])
+def test_target_duration(target_duration):
+    @gen_test()
+    async def _test():
+        with dask.config.set(
+            {
+                "distributed.scheduler.default-task-durations": {"slowinc": 1},
+                # adaptive target for queued tasks doesn't yet consider default or learned task durations
+                "distributed.scheduler.worker-saturation": float("inf"),
+            }
+        ):
+            async with LocalCluster(
+                n_workers=0,
+                asynchronous=True,
+                processes=False,
+                silence_logs=False,
+                dashboard_address=":0",
+            ) as cluster:
+                adapt = cluster.adapt(
+                    interval="20ms", minimum=2, target_duration=target_duration
+                )
+                # FIXME: LocalCluster is starting workers with CPU_COUNT threads
+                # each
+                # The default target duration is set to 1s
+                max_scaleup = 5
+                n_tasks = target_duration * dask.system.CPU_COUNT * max_scaleup
+
+                async with Client(cluster, asynchronous=True) as client:
+                    await client.wait_for_workers(2)
+                    futures = client.map(slowinc, range(n_tasks), delay=0.3)
+                    await wait(futures)
+                    scaleup_recs = [
+                        msg[1]["n"] for msg in adapt.log if msg[1].get("status") == "up"
+                    ]
+
+        assert 2 <= min(scaleup_recs) < max(scaleup_recs) <= max_scaleup
+
+    _test()


 @gen_test()
@@ -507,3 +519,22 @@ async def test_scale_up_large_tasks(c, s, saturation):
         await asyncio.sleep(0.001)

     assert s.adaptive_target() == 200
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[("", 5)],
+    config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}},
+)
+async def test_respect_average_nthreads(c, s, w):
+    futures = c.map(slowinc, range(10))
+    while not s.tasks:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 2
+
+    more_futures = c.map(slowinc, range(200))
+    while len(s.tasks) != 200:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 40
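For reference, the expected targets in this test follow from the new averaging step, assuming the default 5s target duration and the configured 1000s duration for slowinc. Note that c.map(slowinc, range(200)) reuses the keys of the earlier range(10) batch, so the scheduler holds 200 distinct tasks, not 210. A rough walk-through of the arithmetic:

import math

# 10 tasks: ceil(10 * 1000 / 5) = 2000 threads, capped at the 10
# runnable tasks, then divided by the 5-thread average -> 2 workers
assert math.ceil(min(math.ceil(10 * 1000 / 5), 10) / 5) == 2

# 200 tasks: the thread estimate is again capped at the task count,
# then ceil(200 / 5) = 40 workers
assert math.ceil(min(math.ceil(200 * 1000 / 5), 200) / 5) == 40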
9 changes: 6 additions & 3 deletions distributed/scheduler.py
@@ -8074,9 +8074,7 @@ def adaptive_target(self, target_duration=None):
         if len(self.queued) + len(self.unrunnable) > 100:
             queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100

-        cpu = math.ceil(
-            (self.total_occupancy + queued_occupancy) / target_duration
-        )  # TODO: threads per worker
+        cpu = math.ceil((self.total_occupancy + queued_occupancy) / target_duration)

         # Avoid a few long tasks from asking for many cores
         tasks_ready = len(self.queued) + len(self.unrunnable)
@@ -8088,6 +8086,11 @@
         else:
             cpu = min(tasks_ready, cpu)

+        # Divide by average nthreads per worker
+        if self.workers:
+            nthreads = sum(ws.nthreads for ws in self.workers.values())
+            cpu = math.ceil(cpu / nthreads * len(self.workers))
+
         if (self.unrunnable or self.queued) and not self.workers:
             cpu = max(1, cpu)
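A note on the design: dividing by the average nthreads keeps the conversion to a single pass over the workers and is exact in the common case of homogeneous workers; with mixed thread counts it is only an approximation. Rounding up with math.ceil errs toward requesting one extra worker rather than missing the target duration.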

