From 42e34e35997d7c8e95312486ae9c3034aa640241 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 17 Oct 2024 13:15:27 +0200 Subject: [PATCH] Add configurations for rootish taskgroup threshold (#8898) * Increase rootish dependencies * Use config variables * Add configurations for rootish taskgroup threshold --- distributed/distributed-schema.yaml | 26 ++++++++++++++++++++++++++ distributed/distributed.yaml | 2 ++ distributed/scheduler.py | 11 +++++++++-- distributed/tests/test_scheduler.py | 12 ++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index bb2484d194..55f20deabe 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -133,6 +133,32 @@ properties: generally leave `worker-saturation` at 1.0, though 1.25-1.5 could slightly improve performance if ample memory is available. + rootish-taskgroup: + type: + - integer + + description: | + Controls when a specific task group is identified as rootish when + worker saturation is set. + + A task group is identified as rootish if it has fewer than a certain number + of dependencies (5 by default). This can be faulty for very large datasets + where the number of data tasks from xarray can be higher than 5. + + Increasing this limit will capture these root tasks successfully but increase + the risk of misidentifying task groups as rootish, which can have + performance implications. + + rootish-taskgroup-dependencies: + type: + - integer + + description: | + Controls the number of transitive dependencies a task group can have to be considered rootish. + It checks the number of dependencies each dependency of a rootish task group has. + + The same caveats as for `rootish-taskgroup` apply. 
+ worker-ttl: type: - string diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 6468172eb0..d3ae4844c5 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -23,6 +23,8 @@ distributed: work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers + rootish-taskgroup: 5 # number of dependencies of a rootish tg + rootish-taskgroup-dependencies: 5 # number of dependencies of the dependencies of the rootish tg worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this preload: [] # Run custom modules with Scheduler preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d1f18d7c2f..790c519623 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1840,6 +1840,13 @@ def __init__( + repr(self.WORKER_SATURATION) ) + self.rootish_tg_threshold = dask.config.get( + "distributed.scheduler.rootish-taskgroup" + ) + self.rootish_tg_dependencies_threshold = dask.config.get( + "distributed.scheduler.rootish-taskgroup-dependencies" + ) + @abstractmethod def log_event(self, topic: str | Collection[str], msg: Any) -> None: ... @@ -3090,8 +3097,8 @@ def is_rootish(self, ts: TaskState) -> bool: # TODO short-circuit to True if `not ts.dependencies`? 
return ( len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 + and len(tg.dependencies) < self.rootish_tg_threshold + and sum(map(len, tg.dependencies)) < self.rootish_tg_dependencies_threshold ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None: diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index cf3b8b8d4f..b081ed9285 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -5277,3 +5277,15 @@ async def before_close(self): assert s.plugins["before_close"].call_count == 1 lines = caplog.getvalue().split("\n") assert sum("Closing scheduler" in line for line in lines) == 1 + + +@gen_cluster( + client=True, + config={ + "distributed.scheduler.rootish-taskgroup": 10, + "distributed.scheduler.rootish-taskgroup-dependencies": 15, + }, +) +async def test_rootish_taskgroup_configuration(c, s, *workers): + assert s.rootish_tg_threshold == 10 + assert s.rootish_tg_dependencies_threshold == 15