Add no worker timeout for scheduler #8371

Merged: 14 commits merged into dask:main on Jan 11, 2024
Conversation

@FTang21 (Contributor) commented Nov 26, 2023

Closes #8126

  • Tests added / passed
  • Passes pre-commit run --all-files

Add a new setting, no_worker_timeout, that checks whether there are tasks still waiting to be processed but no worker to process them.
Update the logic to include this behavior without changing the old logic too much.
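
For orientation, a minimal, self-contained sketch of the idea (hypothetical names, not the code in this PR): remember when stuck tasks were first seen, reset that marker whenever something can make progress, and shut down once the configured timeout has elapsed.

import logging
from time import time

logger = logging.getLogger(__name__)


class NoWorkersWatchdog:
    """Illustrative sketch only; the real logic lives on distributed.Scheduler."""

    def __init__(self, no_workers_timeout: float | None):
        self.no_workers_timeout = no_workers_timeout  # seconds; None disables the check
        self.no_workers_since: float | None = None    # when stuck tasks were first seen

    def check(self, has_pending_tasks: bool, has_usable_workers: bool) -> bool:
        """Return True if the cluster should shut down."""
        if not has_pending_tasks or has_usable_workers:
            self.no_workers_since = None  # nothing is stuck; reset the clock
            return False
        if self.no_workers_since is None:
            self.no_workers_since = time()  # start the clock
            return False
        if (
            self.no_workers_timeout is not None
            and time() > self.no_workers_since + self.no_workers_timeout
        ):
            logger.info(
                "Tasks have been waiting with no workers for %.0fs; shutting down",
                time() - self.no_workers_since,
            )
            return True
        return False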

@FTang21 requested a review from fjetter as a code owner on November 26, 2023 18:57
@GPUtester (Collaborator):

Can one of the admins verify this patch?

Admins can comment "ok to test" to allow this one PR to run, or "add to allowlist" to allow all future PRs from the same author to run.

github-actions bot commented Nov 26, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

  • 27 files (+1), 27 suites (+1), 9h 54m 41s ⏱️ (+45m 34s)
  • 3 957 tests (+5): 3 846 ✅ (+7), 109 💤 (±0), 2 ❌ (-2)
  • 49 774 runs (+2 587): 47 483 ✅ (+2 511), 2 289 💤 (+78), 2 ❌ (-2)

For more details on these failures, see this check.

Results for commit 50c7024. ± Comparison against base commit b95cf96.

♻️ This comment has been updated with latest results.

@fjetter (Member) left a comment:

Can you please add a unit test for this? If you add the timeout as a config value it should be much easier to write one.

Two resolved review threads on distributed/scheduler.py (outdated).
@FTang21 requested a review from fjetter on December 10, 2023 02:35
@hendrikmakait (Member):
@fjetter: Do you have time for another review on this PR?

@fjetter (Member) commented Dec 21, 2023

Looks good, but there are a couple of test failures that appear to be related:

There is a test failure in distributed.tests.test_config that raises a config schema error. That's likely connected to the new option.

test_idle_timeout_no_workers also failed once. It may just be a little flaky, but I'd be surprised if it isn't connected. I recommend running the test a number of times concurrently; this typically triggers flaky failures reliably, e.g. pytest -x distributed/tests/test_scheduler.py::test_idle_timeout_no_workers -nauto --count=100 (requires pytest-xdist and pytest-repeat).

@crusaderky (Collaborator):
@FTang21 do you have time to look into the outstanding issues?

Resolved review threads:
  • distributed/scheduler.py (outdated)
  • distributed/distributed-schema.yaml (outdated)
  • distributed/scheduler.py (outdated)
  • distributed/tests/test_scheduler.py
  • distributed/tests/test_scheduler.py (outdated)
@crusaderky (Collaborator):
I can envision how users may get confused by the cluster shutting down unexpectedly. Could you please add logging that explains which timeout tripped?
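
For example, something along these lines would do; a sketch only, with illustrative message wording rather than what ended up in the PR:

import logging

logger = logging.getLogger("distributed.scheduler")


def log_timeout_shutdown(kind: str, timeout: float) -> None:
    # Sketch: make it obvious in the scheduler log which timeout tripped,
    # so an "unexpected" cluster shutdown is easy to diagnose.
    if kind == "idle":
        logger.info(
            "Scheduler closing after being idle for %.0f s (idle-timeout)", timeout
        )
    else:
        logger.info(
            "Scheduler closing: tasks have been ready but no workers have been "
            "available to run them for %.0f s (no-workers-timeout)", timeout
        )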

@@ -3860,9 +3863,12 @@
     pc = PeriodicCallback(self.check_worker_ttl, self.worker_ttl * 1000)
     self.periodic_callbacks["worker-ttl"] = pc

-    pc = PeriodicCallback(self.check_idle, (self.idle_timeout or 1) * 1000 / 4)
+    pc = PeriodicCallback(self.check_idle, 250)
@crusaderky (Collaborator) commented Jan 11, 2024:

These methods are tiny and running them 4 times per second is inconsequential.
I would be annoyed if I had set a timeout of 2h and the shutdown was 30min later than expected.
Also, some third-party SchedulerPlugin could read from self.idle_since or self.no_workers_since and break when the user sets a timeout.
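
A sketch of what the diff above amounts to, assuming Tornado's PeriodicCallback (interval in milliseconds); the wrapper function and registry key names are illustrative, not the PR's code:

from tornado.ioloop import PeriodicCallback


def register_timeout_checks(scheduler) -> None:
    # Illustrative only. Both checks are cheap, so running them at a fixed
    # 250 ms cadence caps the worst-case detection lag at roughly 250 ms,
    # instead of timeout / 4 (which would be 30 minutes for a 2 h idle-timeout).
    pc = PeriodicCallback(scheduler._check_idle, 250)
    scheduler.periodic_callbacks["idle-timeout"] = pc  # key name illustrative

    pc = PeriodicCallback(scheduler._check_no_workers, 250)
    scheduler.periodic_callbacks["no-workers-timeout"] = pc  # key name illustrative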

@crusaderky self-requested a review on January 11, 2024 12:27
@crusaderky self-assigned this on Jan 11, 2024
@crusaderky removed their request for review on January 11, 2024 12:27
Comment on lines 3520 to 3524
worker_ttl: float | None
idle_since: float | None
idle_timeout: float | None
no_workers_since: float | None # Note: not None iff there are pending tasks
no_workers_timeout: float | None
Reviewer (Member):

In the spirit of #8190, is there a reason to make these public?

@crusaderky (Collaborator) replied Jan 11, 2024:

As a rule of thumb it makes sense to me to keep the variables that hold a public setting as public too.
I don't have a strong opinion on the two _since variables or the two check_ methods, so I'll change them to private.

@crusaderky (Collaborator):

I'll change the pre-existing attributes in a follow-up PR though.

Reviewer (Member), quoting the above ("As a rule of thumb it makes sense to me to keep the variables that hold a public setting as public too"):

👍, makes sense to me.

Resolved review thread on distributed/tests/test_scheduler.py (outdated).
if self.status in (Status.closing, Status.closed):
return # pragma: nocover

if (not self.queued and not self.unrunnable) or (self.queued and self.workers):
Reviewer (Member):

Should we also check whether we have tasks in processing (regardless of queued tasks) and return if that's the case?

@crusaderky (Collaborator) replied Jan 11, 2024:

I don't think so.
The intent is to shut down the cluster if there are tasks to run but nowhere to run them.

"there's a task to run" translates to queued,unrunnable, or processing.

  • unrunnable tasks can exist if there are no workers whatsoever and queueing is disabled, or if there are some workers but they've all been excluded by task restrictions. In both cases their existance should trip the timeout.
  • queued tasks can exist either if there are no workers whatsoever or all workers are busy with other tasks. So if there are any workers on the cluster, we can just assume they're already busy with other rootish tasks and reset the timeout.
  • processing tasks exist if there's somewhere to run them, which should cause the timeout to reset.

Reviewer (Member):

Doesn't that translate to:

Suggested change:
-    if (not self.queued and not self.unrunnable) or (self.queued and self.workers):
+    if self.processing or (not self.queued and not self.unrunnable) or (self.queued and self.workers):

Reviewer (Member):

Also, let me rephrase:
Should we also check whether we have tasks in processing (regardless of queued tasks) and reset no_workers_since if that's the case?

@crusaderky (Collaborator):

SchedulerState.processing sadly doesn't exist; you'd need any(ws.processing for ws in self.workers.values()) (like check_idle does).

Do you see a use case that the current logic doesn't cover? I can't think of any...

Reviewer (Member):

@gen_cluster(
    client=True,
    nthreads=[("", 1)],
    config={"distributed.scheduler.no-workers-timeout": "100ms"},
)
async def test_no_workers_timeout_with_worker(c, s, a):
    """Do not trip no-workers-timeout when there are tasks processing"""
    import time
    s._check_no_workers()
    await asyncio.sleep(0.2)
    assert s.status == Status.running

    f1 = c.submit(time.sleep, 2)  # occupies the only worker thread
    f2 = c.submit(inc, 1, key="x", workers=["127.0.0.2:1234"])  # restricted to a worker that does not exist
    await f1
    assert s.status == Status.running

would kill the scheduler before we're able to complete f1. From what I understand, we only ever want to kill the cluster if there's nothing that could possibly be processed with the current set of workers.

@crusaderky (Collaborator):

👀 I had not seen that use case. Thank you. Fixed.
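
For reference, a simplified sketch of the kind of check that comes out of this thread (not the exact merged code): also reset the clock whenever any worker is processing something, so a busy worker plus an unrunnable task does not trip the timeout.

from time import time


def check_no_workers(scheduler) -> None:
    # Simplified sketch of the logic discussed above; not the merged implementation.
    stuck = scheduler.queued or scheduler.unrunnable
    busy = any(ws.processing for ws in scheduler.workers.values())

    if not stuck or busy:
        scheduler.no_workers_since = None  # progress is possible; reset the clock
        return

    if scheduler.no_workers_since is None:
        scheduler.no_workers_since = time()  # stuck tasks first observed now
        return

    if (
        scheduler.no_workers_timeout is not None
        and time() > scheduler.no_workers_since + scheduler.no_workers_timeout
    ):
        # In distributed, close() is a coroutine that would be scheduled on the event loop.
        scheduler.close()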

@crusaderky (Collaborator):
@hendrikmakait all comments have been addressed

@hendrikmakait self-requested a review on January 11, 2024 14:54

Comment on lines 20 to 21
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
no-workers-timeout: 20m # Shut down if there are tasks but no workers to process them
Reviewer (Member):

Given that idle-timeout is currently null, I'd also default to null for no-workers-timeout. If I don't want to shut down my cluster when there's nothing at all to be done, I probably also don't want to shut it down if I have something to be done but lack the means to do so.

@crusaderky (Collaborator) replied Jan 11, 2024:

Well, there's the use case of adaptive clusters that scale down to zero or almost zero. There you likely want to keep the scheduler always running, but if the cluster hangs e.g. while 100 CPU workers are up because a single GPU worker failed to start, you want to tear it down quickly.

However, I agree that None is generally a more desirable default, particularly for non-adaptive situations.

Reviewer (Member), quoting the adaptive-cluster use case above:

Fair point
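
For illustration, here is how a user might opt in at runtime; the configuration key is the one exercised in the test above, while the LocalCluster usage around it is just an assumed example:

import dask
from dask.distributed import Client, LocalCluster

# Illustrative: enable the no-workers timeout explicitly. The value is a Dask
# duration string, and it must be in effect when the scheduler is created.
with dask.config.set({"distributed.scheduler.no-workers-timeout": "20 minutes"}):
    cluster = LocalCluster(n_workers=0)  # e.g. an adaptive cluster scaled to zero
    client = Client(cluster)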

@crusaderky (Collaborator):
@hendrikmakait all comments addressed

@hendrikmakait (Member) left a comment:

Thanks! LGTM assuming CI ends up green(ish).

@crusaderky merged commit 9fb41e3 into dask:main on Jan 11, 2024
30 of 33 checks passed
Development: merging this pull request may close "Eventually shut down scheduler when no workers return".

5 participants