Skip to content

Commit

Permalink
Add no worker timeout for scheduler (dask#8371)
Browse files Browse the repository at this point in the history
  • Loading branch information
FTang21 authored Jan 11, 2024
1 parent b95cf96 commit 9fb41e3
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 18 deletions.
15 changes: 14 additions & 1 deletion distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,20 @@ properties:
description: |
Shut down the scheduler after this duration if no activity has occurred
This can be helpful to reduce costs and stop zombie processes from roaming the earth.
no-workers-timeout:
type:
- string
- "null"
description: |
Shut down the scheduler after this duration if there are pending tasks,
but no workers that can process them. This can either mean that there are
no workers running at all, or that there are idle workers but they've been
excluded through worker or resource restrictions.
In adaptive clusters, this timeout must be set to be safely higher than
the time it takes for workers to spin up.
Works in conjunction with idle-timeout.
work-stealing:
type: boolean
Expand Down
7 changes: 4 additions & 3 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ distributed:
# Number of seconds to wait until workers or clients are removed from the events log
# after they have been removed from the scheduler
events-cleanup-delay: 1h
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
no-workers-timeout: null # Shut down if there are tasks but no workers to process them
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
preload: [] # Run custom modules with Scheduler
preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h")
default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks)
default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks)
rechunk-split: 1us
split-shuffle: 1us
validate: False # Check scheduler state at every step for debugging
Expand Down
67 changes: 57 additions & 10 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3517,6 +3517,12 @@ class Scheduler(SchedulerState, ServerNode):
default_port = 8786
_instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()

worker_ttl: float | None
idle_since: float | None
idle_timeout: float | None
_no_workers_since: float | None # Note: not None iff there are pending tasks
no_workers_timeout: float | None

def __init__(
self,
loop=None,
Expand Down Expand Up @@ -3578,16 +3584,19 @@ def __init__(
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
worker_ttl = worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
self.worker_ttl = parse_timedelta(worker_ttl) if worker_ttl else None
idle_timeout = idle_timeout or dask.config.get(
"distributed.scheduler.idle-timeout"

self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
if idle_timeout:
self.idle_timeout = parse_timedelta(idle_timeout)
else:
self.idle_timeout = None
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None

self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
Expand Down Expand Up @@ -3860,9 +3869,12 @@ async def post(self):
pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000)
self.periodic_callbacks["worker-ttl"] = pc

pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4)
pc = PeriodicCallback(self.check_idle, 250)
self.periodic_callbacks["idle-timeout"] = pc

pc = PeriodicCallback(self._check_no_workers, 250)
self.periodic_callbacks["no-workers-timeout"] = pc

if extensions is None:
extensions = DEFAULT_EXTENSIONS.copy()
if not dask.config.get("distributed.scheduler.work-stealing"):
Expand Down Expand Up @@ -8141,7 +8153,7 @@ async def check_worker_ttl(self):

def check_idle(self) -> float | None:
if self.status in (Status.closing, Status.closed):
return None
return None # pragma: nocover

if self.transition_counter != self._idle_transition_counter:
self._idle_transition_counter = self.transition_counter
Expand Down Expand Up @@ -8178,6 +8190,41 @@ def check_idle(self) -> float | None:
self._ongoing_background_tasks.call_soon(self.close)
return self.idle_since

def _check_no_workers(self) -> None:
    """Shut down the scheduler if there have been tasks ready to run which have
    nowhere to run for `distributed.scheduler.no-workers-timeout`, and there
    aren't other tasks running.

    The first call that finds stranded tasks only records the current time in
    ``_no_workers_since``; a later call schedules ``self.close`` once more than
    ``no_workers_timeout`` seconds have elapsed since that timestamp. The
    timestamp is cleared whenever there is nothing stranded or some task is
    processing. A falsy ``no_workers_timeout`` (e.g. ``None``) never triggers
    a shutdown, although the timestamp is still maintained.
    """
    if self.status in (Status.closing, Status.closed):
        return  # pragma: nocover

    if (
        (not self.queued and not self.unrunnable)
        or (self.queued and self.workers)
        or any(ws.processing for ws in self.workers.values())
    ):
        # Nothing is stranded, or work is still progressing: reset the timer.
        self._no_workers_since = None
        return

    # 1. There are queued or unrunnable tasks and no workers at all
    # 2. There are unrunnable tasks and no workers satisfy their restrictions
    #    (Only rootish tasks can be queued, and rootish tasks can't have restrictions)

    # Compare against None explicitly: the attribute is ``float | None`` and a
    # timestamp of 0.0 must count as "timer already started", not as "unset".
    if self._no_workers_since is None:
        self._no_workers_since = time()
        return

    if (
        self.no_workers_timeout
        and time() > self._no_workers_since + self.no_workers_timeout
    ):
        logger.info(
            "Tasks have been without any workers to run them for %s; "
            "shutting scheduler down",
            format_time(self.no_workers_timeout),
        )
        self._ongoing_background_tasks.call_soon(self.close)

def adaptive_target(self, target_duration=None):
"""Desired number of workers based on the current workload
Expand Down
110 changes: 106 additions & 4 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2405,11 +2405,11 @@ async def test_idle_timeout(c, s, a, b):
pc.stop()


@gen_cluster(
client=True,
nthreads=[],
)
@gen_cluster(client=True, nthreads=[])
async def test_idle_timeout_no_workers(c, s):
"""Test that idle-timeout is not triggered if there are no workers available
but there are tasks queued
"""
# Cancel the idle check periodic timeout so we can step through manually
s.periodic_callbacks["idle-timeout"].stop()

Expand Down Expand Up @@ -2440,6 +2440,108 @@ async def test_idle_timeout_no_workers(c, s):
assert s.check_idle()


@gen_cluster(
    client=True,
    nthreads=[],
    config={"distributed.scheduler.no-workers-timeout": None},
)
async def test_no_workers_timeout_disabled(c, s):
    """no-workers-timeout has been disabled

    The cluster is started with ``nthreads=[]``, i.e. without workers, so the
    test takes only the client and the scheduler, like the other
    ``nthreads=[]`` tests in this module.
    """
    # NOTE(review): the future is bound to a name presumably so that the
    # client keeps the key alive while the scheduler is polled - confirm.
    future = c.submit(inc, 1, key="x")
    await wait_for_state("x", ("queued", "no-worker"), s)

    # Two checks separated by sleeps: the first call starts the timer, the
    # second one would act on it if a timeout were configured.
    s._check_no_workers()
    await asyncio.sleep(0.2)
    s._check_no_workers()
    await asyncio.sleep(0.2)

    assert s.status == Status.running


@pytest.mark.slow
@gen_cluster(
    client=True,
    nthreads=[],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_without_workers(c, s):
    """Trip no-workers-timeout when there are no workers available"""
    # With no tasks submitted yet, repeated checks must leave the scheduler up
    for _ in range(2):
        s._check_no_workers()
        await asyncio.sleep(0.2)

    assert s.status == Status.running

    # Once a task exists with nowhere to run, wait for the scheduler to close
    pending = c.submit(inc, 1)
    while not s.status == Status.closed:
        await asyncio.sleep(0.01)


@pytest.mark.slow
@gen_cluster(
    client=True,
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_bad_restrictions(c, s, a, b):
    """Trip no-workers-timeout when there are workers available but none satisfies
    task restrictions
    """
    # The task is restricted to an address that differs from the workers
    # this test receives, then we wait for the scheduler to close itself.
    restricted = c.submit(inc, 1, key="x", workers=["127.0.0.2:1234"])
    while True:
        if s.status == Status.closed:
            break
        await asyncio.sleep(0.01)


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_queued(c, s, a):
    """Don't trip no-workers-timeout when there are queued tasks AND processing tasks"""
    ev = Event()
    # Three tasks that block on the event, submitted to a single-thread worker
    blocked = [c.submit(lambda ev: ev.wait(), ev, pure=False) for _ in range(3)]

    # Wait until the worker has received at least one of them
    while not a.state.tasks:
        await asyncio.sleep(0.01)
    assert s.queued or math.isinf(s.WORKER_SATURATION)

    # Repeated checks spanning more than the 100ms timeout must not close
    # the scheduler
    for _ in range(2):
        s._check_no_workers()
        await asyncio.sleep(0.2)

    assert s.status == Status.running
    await ev.set()


@pytest.mark.slow
@gen_cluster(
    client=True,
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_processing(c, s, a, b):
    """Don't trip no-workers-timeout when there are tasks processing"""
    ev = Event()
    # "x" blocks on the event; "y" is restricted to a different address and
    # is expected to reach the no-worker state
    x = c.submit(lambda ev: ev.wait(), ev, key="x")
    y = c.submit(inc, 1, key="y", workers=["127.0.0.2:1234"])
    await wait_for_state("x", "processing", s)
    await wait_for_state("y", "no-worker", s)

    # Scheduler won't shut down for as long as x is running
    for _ in range(2):
        s._check_no_workers()
        await asyncio.sleep(0.2)
    assert s.status == Status.running

    # Let x finish; afterwards wait for the scheduler to close
    await ev.set()
    await x

    while not s.status == Status.closed:
        await asyncio.sleep(0.01)


@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
async def test_bandwidth(c, s, a, b):
start = s.bandwidth
Expand Down

0 comments on commit 9fb41e3

Please sign in to comment.