From 304cbd8a5d2b17cee143fdcad92e75894faaed04 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Sat, 3 Jul 2021 21:28:36 -0700
Subject: [PATCH 1/2] WIP adapt batched send interval to cluster size

- Both size and CPU based metrics in here. CPU kinda makes more sense to me, but should be weighted avg'd or something.
- CPU test is way too unstable
- Test is a little slow because of scale up
---
 distributed/scheduler.py         | 28 ++++++++++++++++++++
 distributed/tests/test_worker.py | 45 +++++++++++++++++++++++++++++++-
 distributed/worker.py            |  4 ++-
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index fe8d0af4b50..e2275d7a342 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3993,6 +3993,7 @@ def heartbeat_worker(
             "status": "OK",
             "time": local_now,
             "heartbeat-interval": heartbeat_interval(len(parent._workers_dv)),
+            "batched-send-interval": batched_send_interval(len(parent._workers_dv)),
         }
 
     async def add_worker(
@@ -7137,6 +7138,23 @@ async def get_worker_monitor_info(self, recent=False, starts=None):
         )
         return dict(zip(parent._workers_dv, results))
 
+    def batched_send_interval(self) -> float:
+        min_ms: float = 2.0
+        max_ms: float = 50.0
+        cpu_fraction: float = self.proc.cpu_percent() / 100
+        print(cpu_fraction)
+        if cpu_fraction < 0.5:
+            return min_ms / 1000
+        target_ms: float = (max_ms - min_ms) * cpu_fraction + min_ms
+        return min(target_ms, max_ms) / 1000
+        # return (
+        #     min_ms
+        #     if target_ms < min_ms
+        #     else max_ms
+        #     if target_ms > max_ms
+        #     else target_ms
+        # )
+
     ###########
     # Cleanup #
     ###########
@@ -7752,6 +7770,16 @@ def heartbeat_interval(n):
         return n / 200 + 1
 
 
+def batched_send_interval(n):
+    """
+    Interval in seconds that we desire each worker to send updates, based on number of workers
+
+    Target is 5,000 messages received per second; capped between 5ms and 100ms.
+    """
+    target = n / 5000
+    return max(0.005, min(0.1, target))
+
+
 class KilledWorker(Exception):
     def __init__(self, task, last_worker):
         super().__init__(task, last_worker)
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index c38b8b76363..94553e199e0 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -35,7 +35,7 @@
 from distributed.core import CommClosedError, Status, rpc
 from distributed.diagnostics.plugin import PipInstall
 from distributed.metrics import time
-from distributed.scheduler import Scheduler
+from distributed.scheduler import Scheduler, heartbeat_interval
 from distributed.utils import TimeoutError, tmpfile
 from distributed.utils_test import (
     TaskStateMetadataPlugin,
@@ -843,6 +843,49 @@ async def test_heartbeats(c, s, a, b):
     assert a.periodic_callbacks["heartbeat"].callback_time < 1000
 
 
+@gen_cluster(client=True)
+async def test_adjust_batched_send_interval(c, s, a, b):
+    async def wait_for_heartbeat():
+        x = s.workers[a.address].last_seen
+        start = time()
+        await asyncio.sleep(heartbeat_interval(len(s.workers)) * 1.1)
+        while s.workers[a.address].last_seen == x:
+            await asyncio.sleep(0.01)
+            assert time() < start + 2
+
+    assert a.batched_stream.interval >= 0.005
+    await wait_for_heartbeat()
+    assert a.batched_stream.interval == 0.005
+
+    workers = [Worker(s.address, nthreads=1) for i in range(27)]
+    await asyncio.gather(*workers)
+    await wait_for_heartbeat()
+
+    assert a.batched_stream.interval > 0.005
+
+
+# def test_batched_send_interval(client, a):
+#     sleep(1)
+#     address = a["address"]
+#     initials = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
+#     assert initials[address] == 2 / 1000
+#     heartbeat_interval = (
+#         client.run(
+#             lambda dask_worker: dask_worker.periodic_callbacks[
+#                 "heartbeat"
+#             ].callback_time
+#         )[address]
+#         / 1000
+#     )
+#     # sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1)
+#     sleep(heartbeat_interval + 0.2)
+#     intervals = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
+#     assert intervals == initials
+#     client.run_on_scheduler(run_for, heartbeat_interval + 0.2)
+#     intervals = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
+#     assert intervals[address] >= 27.5 / 1000
+
+
 @pytest.mark.parametrize("worker", [Worker, Nanny])
 def test_worker_dir(worker):
     with tmpfile() as fn:
diff --git a/distributed/worker.py b/distributed/worker.py
index c44bff14c93..2abdf71dad4 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -651,7 +651,7 @@ def __init__(
             self.nthreads, thread_name_prefix="Dask-Default-Threads"
         )
 
-        self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
+        self.batched_stream = BatchedSend(interval="25ms", loop=self.loop)
         self.name = name
         self.scheduler_delay = 0
         self.stream_comms = dict()
@@ -999,6 +999,8 @@ async def heartbeat(self):
             self.periodic_callbacks["heartbeat"].callback_time = (
                 response["heartbeat-interval"] * 1000
             )
+            self.batched_stream.interval = response["batched-send-interval"]
+            # print(response["batched-send-interval"])
             self.bandwidth_workers.clear()
             self.bandwidth_types.clear()
         except CommClosedError:

From 92961ff9ad5958532463fc4c8ec12256dfc684b7 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Wed, 7 Jul 2021 16:05:52 -0800
Subject: [PATCH 2/2] cleanup

---
 distributed/scheduler.py         | 17 -----------------
 distributed/tests/test_worker.py | 23 +----------------------
 distributed/worker.py            |  1 -
 3 files changed, 1 insertion(+), 40 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index e2275d7a342..5870ef10549 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7138,23 +7138,6 @@ async def get_worker_monitor_info(self, recent=False, starts=None):
         )
         return dict(zip(parent._workers_dv, results))
 
-    def batched_send_interval(self) -> float:
-        min_ms: float = 2.0
-        max_ms: float = 50.0
-        cpu_fraction: float = self.proc.cpu_percent() / 100
-        print(cpu_fraction)
-        if cpu_fraction < 0.5:
-            return min_ms / 1000
-        target_ms: float = (max_ms - min_ms) * cpu_fraction + min_ms
-        return min(target_ms, max_ms) / 1000
-        # return (
-        #     min_ms
-        #     if target_ms < min_ms
-        #     else max_ms
-        #     if target_ms > max_ms
-        #     else target_ms
-        # )
-
     ###########
     # Cleanup #
     ###########
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 94553e199e0..0c98bc7d7c9 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -857,6 +857,7 @@ async def wait_for_heartbeat():
     await wait_for_heartbeat()
     assert a.batched_stream.interval == 0.005
 
+    # Scale up cluster so send interval increases
     workers = [Worker(s.address, nthreads=1) for i in range(27)]
     await asyncio.gather(*workers)
     await wait_for_heartbeat()
@@ -864,28 +865,6 @@ async def wait_for_heartbeat():
     assert a.batched_stream.interval > 0.005
 
 
-# def test_batched_send_interval(client, a):
-#     sleep(1)
-#     address = a["address"]
-#     initials = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
-#     assert initials[address] == 2 / 1000
-#     heartbeat_interval = (
-#         client.run(
-#             lambda dask_worker: dask_worker.periodic_callbacks[
-#                 "heartbeat"
-#             ].callback_time
-#         )[address]
-#         / 1000
-#     )
-#     # sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1)
-#     sleep(heartbeat_interval + 0.2)
-#     intervals = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
-#     assert intervals == initials
-#     client.run_on_scheduler(run_for, heartbeat_interval + 0.2)
-#     intervals = client.run(lambda dask_worker: dask_worker.batched_stream.interval)
-#     assert intervals[address] >= 27.5 / 1000
-
-
 @pytest.mark.parametrize("worker", [Worker, Nanny])
 def test_worker_dir(worker):
     with tmpfile() as fn:
diff --git a/distributed/worker.py b/distributed/worker.py
index 2abdf71dad4..aad0eaa9edd 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1000,7 +1000,6 @@ async def heartbeat(self):
                 response["heartbeat-interval"] * 1000
             )
             self.batched_stream.interval = response["batched-send-interval"]
-            # print(response["batched-send-interval"])
             self.bandwidth_workers.clear()
             self.bandwidth_types.clear()
         except CommClosedError:
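
Notes on the series, with small illustrative sketches; none of the code below is part of the patches themselves.

The policy patch 1 settles on is purely worker-count based: the scheduler aims for roughly 5,000 batched messages received per second in total, so each worker's flush interval is n / 5000 seconds, clamped to the 5 ms - 100 ms range. A standalone sketch of that arithmetic, runnable outside the scheduler:

# Standalone sketch of the sizing policy patch 1 adds to scheduler.py.
# Not scheduler code; just the same arithmetic, runnable on its own.


def batched_send_interval(n):
    """Seconds each worker should wait between batched sends, given n workers.

    Target is ~5,000 messages received per second in total, capped between
    5 ms and 100 ms per worker.
    """
    return max(0.005, min(0.1, n / 5000))


for n in (1, 25, 100, 250, 500, 1000):
    print(f"{n:>5} workers -> {batched_send_interval(n) * 1000:5.1f} ms")

Below about 25 workers the 5 ms floor dominates; from 500 workers onward every worker sits at the 100 ms cap.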
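
The plumbing is a feedback loop over the existing heartbeat: heartbeat_worker now returns a "batched-send-interval" key next to "heartbeat-interval", and on every heartbeat the worker copies that value onto self.batched_stream.interval. Until the first response arrives the worker uses a conservative 25 ms default (raised from 2 ms), so a freshly started worker cannot flood the scheduler. A toy sketch of that loop; ToyScheduler and ToyWorker are stand-ins, and only the response key and the assignment onto the worker's interval mirror the real change:

# Toy sketch of the heartbeat feedback loop the patches wire up; the class and
# attribute names here are illustrative only.

import asyncio


def batched_send_interval(n):
    return max(0.005, min(0.1, n / 5000))


class ToyScheduler:
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def heartbeat_response(self):
        # Mirrors the extra key heartbeat_worker() now puts in its reply.
        return {"batched-send-interval": batched_send_interval(self.n_workers)}


class ToyWorker:
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.interval = 0.025  # conservative 25 ms default until the first heartbeat

    async def heartbeat(self):
        response = self.scheduler.heartbeat_response()
        self.interval = response["batched-send-interval"]


async def main():
    worker = ToyWorker(ToyScheduler(n_workers=250))
    await worker.heartbeat()
    print(worker.interval)  # 0.05: with 250 workers, each flushes every 50 ms


asyncio.run(main())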
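
Patch 1 also carries a scheduler-load based variant (Scheduler.batched_send_interval), which patch 2 deletes: it reads self.proc.cpu_percent(), returns a 2 ms floor while the scheduler is below 50% CPU, and above that interpolates linearly toward a 50 ms ceiling. The commit message flags the problems: the raw CPU sample is noisy ("should be weighted avg'd or something") and the test for it was too unstable. A rough sketch of that idea with smoothing bolted on; the EWMA is one reading of the commit note, not something the patches implement, and psutil.Process() stands in for the scheduler's own process monitor:

# Rough sketch of the CPU-based variant that patch 1 adds and patch 2 removes,
# plus an exponentially weighted moving average as the hinted-at smoothing.

import psutil


class CpuBasedInterval:
    def __init__(self, min_ms=2.0, max_ms=50.0, alpha=0.2):
        self.min_ms = min_ms
        self.max_ms = max_ms
        self.alpha = alpha  # weight of the newest CPU sample in the EWMA
        self._smoothed = 0.0
        self._proc = psutil.Process()  # first cpu_percent() call only sets a baseline

    def update(self):
        """Sample CPU, fold it into the moving average, return an interval in seconds."""
        cpu_fraction = self._proc.cpu_percent() / 100
        self._smoothed = self.alpha * cpu_fraction + (1 - self.alpha) * self._smoothed
        if self._smoothed < 0.5:
            return self.min_ms / 1000
        target_ms = (self.max_ms - self.min_ms) * self._smoothed + self.min_ms
        return min(target_ms, self.max_ms) / 1000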