Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust worker batched send interval to cluster size #5036

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3993,6 +3993,7 @@ def heartbeat_worker(
"status": "OK",
"time": local_now,
"heartbeat-interval": heartbeat_interval(len(parent._workers_dv)),
"batched-send-interval": batched_send_interval(len(parent._workers_dv)),
}

async def add_worker(
Expand Down Expand Up @@ -7752,6 +7753,16 @@ def heartbeat_interval(n):
return n / 200 + 1


def batched_send_interval(n):
    """
    Desired per-worker update interval, in seconds, for a cluster of ``n`` workers.

    The scheduler targets roughly 5,000 messages received per second in
    aggregate across all workers; the resulting per-worker interval is
    clamped to the range [5 ms, 100 ms].
    """
    lower, upper = 0.005, 0.1
    desired = n / 5000
    if desired < lower:
        return lower
    if desired > upper:
        return upper
    return desired


class KilledWorker(Exception):
def __init__(self, task, last_worker):
super().__init__(task, last_worker)
Expand Down
24 changes: 23 additions & 1 deletion distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from distributed.core import CommClosedError, Status, rpc
from distributed.diagnostics.plugin import PipInstall
from distributed.metrics import time
from distributed.scheduler import Scheduler
from distributed.scheduler import Scheduler, heartbeat_interval
from distributed.utils import TimeoutError, tmpfile
from distributed.utils_test import (
TaskStateMetadataPlugin,
Expand Down Expand Up @@ -843,6 +843,28 @@ async def test_heartbeats(c, s, a, b):
assert a.periodic_callbacks["heartbeat"].callback_time < 1000


@gen_cluster(client=True)
async def test_adjust_batched_send_interval(c, s, a, b):
    """Workers adapt their batched-send interval to the cluster size.

    The scheduler includes ``batched-send-interval`` in its heartbeat
    response, so after each heartbeat a worker's
    ``batched_stream.interval`` should reflect the current worker count:
    clamped at the 5 ms floor for a small cluster, and above the floor
    once the cluster grows past 25 workers.
    """

    async def wait_for_heartbeat():
        # Block until worker ``a`` is seen again by the scheduler, i.e.
        # at least one full heartbeat round-trip has completed.
        last = s.workers[a.address].last_seen
        start = time()
        await asyncio.sleep(heartbeat_interval(len(s.workers)) * 1.1)
        while s.workers[a.address].last_seen == last:
            await asyncio.sleep(0.01)
            # Timeout guard: fail rather than hang if no heartbeat arrives
            assert time() < start + 2

    assert a.batched_stream.interval >= 0.005
    await wait_for_heartbeat()
    # Two workers: 2 / 5000 is below the floor, so the interval clamps to 5 ms
    assert a.batched_stream.interval == 0.005

    # Scale up the cluster so the desired send interval exceeds the 5 ms floor
    # (2 + 27 = 29 workers -> 29 / 5000 = 5.8 ms)
    workers = [Worker(s.address, nthreads=1) for _ in range(27)]
    try:
        await asyncio.gather(*workers)
        await wait_for_heartbeat()
        assert a.batched_stream.interval > 0.005
    finally:
        # Close the extra workers so the test does not leak them
        await asyncio.gather(*(w.close() for w in workers))


@pytest.mark.parametrize("worker", [Worker, Nanny])
def test_worker_dir(worker):
with tmpfile() as fn:
Expand Down
3 changes: 2 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ def __init__(
self.nthreads, thread_name_prefix="Dask-Default-Threads"
)

self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
fjetter marked this conversation as resolved.
Show resolved Hide resolved
self.batched_stream = BatchedSend(interval="25ms", loop=self.loop)
self.name = name
self.scheduler_delay = 0
self.stream_comms = dict()
Expand Down Expand Up @@ -999,6 +999,7 @@ async def heartbeat(self):
self.periodic_callbacks["heartbeat"].callback_time = (
response["heartbeat-interval"] * 1000
)
self.batched_stream.interval = response["batched-send-interval"]
self.bandwidth_workers.clear()
self.bandwidth_types.clear()
except CommClosedError:
Expand Down