Add no worker timeout for scheduler #8371

Merged: 14 commits, Jan 11, 2024
15 changes: 14 additions & 1 deletion distributed/distributed-schema.yaml
@@ -76,7 +76,20 @@ properties:
          description: |
            Shut down the scheduler after this duration if no activity has occurred

            This can be helpful to reduce costs and stop zombie processes from roaming the earth.
        no-workers-timeout:
          type:
          - string
          - "null"
          description: |
            Shut down the scheduler after this duration if there are pending tasks,
            but no workers that can process them. This can either mean that there are
            no workers running at all, or that there are idle workers but they've been
            excluded through worker or resource restrictions.

            In adaptive clusters, this timeout must be set to be safely higher than
            the time it takes for workers to spin up.

            Works in conjunction with idle-timeout.

        work-stealing:
          type: boolean
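For illustration (not part of the diff), both timeouts can be set through dask's standard configuration system; the values below are arbitrary examples of strings accepted by dask.utils.parse_timedelta:

import dask

# Arbitrary example values; any parse_timedelta-compatible string works ("30s", "2 hours", ...)
dask.config.set({
    "distributed.scheduler.idle-timeout": "1h",
    "distributed.scheduler.no-workers-timeout": "5 minutes",
})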
7 changes: 4 additions & 3 deletions distributed/distributed.yaml
@@ -17,15 +17,16 @@ distributed:
    # Number of seconds to wait until workers or clients are removed from the events log
    # after they have been removed from the scheduler
    events-cleanup-delay: 1h
    idle-timeout: null            # Shut down after this duration, like "1h" or "30 minutes"
    no-workers-timeout: null      # Shut down if there are tasks but no workers to process them
    work-stealing: True           # workers should steal tasks from each other
    work-stealing-interval: 100ms # Callback time for work stealing
    worker-saturation: 1.1        # Send this fraction of nthreads root tasks to workers
    worker-ttl: "5 minutes"       # like '60s'. Time to live for workers. They must heartbeat faster than this
    preload: []                   # Run custom modules with Scheduler
    preload-argv: []              # See https://docs.dask.org/en/latest/how-to/customize-initialization.html
    unknown-task-duration: 500ms  # Default duration for all tasks with unknown durations ("15m", "2h")
    default-task-durations:       # How long we expect function names to run ("1h", "1s") (helps for long tasks)
      rechunk-split: 1us
      split-shuffle: 1us
    validate: False               # Check scheduler state at every step for debugging
67 changes: 57 additions & 10 deletions distributed/scheduler.py
@@ -3517,6 +3517,12 @@
    default_port = 8786
    _instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()

    worker_ttl: float | None
    idle_since: float | None
    idle_timeout: float | None
    _no_workers_since: float | None  # Note: not None iff there are pending tasks
    no_workers_timeout: float | None

    def __init__(
        self,
        loop=None,
@@ -3578,16 +3584,19 @@
         self.service_kwargs = service_kwargs or {}
         self.services = {}
         self.scheduler_file = scheduler_file
-        worker_ttl = worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
-        self.worker_ttl = parse_timedelta(worker_ttl) if worker_ttl else None
-        idle_timeout = idle_timeout or dask.config.get(
-            "distributed.scheduler.idle-timeout"
-        )
-        if idle_timeout:
-            self.idle_timeout = parse_timedelta(idle_timeout)
-        else:
-            self.idle_timeout = None
+        self.worker_ttl = parse_timedelta(
+            worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
+        )
+        self.idle_timeout = parse_timedelta(
+            idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
+        )
         self.idle_since = time()
+        self.no_workers_timeout = parse_timedelta(
+            dask.config.get("distributed.scheduler.no-workers-timeout")
+        )
+        self._no_workers_since = None

         self.time_started = self.idle_since  # compatibility for dask-gateway
         self._replica_lock = RLock()
         self.bandwidth_workers = defaultdict(float)
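For reference, this refactoring leans on dask.utils.parse_timedelta passing None through unchanged, so an unset config value leaves the timeout disabled; a quick sketch of that behavior:

from dask.utils import parse_timedelta

parse_timedelta("100ms")      # 0.1 (seconds)
parse_timedelta("5 minutes")  # 300
parse_timedelta(None)         # None -> timeout stays disabled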
@@ -3860,9 +3869,12 @@
         pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000)
         self.periodic_callbacks["worker-ttl"] = pc

-        pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4)
+        pc = PeriodicCallback(self.check_idle, 250)
@crusaderky (Collaborator) commented on Jan 11, 2024:

These methods are tiny and running them 4 times per second is inconsequential.
I would be annoyed if I had set a timeout of 2h and the shutdown was 30 min later than expected.
Also, some third-party SchedulerPlugin could read from self.idle_since or self.no_workers_since and break when the user sets a timeout.
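A back-of-the-envelope check of that point, using the interval formula from the line removed above:

# Old scheme: the idle check ran every idle_timeout / 4 seconds, so the shutdown
# could lag the deadline by up to one full interval.
idle_timeout = 2 * 60 * 60              # user sets a 2 h timeout
old_interval = (idle_timeout or 1) / 4  # 1800 s -> up to 30 min late
new_interval = 0.250                    # fixed 250 ms -> negligible lag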

        self.periodic_callbacks["idle-timeout"] = pc

        pc = PeriodicCallback(self._check_no_workers, 250)
        self.periodic_callbacks["no-workers-timeout"] = pc

        if extensions is None:
            extensions = DEFAULT_EXTENSIONS.copy()
        if not dask.config.get("distributed.scheduler.work-stealing"):
@@ -8141,7 +8153,7 @@

     def check_idle(self) -> float | None:
         if self.status in (Status.closing, Status.closed):
-            return None
+            return None  # pragma: nocover

         if self.transition_counter != self._idle_transition_counter:
             self._idle_transition_counter = self.transition_counter
@@ -8178,6 +8190,41 @@
            self._ongoing_background_tasks.call_soon(self.close)
        return self.idle_since

    def _check_no_workers(self) -> None:
        """Shut down the scheduler if there have been tasks ready to run which have
        nowhere to run for `distributed.scheduler.no-workers-timeout`, and there
        aren't other tasks running.
        """
        if self.status in (Status.closing, Status.closed):
            return  # pragma: nocover

        if (
            (not self.queued and not self.unrunnable)
            or (self.queued and self.workers)
            or any(ws.processing for ws in self.workers.values())
        ):
            self._no_workers_since = None
            return

        # 1. There are queued or unrunnable tasks and no workers at all
        # 2. There are unrunnable tasks and no workers satisfy their restrictions
        #    (Only rootish tasks can be queued, and rootish tasks can't have restrictions)

        if not self._no_workers_since:
            self._no_workers_since = time()
            return

        if (
            self.no_workers_timeout
            and time() > self._no_workers_since + self.no_workers_timeout
        ):
            logger.info(
                "Tasks have been without any workers to run them for %s; "
                "shutting scheduler down",
                format_time(self.no_workers_timeout),
            )
            self._ongoing_background_tasks.call_soon(self.close)

    def adaptive_target(self, target_duration=None):
        """Desired number of workers based on the current workload

110 changes: 106 additions & 4 deletions distributed/tests/test_scheduler.py
@@ -2405,11 +2405,11 @@ async def test_idle_timeout(c, s, a, b):
    pc.stop()


-@gen_cluster(
-    client=True,
-    nthreads=[],
-)
+@gen_cluster(client=True, nthreads=[])
 async def test_idle_timeout_no_workers(c, s):
+    """Test that idle-timeout is not triggered if there are no workers available
+    but there are tasks queued
+    """
     # Cancel the idle check periodic timeout so we can step through manually
     s.periodic_callbacks["idle-timeout"].stop()

@@ -2440,6 +2440,108 @@ async def test_idle_timeout_no_workers(c, s):
    assert s.check_idle()


@gen_cluster(
    client=True,
    nthreads=[],
    config={"distributed.scheduler.no-workers-timeout": None},
)
async def test_no_workers_timeout_disabled(c, s):
    """no-workers-timeout has been disabled"""
    future = c.submit(inc, 1, key="x")
    await wait_for_state("x", ("queued", "no-worker"), s)

    s._check_no_workers()
    await asyncio.sleep(0.2)
    s._check_no_workers()
    await asyncio.sleep(0.2)

    assert s.status == Status.running

@pytest.mark.slow
@gen_cluster(
    client=True,
    nthreads=[],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_without_workers(c, s):
    """Trip no-workers-timeout when there are no workers available"""
    # Don't trip scheduler shutdown when there are no tasks
    s._check_no_workers()
    await asyncio.sleep(0.2)
    s._check_no_workers()
    await asyncio.sleep(0.2)

    assert s.status == Status.running

    future = c.submit(inc, 1)
    while s.status != Status.closed:
        await asyncio.sleep(0.01)


@pytest.mark.slow
@gen_cluster(
    client=True,
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_bad_restrictions(c, s, a, b):
    """Trip no-workers-timeout when there are workers available but none satisfies
    task restrictions
    """
    future = c.submit(inc, 1, key="x", workers=["127.0.0.2:1234"])
    while s.status != Status.closed:
        await asyncio.sleep(0.01)


@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_queued(c, s, a):
    """Don't trip no-workers-timeout when there are queued tasks AND processing tasks"""
    ev = Event()
    futures = [c.submit(lambda ev: ev.wait(), ev, pure=False) for _ in range(3)]
    while not a.state.tasks:
        await asyncio.sleep(0.01)
    assert s.queued or math.isinf(s.WORKER_SATURATION)

    s._check_no_workers()
    await asyncio.sleep(0.2)
    s._check_no_workers()
    await asyncio.sleep(0.2)

    assert s.status == Status.running
    await ev.set()


@pytest.mark.slow
@gen_cluster(
    client=True,
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_processing(c, s, a, b):
    """Don't trip no-workers-timeout when there are tasks processing"""
    ev = Event()
    x = c.submit(lambda ev: ev.wait(), ev, key="x")
    y = c.submit(inc, 1, key="y", workers=["127.0.0.2:1234"])
    await wait_for_state("x", "processing", s)
    await wait_for_state("y", "no-worker", s)

    # The scheduler won't shut down for as long as x is running
    s._check_no_workers()
    await asyncio.sleep(0.2)
    s._check_no_workers()
    await asyncio.sleep(0.2)
    assert s.status == Status.running

    await ev.set()
    await x

    while s.status != Status.closed:
        await asyncio.sleep(0.01)


@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
async def test_bandwidth(c, s, a, b):
    start = s.bandwidth