
Add no worker timeout for scheduler #8371

Merged
merged 14 commits on Jan 11, 2024
15 changes: 14 additions & 1 deletion distributed/distributed-schema.yaml
@@ -76,7 +76,20 @@ properties:
description: |
Shut down the scheduler after this duration if no activity has occurred

This can be helpful to reduce costs and stop zombie processes from roaming the earth.
no-workers-timeout:
type:
- string
- "null"
description: |
Shut down the scheduler after this duration if there are pending tasks,
but no workers that can process them. This can either mean that there are
no workers running at all, or that there are idle workers but they've been
excluded through worker or resource restrictions.

In adaptive clusters, this timeout must be set to be safely higher than
the time it takes for workers to spin up.

Works in conjunction with idle-timeout.

work-stealing:
type: boolean
1 change: 1 addition & 0 deletions distributed/distributed.yaml
@@ -18,6 +18,7 @@ distributed:
# after they have been removed from the scheduler
events-cleanup-delay: 1h
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
no-workers-timeout: 20m # Shut down if there are tasks but no workers to process them
Member:

Given that idle-timeout is currently null, I'd also default to null for no-workers-timeout. If I don't want to shut down my cluster when there's nothing at all to be done, I probably also don't want to shut it down if I have something to be done but lack the means to do so.

Collaborator @crusaderky (Jan 11, 2024):

Well, there's the use case of adaptive clusters that scale down to zero or almost zero. There you likely want to keep the scheduler always running, but if the cluster hangs e.g. while 100 CPU workers are up because a single GPU worker failed to start, you want to tear it down quickly.

However, I agree that None is generally a more desirable default, particularly for non-adaptive situations.

Member:

> Well, there's the use case of adaptive clusters that scale down to zero or almost zero. There you likely want to keep the scheduler always running, but if the cluster hangs e.g. while 100 CPU workers are up because a single GPU worker failed to start, you want to tear it down quickly.

Fair point
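
As a concrete illustration of the adaptive-cluster use case (not part of the PR): with a null default, a user would opt in explicitly. The key name matches the new entry in distributed.yaml; the 10-minute value is an arbitrary example chosen to comfortably exceed worker spin-up time.

```python
import dask

# Opt into the new timeout before the scheduler is created, e.g. for an
# adaptive cluster that scales to zero but should still be torn down if it
# hangs with tasks that no worker can run.
dask.config.set({"distributed.scheduler.no-workers-timeout": "10 minutes"})

# The same setting can also be supplied through dask's standard environment
# variable mapping (assumed here, not shown in the PR):
#   DASK_DISTRIBUTED__SCHEDULER__NO_WORKERS_TIMEOUT="10 minutes"
```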

work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
59 changes: 49 additions & 10 deletions distributed/scheduler.py
@@ -3517,6 +3517,12 @@
default_port = 8786
_instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()

worker_ttl: float | None
idle_since: float | None
idle_timeout: float | None
_no_workers_since: float | None # Note: not None iff there are pending tasks
no_workers_timeout: float | None

def __init__(
self,
loop=None,
@@ -3578,16 +3584,19 @@
self.service_kwargs = service_kwargs or {}
self.services = {}
self.scheduler_file = scheduler_file
worker_ttl = worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
self.worker_ttl = parse_timedelta(worker_ttl) if worker_ttl else None
idle_timeout = idle_timeout or dask.config.get(
"distributed.scheduler.idle-timeout"

self.worker_ttl = parse_timedelta(
worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
)
self.idle_timeout = parse_timedelta(
idle_timeout or dask.config.get("distributed.scheduler.idle-timeout")
)
if idle_timeout:
self.idle_timeout = parse_timedelta(idle_timeout)
else:
self.idle_timeout = None
self.idle_since = time()
self.no_workers_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.no-workers-timeout")
)
self._no_workers_since = None

self.time_started = self.idle_since # compatibility for dask-gateway
self._replica_lock = RLock()
self.bandwidth_workers = defaultdict(float)
@@ -3860,9 +3869,12 @@
pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000)
self.periodic_callbacks["worker-ttl"] = pc

pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4)
pc = PeriodicCallback(self.check_idle, 250)
Collaborator @crusaderky (Jan 11, 2024):

These methods are tiny and running them 4 times per second is inconsequential.
I would be annoyed if I had set a timeout of 2h and the shutdown was 30min later than expected.
Also, some third-party SchedulerPlugin could read from self.idle_since or self.no_workers_since and break when the user sets a timeout.
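
For illustration, a minimal sketch of the kind of third-party SchedulerPlugin alluded to above; the plugin class, its name, and the registration call are made up for this example, while `SchedulerPlugin` and `Scheduler.idle_since` come from distributed itself.

```python
from distributed.diagnostics.plugin import SchedulerPlugin


class IdleSinceReporter(SchedulerPlugin):
    """Hypothetical plugin that reads scheduler.idle_since, e.g. to export an
    idle-duration metric; it only stays accurate if check_idle runs frequently,
    regardless of whether an idle-timeout is configured."""

    def start(self, scheduler):
        self.scheduler = scheduler

    def transition(self, key, start, finish, *args, **kwargs):
        idle_since = self.scheduler.idle_since
        if idle_since is not None:
            pass  # report how long the scheduler has been idle to a metrics system


# Registration would typically be scheduler.add_plugin(IdleSinceReporter()).
```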

self.periodic_callbacks["idle-timeout"] = pc

pc = PeriodicCallback(self._check_no_workers, 250)
self.periodic_callbacks["no-workers-timeout"] = pc

if extensions is None:
extensions = DEFAULT_EXTENSIONS.copy()
if not dask.config.get("distributed.scheduler.work-stealing"):
@@ -8141,7 +8153,7 @@

def check_idle(self) -> float | None:
if self.status in (Status.closing, Status.closed):
return None
return None # pragma: nocover

if self.transition_counter != self._idle_transition_counter:
self._idle_transition_counter = self.transition_counter
@@ -8178,6 +8190,33 @@
self._ongoing_background_tasks.call_soon(self.close)
return self.idle_since

def _check_no_workers(self) -> None:
if self.status in (Status.closing, Status.closed):
return # pragma: nocover

if (not self.queued and not self.unrunnable) or (self.queued and self.workers):
Member:

Should we also check whether we have tasks in processing (regardless of queued tasks) and return if that's the case?

Collaborator @crusaderky (Jan 11, 2024):

I don't think so.
The intent is to shut down the cluster if there are tasks to run but nowhere to run them.

"there's a task to run" translates to queued, unrunnable, or processing.

  • unrunnable tasks can exist if there are no workers whatsoever and queueing is disabled, or if there are some workers but they've all been excluded by task restrictions. In both cases their existence should trip the timeout.
  • queued tasks can exist either if there are no workers whatsoever or all workers are busy with other tasks. So if there are any workers on the cluster, we can just assume they're already busy with other rootish tasks and reset the timeout.
  • processing tasks exist if there's somewhere to run them, which should cause the timeout to reset.

Member:

Doesn't that translate to:

Suggested change:
- if (not self.queued and not self.unrunnable) or (self.queued and self.workers):
+ if self.processing or (not self.queued and not self.unrunnable) or (self.queued and self.workers):

Member:

Also, let me rephrase:
Should we also check whether we have tasks in processing (regardless of queued tasks) and reset no_workers_since if that's the case?

Collaborator:

SchedulerState.processing sadly doesn't exist; you'd need any(ws.processing for ws in self.workers.values()) (like check_idle does).

Do you see a use case that the current logic doesn't cover? I can't think of any...

Member:

@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_with_worker(c, s, a):
    """Do not trip no-workers-timeout when there are tasks processing"""
    import time
    s._check_no_workers()
    await asyncio.sleep(0.2)
    assert s.status == Status.running

    f1 = c.submit(time.sleep, 2)
    f2 = c.submit(inc, 1, key="x", workers=["127.0.0.2:1234"])
    await f1
    assert s.status == Status.running

would kill the scheduler before we're able to complete f1. From what I understand, we only ever want to kill the cluster if there's nothing that could possibly be processed with the current set of workers.

Collaborator:

👀 I had not seen that use case. Thank you. Fixed.
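
To summarize the thread, here is a rough sketch of the reset condition under discussion, using the attribute names from the comments above (`queued`, `unrunnable`, `workers`, `ws.processing`); it is not necessarily the merged implementation.

```python
def should_reset_no_workers_timer(scheduler) -> bool:
    # Sketch of the logic discussed above, not the merged code.
    if not scheduler.queued and not scheduler.unrunnable:
        return True  # nothing is waiting to run
    if scheduler.queued and scheduler.workers:
        return True  # queued (rootish) tasks will run once workers free up
    if any(ws.processing for ws in scheduler.workers.values()):
        return True  # something is processing right now (covers the f1 example)
    return False
```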

self._no_workers_since = None
return

# 1. There are queued or unrunnable tasks and no workers at all
# 2. There are unrunnable tasks and no workers satisfy their restrictions
# (Only rootish tasks can be queued, and rootish tasks can't have restrictions)

if not self._no_workers_since:
self._no_workers_since = time()
return

if (
self.no_workers_timeout
and time() > self._no_workers_since + self.no_workers_timeout
):
logger.info(

"Tasks have been without any workers to run them for %s; "
"shutting scheduler down",
format_time(self.no_workers_timeout),
)
self._ongoing_background_tasks.call_soon(self.close)


def adaptive_target(self, target_duration=None):
"""Desired number of workers based on the current workload

67 changes: 63 additions & 4 deletions distributed/tests/test_scheduler.py
@@ -2405,11 +2405,11 @@ async def test_idle_timeout(c, s, a, b):
pc.stop()


@gen_cluster(
client=True,
nthreads=[],
)
@gen_cluster(client=True, nthreads=[])
async def test_idle_timeout_no_workers(c, s):
"""Test that idle-timeout is not triggered if there are no workers available
but there are tasks queued
"""
# Cancel the idle check periodic timeout so we can step through manually
s.periodic_callbacks["idle-timeout"].stop()

@@ -2440,6 +2440,65 @@ async def test_idle_timeout_no_workers(c, s):
assert s.check_idle()


@gen_cluster(
client=True,
nthreads=[],
config={"distributed.scheduler.no-workers-timeout": None},
)
async def test_no_workers_timeout_disabled(c, s):
"""no-workers-timeout has been disabled"""
future = c.submit(inc, 1, key="x")
await wait_for_state("x", ("queued", "no-worker"), s)
s._check_no_workers()
await asyncio.sleep(0.2)
assert s.status == Status.running


@gen_cluster(
client=True,
nthreads=[],
config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_without_workers(c, s):
"""Trip no-workers-timeout when there are no workers available"""
# Don't trip scheduler shutdown when there are no tasks
s._check_no_workers()
await asyncio.sleep(0.2)
assert s.status == Status.running

future = c.submit(inc, 1)
while s.status != Status.closed:
await asyncio.sleep(0.01)


@gen_cluster(
client=True,
config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_bad_restrictions(c, s, a, b):
"""Trip no-workers-timeout when there are workers available but none satisfies
task restrictions
"""
future = c.submit(inc, 1, key="x", workers=["127.0.0.2:1234"])
while s.status != Status.closed:
await asyncio.sleep(0.01)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_no_workers_timeout_queued(c, s, a):
"""Don't trip no-workers-timeout when there are queued tasks AND processing tasks"""
ev = Event()
futures = [c.submit(lambda ev: ev.wait(), ev, pure=False) for _ in range(3)]
while not a.state.tasks:
await asyncio.sleep(0.01)
assert s.queued or math.isinf(s.WORKER_SATURATION)

s._check_no_workers()
await asyncio.sleep(0.2)
assert s.status == Status.running
await ev.set()


@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
async def test_bandwidth(c, s, a, b):
start = s.bandwidth