diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 22358fcf0e0..0e8faf2f19d 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -728,6 +728,13 @@ properties: export-tool: type: boolean + prometheus: + type: object + properties: + namespace: + type: string + description: Namespace prefix to use for all prometheus metrics. + admin: type: object description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 156a61abe33..c71845e3769 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -168,6 +168,8 @@ distributed: link: "{scheme}://{host}:{port}/status" export-tool: False graph-max-items: 5000 # maximum number of tasks to try to plot in graph view + prometheus: + namespace: "dask" ################## # Administrative # diff --git a/distributed/http/prometheus.py b/distributed/http/prometheus.py new file mode 100644 index 00000000000..254a8f95911 --- /dev/null +++ b/distributed/http/prometheus.py @@ -0,0 +1,17 @@ +import dask.config + + +class PrometheusCollector: + def __init__(self, server): + self.server = server + self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") + self.subsystem = None + + def build_name(self, name): + full_name = [] + if self.namespace: + full_name.append(self.namespace) + if self.subsystem: + full_name.append(self.subsystem) + full_name.append(name) + return "_".join(full_name) diff --git a/distributed/http/scheduler/prometheus/__init__.py b/distributed/http/scheduler/prometheus/__init__.py index 120a01dab58..63c0310d0aa 100644 --- a/distributed/http/scheduler/prometheus/__init__.py +++ b/distributed/http/scheduler/prometheus/__init__.py @@ -1,105 +1,3 @@ -import toolz - -from distributed.http.utils import RequestHandler -from distributed.scheduler import ALL_TASK_STATES - -from .semaphore import SemaphoreMetricExtension - - -class _PrometheusCollector: - def __init__(self, dask_server): - self.server = dask_server - - def collect(self): - from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily - - yield GaugeMetricFamily( - "dask_scheduler_clients", - "Number of clients connected.", - value=len([k for k in self.server.clients if k != "fire-and-forget"]), - ) - - yield GaugeMetricFamily( - "dask_scheduler_desired_workers", - "Number of workers scheduler needs for task graph.", - value=self.server.adaptive_target(), - ) - - worker_states = GaugeMetricFamily( - "dask_scheduler_workers", - "Number of workers known by scheduler.", - labels=["state"], - ) - worker_states.add_metric(["connected"], len(self.server.workers)) - worker_states.add_metric(["saturated"], len(self.server.saturated)) - worker_states.add_metric(["idle"], len(self.server.idle)) - yield worker_states - - tasks = GaugeMetricFamily( - "dask_scheduler_tasks", - "Number of tasks known by scheduler.", - labels=["state"], - ) - - task_counter = toolz.merge_with( - sum, (tp.states for tp in self.server.task_prefixes.values()) - ) - - suspicious_tasks = CounterMetricFamily( - "dask_scheduler_tasks_suspicious", - "Total number of times a task has been marked suspicious", - labels=["task_prefix_name"], - ) - - for tp in self.server.task_prefixes.values(): - suspicious_tasks.add_metric([tp.name], tp.suspicious) - yield suspicious_tasks - - yield CounterMetricFamily( - "dask_scheduler_tasks_forgotten", - ( - "Total number of processed tasks no longer in memory and already " - "removed from the scheduler job queue. Note task groups on the " - "scheduler which have all tasks in the forgotten state are not included." - ), - value=task_counter.get("forgotten", 0.0), - ) - - for state in ALL_TASK_STATES: - tasks.add_metric([state], task_counter.get(state, 0.0)) - yield tasks - - -COLLECTORS = [_PrometheusCollector, SemaphoreMetricExtension] - - -class PrometheusHandler(RequestHandler): - _collectors = None - - def __init__(self, *args, dask_server=None, **kwargs): - import prometheus_client - - super().__init__(*args, dask_server=dask_server, **kwargs) - - if PrometheusHandler._collectors: - # Especially during testing, multiple schedulers are started - # sequentially in the same python process - for _collector in PrometheusHandler._collectors: - _collector.server = self.server - return - - PrometheusHandler._collectors = tuple( - collector(self.server) for collector in COLLECTORS - ) - # Register collectors - for instantiated_collector in PrometheusHandler._collectors: - prometheus_client.REGISTRY.register(instantiated_collector) - - def get(self): - import prometheus_client - - self.write(prometheus_client.generate_latest()) - self.set_header("Content-Type", "text/plain; version=0.0.4") - +from .core import PrometheusHandler routes = [("/metrics", PrometheusHandler, {})] diff --git a/distributed/http/scheduler/prometheus/core.py b/distributed/http/scheduler/prometheus/core.py new file mode 100644 index 00000000000..2a274940781 --- /dev/null +++ b/distributed/http/scheduler/prometheus/core.py @@ -0,0 +1,104 @@ +import toolz + +from distributed.http.prometheus import PrometheusCollector +from distributed.http.utils import RequestHandler +from distributed.scheduler import ALL_TASK_STATES + +from .semaphore import SemaphoreMetricCollector + + +class SchedulerMetricCollector(PrometheusCollector): + def __init__(self, server): + super().__init__(server) + self.subsystem = "scheduler" + + def collect(self): + from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily + + yield GaugeMetricFamily( + self.build_name("clients"), + "Number of clients connected.", + value=len([k for k in self.server.clients if k != "fire-and-forget"]), + ) + + yield GaugeMetricFamily( + self.build_name("desired_workers"), + "Number of workers scheduler needs for task graph.", + value=self.server.adaptive_target(), + ) + + worker_states = GaugeMetricFamily( + self.build_name("workers"), + "Number of workers known by scheduler.", + labels=["state"], + ) + worker_states.add_metric(["connected"], len(self.server.workers)) + worker_states.add_metric(["saturated"], len(self.server.saturated)) + worker_states.add_metric(["idle"], len(self.server.idle)) + yield worker_states + + tasks = GaugeMetricFamily( + self.build_name("tasks"), + "Number of tasks known by scheduler.", + labels=["state"], + ) + + task_counter = toolz.merge_with( + sum, (tp.states for tp in self.server.task_prefixes.values()) + ) + + suspicious_tasks = CounterMetricFamily( + self.build_name("tasks_suspicious"), + "Total number of times a task has been marked suspicious", + labels=["task_prefix_name"], + ) + + for tp in self.server.task_prefixes.values(): + suspicious_tasks.add_metric([tp.name], tp.suspicious) + yield suspicious_tasks + + yield CounterMetricFamily( + self.build_name("tasks_forgotten"), + ( + "Total number of processed tasks no longer in memory and already " + "removed from the scheduler job queue. Note task groups on the " + "scheduler which have all tasks in the forgotten state are not included." + ), + value=task_counter.get("forgotten", 0.0), + ) + + for state in ALL_TASK_STATES: + tasks.add_metric([state], task_counter.get(state, 0.0)) + yield tasks + + +COLLECTORS = [SchedulerMetricCollector, SemaphoreMetricCollector] + + +class PrometheusHandler(RequestHandler): + _collectors = None + + def __init__(self, *args, dask_server=None, **kwargs): + import prometheus_client + + super().__init__(*args, dask_server=dask_server, **kwargs) + + if PrometheusHandler._collectors: + # Especially during testing, multiple schedulers are started + # sequentially in the same python process + for _collector in PrometheusHandler._collectors: + _collector.server = self.server + return + + PrometheusHandler._collectors = tuple( + collector(self.server) for collector in COLLECTORS + ) + # Register collectors + for instantiated_collector in PrometheusHandler._collectors: + prometheus_client.REGISTRY.register(instantiated_collector) + + def get(self): + import prometheus_client + + self.write(prometheus_client.generate_latest()) + self.set_header("Content-Type", "text/plain; version=0.0.4") diff --git a/distributed/http/scheduler/prometheus/semaphore.py b/distributed/http/scheduler/prometheus/semaphore.py index aac467b66cc..36defbbeeae 100644 --- a/distributed/http/scheduler/prometheus/semaphore.py +++ b/distributed/http/scheduler/prometheus/semaphore.py @@ -1,6 +1,10 @@ -class SemaphoreMetricExtension: - def __init__(self, dask_server): - self.server = dask_server +from distributed.http.prometheus import PrometheusCollector + + +class SemaphoreMetricCollector(PrometheusCollector): + def __init__(self, server): + super().__init__(server) + self.subsystem = "semaphore" def collect(self): from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily @@ -8,29 +12,29 @@ def collect(self): sem_ext = self.server.extensions["semaphores"] semaphore_max_leases_family = GaugeMetricFamily( - "semaphore_max_leases", + self.build_name("max_leases"), "Maximum leases allowed per semaphore, this will be constant for each semaphore during its lifetime.", labels=["name"], ) semaphore_active_leases_family = GaugeMetricFamily( - "semaphore_active_leases", + self.build_name("active_leases"), "Amount of currently active leases per semaphore.", labels=["name"], ) semaphore_pending_leases = GaugeMetricFamily( - "semaphore_pending_leases", + self.build_name("pending_leases"), "Amount of currently pending leases per semaphore.", labels=["name"], ) semaphore_acquire_total = CounterMetricFamily( - "semaphore_acquire_total", + self.build_name("acquire_total"), "Total number of leases acquired per semaphore.", labels=["name"], ) semaphore_release_total = CounterMetricFamily( - "semaphore_release_total", + self.build_name("release_total"), "Total number of leases released per semaphore.\n" "Note: if a semaphore is closed while there are still leases active, this count will not equal " "`semaphore_acquired_total` after execution.", @@ -38,7 +42,7 @@ def collect(self): ) semaphore_average_pending_lease_time = GaugeMetricFamily( - "semaphore_average_pending_lease_time", + self.build_name("average_pending_lease_time"), "Exponential moving average of the time it took to acquire a lease per semaphore.\n" "Note: this only includes time spent on scheduler side, " "it does" diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 21996cb35f4..67842e141fb 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -19,19 +19,19 @@ async def fetch_metrics(): families = { family.name: family for family in text_string_to_metric_families(txt) - if family.name.startswith("semaphore_") + if family.name.startswith("dask_semaphore_") } return families active_metrics = await fetch_metrics() expected_metrics = { - "semaphore_max_leases", - "semaphore_active_leases", - "semaphore_pending_leases", - "semaphore_acquire", - "semaphore_release", - "semaphore_average_pending_lease_time_s", + "dask_semaphore_max_leases", + "dask_semaphore_active_leases", + "dask_semaphore_pending_leases", + "dask_semaphore_acquire", + "dask_semaphore_release", + "dask_semaphore_average_pending_lease_time_s", } assert active_metrics.keys() == expected_metrics @@ -48,28 +48,34 @@ async def fetch_metrics(): assert len(samples) == 1 sample = samples.pop() assert sample.labels["name"] == "test" - if name == "semaphore_max_leases": + if name == "dask_semaphore_max_leases": assert sample.value == 2 else: assert sample.value == 0 assert await sem.acquire() active_metrics = await fetch_metrics() - assert active_metrics["semaphore_max_leases"].samples[0].value == 2 - assert active_metrics["semaphore_active_leases"].samples[0].value == 1 - assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0 - assert active_metrics["semaphore_acquire"].samples[0].value == 1 - assert active_metrics["semaphore_release"].samples[0].value == 0 - assert active_metrics["semaphore_pending_leases"].samples[0].value == 0 + assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 1 + assert ( + active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value + > 0 + ) + assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_semaphore_release"].samples[0].value == 0 + assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 assert await sem.release() is True active_metrics = await fetch_metrics() - assert active_metrics["semaphore_max_leases"].samples[0].value == 2 - assert active_metrics["semaphore_active_leases"].samples[0].value == 0 - assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0 - assert active_metrics["semaphore_acquire"].samples[0].value == 1 - assert active_metrics["semaphore_release"].samples[0].value == 1 - assert active_metrics["semaphore_pending_leases"].samples[0].value == 0 + assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 0 + assert ( + active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value + > 0 + ) + assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_semaphore_release"].samples[0].value == 1 + assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 await sem.close() active_metrics = await fetch_metrics() diff --git a/distributed/http/worker/prometheus/__init__.py b/distributed/http/worker/prometheus/__init__.py new file mode 100644 index 00000000000..63c0310d0aa --- /dev/null +++ b/distributed/http/worker/prometheus/__init__.py @@ -0,0 +1,3 @@ +from .core import PrometheusHandler + +routes = [("/metrics", PrometheusHandler, {})] diff --git a/distributed/http/worker/prometheus.py b/distributed/http/worker/prometheus/core.py similarity index 57% rename from distributed/http/worker/prometheus.py rename to distributed/http/worker/prometheus/core.py index 4d0c0a55e60..cd082ea6a33 100644 --- a/distributed/http/worker/prometheus.py +++ b/distributed/http/worker/prometheus/core.py @@ -1,12 +1,14 @@ import logging -from ..utils import RequestHandler +from distributed.http.prometheus import PrometheusCollector +from distributed.http.utils import RequestHandler -class _PrometheusCollector: +class WorkerMetricCollector(PrometheusCollector): def __init__(self, server): - self.worker = server + super().__init__(server) self.logger = logging.getLogger("distributed.dask_worker") + self.subsystem = "worker" self.crick_available = True try: import crick # noqa: F401 @@ -20,52 +22,53 @@ def collect(self): from prometheus_client.core import GaugeMetricFamily tasks = GaugeMetricFamily( - "dask_worker_tasks", "Number of tasks at worker.", labels=["state"] + self.build_name("tasks"), + "Number of tasks at worker.", + labels=["state"], ) - tasks.add_metric(["stored"], len(self.worker.data)) - tasks.add_metric(["executing"], self.worker.executing_count) - tasks.add_metric(["ready"], len(self.worker.ready)) - tasks.add_metric(["waiting"], self.worker.waiting_for_data_count) - tasks.add_metric(["serving"], len(self.worker._comms)) + tasks.add_metric(["stored"], len(self.server.data)) + tasks.add_metric(["executing"], self.server.executing_count) + tasks.add_metric(["ready"], len(self.server.ready)) + tasks.add_metric(["waiting"], self.server.waiting_for_data_count) yield tasks yield GaugeMetricFamily( - "dask_worker_connections", - "Number of task connections to other workers.", - value=len(self.worker.in_flight_workers), + self.build_name("concurrent_fetch_requests"), + "Number of open fetch requests to other workers.", + value=len(self.server.in_flight_workers), ) yield GaugeMetricFamily( - "dask_worker_threads", + self.build_name("threads"), "Number of worker threads.", - value=self.worker.nthreads, + value=self.server.nthreads, ) yield GaugeMetricFamily( - "dask_worker_latency_seconds", + self.build_name("latency_seconds"), "Latency of worker connection.", - value=self.worker.latency, + value=self.server.latency, ) # all metrics using digests require crick to be installed # the following metrics will export NaN, if the corresponding digests are None if self.crick_available: yield GaugeMetricFamily( - "dask_worker_tick_duration_median_seconds", + self.build_name("tick_duration_median_seconds"), "Median tick duration at worker.", - value=self.worker.digests["tick-duration"].components[1].quantile(50), + value=self.server.digests["tick-duration"].components[1].quantile(50), ) yield GaugeMetricFamily( - "dask_worker_task_duration_median_seconds", + self.build_name("task_duration_median_seconds"), "Median task runtime at worker.", - value=self.worker.digests["task-duration"].components[1].quantile(50), + value=self.server.digests["task-duration"].components[1].quantile(50), ) yield GaugeMetricFamily( - "dask_worker_transfer_bandwidth_median_bytes", + self.build_name("transfer_bandwidth_median_bytes"), "Bandwidth for transfer at worker in Bytes.", - value=self.worker.digests["transfer-bandwidth"] + value=self.server.digests["transfer-bandwidth"] .components[1] .quantile(50), ) @@ -82,7 +85,7 @@ def __init__(self, *args, **kwargs): if PrometheusHandler._initialized: return - prometheus_client.REGISTRY.register(_PrometheusCollector(self.server)) + prometheus_client.REGISTRY.register(WorkerMetricCollector(self.server)) PrometheusHandler._initialized = True @@ -91,6 +94,3 @@ def get(self): self.write(prometheus_client.generate_latest()) self.set_header("Content-Type", "text/plain; version=0.0.4") - - -routes = [(r"metrics", PrometheusHandler, {})]