Skip to content

Commit

Permalink
Enable configuration of prometheus metrics namespace (#4722)
Browse files Browse the repository at this point in the history
* Enable configuration of prometheus metrics namespace

* Undo editor formatting

* Refactor into base class and remove private function import

* Rename semaphore subsystem

* Remove serving metric and rename connections metric
  • Loading branch information
jacobtomlinson authored Apr 22, 2021
1 parent 4a883a7 commit 42e3f22
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 158 deletions.
7 changes: 7 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,13 @@ properties:
export-tool:
type: boolean

prometheus:
type: object
properties:
namespace:
type: string
description: Namespace prefix to use for all prometheus metrics.

admin:
type: object
description: |
Expand Down
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ distributed:
link: "{scheme}://{host}:{port}/status"
export-tool: False
graph-max-items: 5000 # maximum number of tasks to try to plot in graph view
prometheus:
namespace: "dask"

##################
# Administrative #
Expand Down
17 changes: 17 additions & 0 deletions distributed/http/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dask.config


class PrometheusCollector:
    """Base class for prometheus metric collectors attached to a dask server.

    Subclasses assign ``self.subsystem`` and call :meth:`build_name` to
    derive fully-qualified metric names of the form
    ``<namespace>_<subsystem>_<name>``.
    """

    def __init__(self, server):
        self.server = server
        # User-configurable prefix applied to every metric this collector emits.
        self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace")
        self.subsystem = None

    def build_name(self, name):
        """Return *name* prefixed with the namespace and subsystem, when set."""
        parts = [part for part in (self.namespace, self.subsystem) if part]
        parts.append(name)
        return "_".join(parts)
104 changes: 1 addition & 103 deletions distributed/http/scheduler/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,3 @@
import toolz

from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES

from .semaphore import SemaphoreMetricExtension


class _PrometheusCollector:
    """Collect scheduler-level metrics for the prometheus client registry.

    Legacy collector (removed by this commit): metric names carry a
    hard-coded ``dask_scheduler_`` prefix rather than the configurable
    namespace introduced in ``distributed/http/prometheus.py``.
    """

    def __init__(self, dask_server):
        # The scheduler instance whose state is exported as metrics.
        self.server = dask_server

    def collect(self):
        """Yield prometheus metric families describing current scheduler state."""
        # Imported lazily so prometheus_client remains an optional dependency.
        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

        yield GaugeMetricFamily(
            "dask_scheduler_clients",
            "Number of clients connected.",
            # "fire-and-forget" is a synthetic internal client, not a user client.
            value=len([k for k in self.server.clients if k != "fire-and-forget"]),
        )

        yield GaugeMetricFamily(
            "dask_scheduler_desired_workers",
            "Number of workers scheduler needs for task graph.",
            value=self.server.adaptive_target(),
        )

        worker_states = GaugeMetricFamily(
            "dask_scheduler_workers",
            "Number of workers known by scheduler.",
            labels=["state"],
        )
        worker_states.add_metric(["connected"], len(self.server.workers))
        worker_states.add_metric(["saturated"], len(self.server.saturated))
        worker_states.add_metric(["idle"], len(self.server.idle))
        yield worker_states

        tasks = GaugeMetricFamily(
            "dask_scheduler_tasks",
            "Number of tasks known by scheduler.",
            labels=["state"],
        )

        # Sum per-task-prefix state counts into a single {state: count} mapping.
        task_counter = toolz.merge_with(
            sum, (tp.states for tp in self.server.task_prefixes.values())
        )

        suspicious_tasks = CounterMetricFamily(
            "dask_scheduler_tasks_suspicious",
            "Total number of times a task has been marked suspicious",
            labels=["task_prefix_name"],
        )

        for tp in self.server.task_prefixes.values():
            suspicious_tasks.add_metric([tp.name], tp.suspicious)
        yield suspicious_tasks

        yield CounterMetricFamily(
            "dask_scheduler_tasks_forgotten",
            (
                "Total number of processed tasks no longer in memory and already "
                "removed from the scheduler job queue. Note task groups on the "
                "scheduler which have all tasks in the forgotten state are not included."
            ),
            value=task_counter.get("forgotten", 0.0),
        )

        # Emit the per-state task gauge last, after the counter has been built.
        for state in ALL_TASK_STATES:
            tasks.add_metric([state], task_counter.get(state, 0.0))
        yield tasks


# All collector classes to instantiate and register with the prometheus registry.
COLLECTORS = [_PrometheusCollector, SemaphoreMetricExtension]


class PrometheusHandler(RequestHandler):
    """HTTP handler serving scheduler metrics in the prometheus text format."""

    # Collector instances are cached at class level because the global
    # prometheus registry only allows each collector to be registered once
    # per process.
    _collectors = None

    def __init__(self, *args, dask_server=None, **kwargs):
        import prometheus_client

        super().__init__(*args, dask_server=dask_server, **kwargs)

        if PrometheusHandler._collectors:
            # Especially during testing, multiple schedulers are started
            # sequentially in the same python process
            for _collector in PrometheusHandler._collectors:
                _collector.server = self.server
            return

        PrometheusHandler._collectors = tuple(
            collector(self.server) for collector in COLLECTORS
        )
        # Register collectors
        for instantiated_collector in PrometheusHandler._collectors:
            prometheus_client.REGISTRY.register(instantiated_collector)

    def get(self):
        """Render all registered metrics as a prometheus text response."""
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")

from .core import PrometheusHandler

# Tornado route table exposing the scheduler metrics endpoint.
routes = [("/metrics", PrometheusHandler, {})]
104 changes: 104 additions & 0 deletions distributed/http/scheduler/prometheus/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import toolz

from distributed.http.prometheus import PrometheusCollector
from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES

from .semaphore import SemaphoreMetricCollector


class SchedulerMetricCollector(PrometheusCollector):
    """Export scheduler-level metrics, prefixed via the configured namespace."""

    def __init__(self, server):
        super().__init__(server)
        self.subsystem = "scheduler"

    def collect(self):
        """Yield prometheus metric families describing current scheduler state."""
        # Imported lazily so prometheus_client remains an optional dependency.
        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

        # "fire-and-forget" is a synthetic internal client, not a user client.
        n_clients = len([k for k in self.server.clients if k != "fire-and-forget"])
        yield GaugeMetricFamily(
            self.build_name("clients"),
            "Number of clients connected.",
            value=n_clients,
        )

        yield GaugeMetricFamily(
            self.build_name("desired_workers"),
            "Number of workers scheduler needs for task graph.",
            value=self.server.adaptive_target(),
        )

        workers = GaugeMetricFamily(
            self.build_name("workers"),
            "Number of workers known by scheduler.",
            labels=["state"],
        )
        for state, count in (
            ("connected", len(self.server.workers)),
            ("saturated", len(self.server.saturated)),
            ("idle", len(self.server.idle)),
        ):
            workers.add_metric([state], count)
        yield workers

        task_gauge = GaugeMetricFamily(
            self.build_name("tasks"),
            "Number of tasks known by scheduler.",
            labels=["state"],
        )

        # Sum per-task-prefix state counts into a single {state: count} mapping.
        state_counts = toolz.merge_with(
            sum, (tp.states for tp in self.server.task_prefixes.values())
        )

        suspicious = CounterMetricFamily(
            self.build_name("tasks_suspicious"),
            "Total number of times a task has been marked suspicious",
            labels=["task_prefix_name"],
        )
        for prefix in self.server.task_prefixes.values():
            suspicious.add_metric([prefix.name], prefix.suspicious)
        yield suspicious

        yield CounterMetricFamily(
            self.build_name("tasks_forgotten"),
            (
                "Total number of processed tasks no longer in memory and already "
                "removed from the scheduler job queue. Note task groups on the "
                "scheduler which have all tasks in the forgotten state are not included."
            ),
            value=state_counts.get("forgotten", 0.0),
        )

        # Emit the per-state task gauge last, after the counter has been built.
        for state in ALL_TASK_STATES:
            task_gauge.add_metric([state], state_counts.get(state, 0.0))
        yield task_gauge


# All collector classes to instantiate and register with the prometheus registry.
COLLECTORS = [SchedulerMetricCollector, SemaphoreMetricCollector]


class PrometheusHandler(RequestHandler):
    """HTTP handler serving scheduler metrics in the prometheus text format."""

    # Cached collector instances: the global prometheus registry only allows
    # each collector to be registered a single time per process.
    _collectors = None

    def __init__(self, *args, dask_server=None, **kwargs):
        import prometheus_client

        super().__init__(*args, dask_server=dask_server, **kwargs)

        if not PrometheusHandler._collectors:
            PrometheusHandler._collectors = tuple(
                cls(self.server) for cls in COLLECTORS
            )
            # First handler created in this process: register the collectors.
            for collector in PrometheusHandler._collectors:
                prometheus_client.REGISTRY.register(collector)
        else:
            # Especially during testing, multiple schedulers are started
            # sequentially in the same python process; re-point the cached
            # collectors at the newest server instead of re-registering.
            for collector in PrometheusHandler._collectors:
                collector.server = self.server

    def get(self):
        """Write all registered metrics in the prometheus text exposition format."""
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")
22 changes: 13 additions & 9 deletions distributed/http/scheduler/prometheus/semaphore.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
class SemaphoreMetricExtension:
def __init__(self, dask_server):
self.server = dask_server
from distributed.http.prometheus import PrometheusCollector


class SemaphoreMetricCollector(PrometheusCollector):
def __init__(self, server):
super().__init__(server)
self.subsystem = "semaphore"

def collect(self):
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

sem_ext = self.server.extensions["semaphores"]

semaphore_max_leases_family = GaugeMetricFamily(
"semaphore_max_leases",
self.build_name("max_leases"),
"Maximum leases allowed per semaphore, this will be constant for each semaphore during its lifetime.",
labels=["name"],
)
semaphore_active_leases_family = GaugeMetricFamily(
"semaphore_active_leases",
self.build_name("active_leases"),
"Amount of currently active leases per semaphore.",
labels=["name"],
)
semaphore_pending_leases = GaugeMetricFamily(
"semaphore_pending_leases",
self.build_name("pending_leases"),
"Amount of currently pending leases per semaphore.",
labels=["name"],
)

semaphore_acquire_total = CounterMetricFamily(
"semaphore_acquire_total",
self.build_name("acquire_total"),
"Total number of leases acquired per semaphore.",
labels=["name"],
)

semaphore_release_total = CounterMetricFamily(
"semaphore_release_total",
self.build_name("release_total"),
"Total number of leases released per semaphore.\n"
"Note: if a semaphore is closed while there are still leases active, this count will not equal "
"`semaphore_acquired_total` after execution.",
labels=["name"],
)

semaphore_average_pending_lease_time = GaugeMetricFamily(
"semaphore_average_pending_lease_time",
self.build_name("average_pending_lease_time"),
"Exponential moving average of the time it took to acquire a lease per semaphore.\n"
"Note: this only includes time spent on scheduler side, "
"it does"
Expand Down
46 changes: 26 additions & 20 deletions distributed/http/scheduler/tests/test_semaphore_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ async def fetch_metrics():
families = {
family.name: family
for family in text_string_to_metric_families(txt)
if family.name.startswith("semaphore_")
if family.name.startswith("dask_semaphore_")
}
return families

active_metrics = await fetch_metrics()

expected_metrics = {
"semaphore_max_leases",
"semaphore_active_leases",
"semaphore_pending_leases",
"semaphore_acquire",
"semaphore_release",
"semaphore_average_pending_lease_time_s",
"dask_semaphore_max_leases",
"dask_semaphore_active_leases",
"dask_semaphore_pending_leases",
"dask_semaphore_acquire",
"dask_semaphore_release",
"dask_semaphore_average_pending_lease_time_s",
}

assert active_metrics.keys() == expected_metrics
Expand All @@ -48,28 +48,34 @@ async def fetch_metrics():
assert len(samples) == 1
sample = samples.pop()
assert sample.labels["name"] == "test"
if name == "semaphore_max_leases":
if name == "dask_semaphore_max_leases":
assert sample.value == 2
else:
assert sample.value == 0

assert await sem.acquire()
active_metrics = await fetch_metrics()
assert active_metrics["semaphore_max_leases"].samples[0].value == 2
assert active_metrics["semaphore_active_leases"].samples[0].value == 1
assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0
assert active_metrics["semaphore_acquire"].samples[0].value == 1
assert active_metrics["semaphore_release"].samples[0].value == 0
assert active_metrics["semaphore_pending_leases"].samples[0].value == 0
assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2
assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 1
assert (
active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value
> 0
)
assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1
assert active_metrics["dask_semaphore_release"].samples[0].value == 0
assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0

assert await sem.release() is True
active_metrics = await fetch_metrics()
assert active_metrics["semaphore_max_leases"].samples[0].value == 2
assert active_metrics["semaphore_active_leases"].samples[0].value == 0
assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0
assert active_metrics["semaphore_acquire"].samples[0].value == 1
assert active_metrics["semaphore_release"].samples[0].value == 1
assert active_metrics["semaphore_pending_leases"].samples[0].value == 0
assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2
assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 0
assert (
active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value
> 0
)
assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1
assert active_metrics["dask_semaphore_release"].samples[0].value == 1
assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0

await sem.close()
active_metrics = await fetch_metrics()
Expand Down
3 changes: 3 additions & 0 deletions distributed/http/worker/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .core import PrometheusHandler

# Tornado route table exposing the worker metrics endpoint.
routes = [("/metrics", PrometheusHandler, {})]
Loading

0 comments on commit 42e3f22

Please sign in to comment.