Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add idle time to fine performance metrics #7938

Merged
merged 3 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 41 additions & 25 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
from distributed.diagnostics.task_stream import colors as ts_color_lookup
from distributed.metrics import time
from distributed.scheduler import Scheduler
from distributed.spans import Span, SpansSchedulerExtension
from distributed.spans import SpansSchedulerExtension
from distributed.utils import Log, log_errors

if dask.config.get("distributed.dashboard.export-tool"):
Expand Down Expand Up @@ -3532,6 +3532,9 @@ def _update_selectors(self) -> None:
if units:
self.unit_selector.options.extend(units)

if functions:
# Added on the fly by Span.cumulative_worker_metrics
functions.add("N/A")
functions.difference_update(self.function_selector.options)
if functions:
self.function_selector.options.extend(functions)
Expand Down Expand Up @@ -3595,38 +3598,51 @@ def _build_data_sources(self) -> None:

function_sel = set(self.function_selector.value)

span: Span | Scheduler
if self.span_tag_selector.value:
spans_ext: SpansSchedulerExtension = self.scheduler.extensions["spans"]
spans_ext: SpansSchedulerExtension | None = self.scheduler.extensions.get(
"spans"
)
if spans_ext and self.span_tag_selector.value:
span = spans_ext.merge_by_tags(*self.span_tag_selector.value)
execute_metrics = span.cumulative_worker_metrics
elif spans_ext and spans_ext.spans:
# Calculate idle time
span = spans_ext.merge_all()
execute_metrics = span.cumulative_worker_metrics
else:
span = self.scheduler
# Spans extension is not loaded
execute_metrics = {
k: v
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple) and k[0] == "execute"
}

for k, v in span.cumulative_worker_metrics.items():
if not isinstance(k, tuple):
continue # Only happens in global metrics
context, *other, activity, unit = k
for (context, function, activity, unit), v in execute_metrics.items():
assert context == "execute"
assert isinstance(function, str)
assert isinstance(unit, str)
assert self.unit_selector.value
if unit != self.unit_selector.value:
continue
if function_sel and function not in function_sel:
continue

if context == "execute":
(function,) = other
assert isinstance(function, str)
if not function_sel or function in function_sel:
# Custom metrics can provide any hashable as the label
activity = str(activity)
execute_by_func[function, activity] += v
execute[activity] += v
visible_functions.add(function)
visible_activities.add(activity)

elif context == "get-data" and not function_sel:
# Note: this will always be empty when a span is selected
assert isinstance(activity, str)
visible_activities.add(activity)
get_data[activity] += v
# Custom metrics won't necessarily contain a string as the label
activity = str(activity)
execute_by_func[function, activity] += v
execute[activity] += v
visible_functions.add(function)
visible_activities.add(activity)

if not self.function_selector.value and not self.span_tag_selector.value:
for k, v in self.scheduler.cumulative_worker_metrics.items():
if isinstance(k, tuple) and k[0] == "get-data":
_, activity, unit = k
assert isinstance(activity, str)
assert isinstance(unit, str)
assert self.unit_selector.value
if unit == self.unit_selector.value:
visible_activities.add(activity)
get_data[activity] += v

# Ignore memory-monitor and gather-dep metrics

Expand Down
35 changes: 33 additions & 2 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ async def test_FinePerformanceMetrics(c, s, a, b):
# Test with no metrics
cl.update()
assert not cl.visible_functions
assert not cl.span_tag_selector.options
assert not cl.function_selector.options
assert cl.unit_selector.options == ["seconds"]

# execute on default span; multiple tasks in same TaskGroup
x0 = c.submit(inc, 0, key="x-0", workers=[a.address])
Expand Down Expand Up @@ -383,12 +386,14 @@ def f():
await b.heartbeat()

cl.update()
assert sorted(cl.visible_functions) == ["v", "w", "x", "y", "z"]
assert sorted(cl.visible_functions) == ["N/A", "v", "w", "x", "y", "z"]
assert sorted(cl.function_selector.options) == ["N/A", "v", "w", "x", "y", "z"]
assert sorted(cl.unit_selector.options) == ["bytes", "count", "custom", "seconds"]
assert "thread-cpu" in cl.visible_activities
assert "('foo', 1)" in cl.visible_activities
assert "None" in cl.visible_activities
assert "hideme" not in cl.visible_activities
assert sorted(cl.span_tag_selector.options) == ["default", "foo"]

orig_activities = cl.visible_activities[:]

Expand All @@ -410,7 +415,33 @@ def f():

cl.span_tag_selector.value = ["foo"]
cl.update()
assert sorted(cl.visible_functions) == ["y", "z"]
assert sorted(cl.visible_functions) == ["N/A", "y", "z"]
assert sorted(cl.function_selector.options) == ["N/A", "v", "w", "x", "y", "z"]


@gen_cluster(
    client=True,
    # Empty ``extensions`` dicts prevent the spans extension from being
    # loaded on either the scheduler or the workers
    scheduler_kwargs={"extensions": {}},
    worker_kwargs={"extensions": {}},
)
async def test_FinePerformanceMetrics_no_spans(c, s, a, b):
    """The dashboard component degrades gracefully when the spans extension is
    not loaded, reading execute metrics straight from
    ``Scheduler.cumulative_worker_metrics``.
    """
    cl = FinePerformanceMetrics(s)

    # Test with no metrics
    cl.update()
    assert not cl.visible_functions
    # Run a single task, then push the workers' metrics to the scheduler
    await c.submit(inc, 0, key="x-0")
    await a.heartbeat()
    await b.heartbeat()

    cl.update()
    # Task prefix "x" is visible; note no "N/A" (idle time) entry, which is
    # only added on the fly by the spans extension
    assert sorted(cl.visible_functions) == ["x"]
    assert sorted(cl.unit_selector.options) == ["bytes", "count", "seconds"]
    assert "thread-cpu" in cl.visible_activities

    # Switching the unit filters the visible activities accordingly
    cl.unit_selector.value = "bytes"
    cl.update()
    assert sorted(cl.visible_activities) == ["memory-read"]


@gen_cluster(client=True)
Expand Down
44 changes: 44 additions & 0 deletions distributed/itertools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import TypeVar

X = TypeVar("X")
Y = TypeVar("Y")


def ffill(x: Iterable[X], xp: Iterable[X], fp: Iterable[Y], left: Y) -> Iterator[Y]:
    """Forward-fill interpolation

    Parameters
    ----------
    x:
        Output x series. Must be monotonic ascending.
    xp:
        Input x series. Must be strictly monotonic ascending.
    fp:
        Input y series. If it contains more or fewer elements than xp, the two
        series will be clipped to the shortest one (like in :func:`zip`).
    left:
        Value to yield for x < xp[0]

    Yields
    ------
    Forward-fill interpolated elements from fp matching x

    Examples
    --------
    >>> list(ffill([0.5, 2.2, 2.3, 4.5], [1, 2, 3], "abc", "-"))
    ['-', 'b', 'b', 'c']
    """
    pairs = zip(xp, fp)
    # Most recent fp value whose matching xp is <= the current xi
    fill = left
    # Next (xp, fp) pair not yet reached by x, or None once xp is exhausted.
    # zip() never yields None itself, so None is a safe sentinel.
    ahead = next(pairs, None)
    for xi in x:
        # Consume input points until the next one lies strictly ahead of xi;
        # since both series are ascending, consumed points are never needed again
        while ahead is not None and xi >= ahead[0]:
            fill = ahead[1]
            ahead = next(pairs, None)
        yield fill
7 changes: 7 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,11 @@ class SchedulerState:
idle_task_count: set[WorkerState]
#: Workers that are fully utilized. May include non-running workers.
saturated: set[WorkerState]
#: Current number of threads across all workers
total_nthreads: int
#: History of number of threads
#: (timestamp, new number of threads)
total_nthreads_history: list[tuple[float, int]]
#: Cluster-wide resources. {resource name: {worker address: amount}}
resources: dict[str, dict[str, float]]

Expand Down Expand Up @@ -1690,6 +1694,7 @@ def __init__(
self.task_prefixes = {}
self.task_metadata = {}
self.total_nthreads = 0
self.total_nthreads_history = [(time(), 0)]
self.unknown_durations = {}
self.queued = queued
self.unrunnable = unrunnable
Expand Down Expand Up @@ -4246,6 +4251,7 @@ async def add_worker(
dh["nthreads"] += nthreads

self.total_nthreads += nthreads
self.total_nthreads_history.append((time(), self.total_nthreads))
self.aliases[name] = address

self.heartbeat_worker(
Expand Down Expand Up @@ -4996,6 +5002,7 @@ async def remove_worker(
dh_addresses.remove(address)
dh["nthreads"] -= ws.nthreads
self.total_nthreads -= ws.nthreads
self.total_nthreads_history.append((time(), self.total_nthreads))
if not dh_addresses:
del self.host_info[host]

Expand Down
Loading