Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable configuration of prometheus metrics namespace #4722

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,13 @@ properties:
export-tool:
type: boolean

prometheus:
type: object
properties:
namespace:
type: string
description: Namespace prefix to use for all prometheus metrics.

admin:
type: object
description: |
Expand Down
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ distributed:
link: "{scheme}://{host}:{port}/status"
export-tool: False
graph-max-items: 5000 # maximum number of tasks to try to plot in graph view
prometheus:
namespace: "dask"

##################
# Administrative #
Expand Down
17 changes: 17 additions & 0 deletions distributed/http/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dask.config


class PrometheusCollector:
    """Base class for dask prometheus collectors.

    Holds a reference to the server being instrumented and knows how to
    prefix metric names with the configured namespace
    (``distributed.dashboard.prometheus.namespace``) and the collector's
    subsystem (set by subclasses, e.g. ``"scheduler"``).
    """

    def __init__(self, server):
        self.server = server
        self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace")
        self.subsystem = None

    def build_name(self, name):
        """Return *name* prefixed with namespace and subsystem, joined by ``_``.

        Falsy components (empty namespace, unset subsystem) are skipped.
        """
        parts = [part for part in (self.namespace, self.subsystem) if part]
        parts.append(name)
        return "_".join(parts)
104 changes: 1 addition & 103 deletions distributed/http/scheduler/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,105 +1,3 @@
import toolz

from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES

from .semaphore import SemaphoreMetricExtension


class _PrometheusCollector:
    """Legacy scheduler metrics collector with hard-coded ``dask_scheduler_``
    metric names (replaced in this PR by
    ``distributed/http/scheduler/prometheus/core.py``, which builds names
    from the configurable namespace)."""

    def __init__(self, dask_server):
        # Scheduler instance sampled on every prometheus scrape.
        self.server = dask_server

    def collect(self):
        """Yield prometheus metric families describing current scheduler state."""
        # Deferred import: prometheus_client is only needed when scraping.
        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

        yield GaugeMetricFamily(
            "dask_scheduler_clients",
            "Number of clients connected.",
            # "fire-and-forget" is excluded from the client count.
            value=len([k for k in self.server.clients if k != "fire-and-forget"]),
        )

        yield GaugeMetricFamily(
            "dask_scheduler_desired_workers",
            "Number of workers scheduler needs for task graph.",
            value=self.server.adaptive_target(),
        )

        # One gauge with a "state" label covering connected/saturated/idle.
        worker_states = GaugeMetricFamily(
            "dask_scheduler_workers",
            "Number of workers known by scheduler.",
            labels=["state"],
        )
        worker_states.add_metric(["connected"], len(self.server.workers))
        worker_states.add_metric(["saturated"], len(self.server.saturated))
        worker_states.add_metric(["idle"], len(self.server.idle))
        yield worker_states

        tasks = GaugeMetricFamily(
            "dask_scheduler_tasks",
            "Number of tasks known by scheduler.",
            labels=["state"],
        )

        # Sum per-state task counts across all task prefixes.
        task_counter = toolz.merge_with(
            sum, (tp.states for tp in self.server.task_prefixes.values())
        )

        suspicious_tasks = CounterMetricFamily(
            "dask_scheduler_tasks_suspicious",
            "Total number of times a task has been marked suspicious",
            labels=["task_prefix_name"],
        )

        for tp in self.server.task_prefixes.values():
            suspicious_tasks.add_metric([tp.name], tp.suspicious)
        yield suspicious_tasks

        yield CounterMetricFamily(
            "dask_scheduler_tasks_forgotten",
            (
                "Total number of processed tasks no longer in memory and already "
                "removed from the scheduler job queue. Note task groups on the "
                "scheduler which have all tasks in the forgotten state are not included."
            ),
            value=task_counter.get("forgotten", 0.0),
        )

        # States with no tasks are reported as 0.0 rather than omitted.
        for state in ALL_TASK_STATES:
            tasks.add_metric([state], task_counter.get(state, 0.0))
        yield tasks


COLLECTORS = [_PrometheusCollector, SemaphoreMetricExtension]


class PrometheusHandler(RequestHandler):
    """Serve scheduler metrics on ``/metrics`` in prometheus text format."""

    # Class-level so collectors are registered with the process-global
    # prometheus registry at most once, even across handler instances.
    _collectors = None

    def __init__(self, *args, dask_server=None, **kwargs):
        import prometheus_client

        super().__init__(*args, dask_server=dask_server, **kwargs)

        if PrometheusHandler._collectors:
            # Especially during testing, multiple schedulers are started
            # sequentially in the same python process
            for _collector in PrometheusHandler._collectors:
                _collector.server = self.server
            return

        PrometheusHandler._collectors = tuple(
            collector(self.server) for collector in COLLECTORS
        )
        # Register collectors
        for instantiated_collector in PrometheusHandler._collectors:
            prometheus_client.REGISTRY.register(instantiated_collector)

    def get(self):
        """Render all registered collectors via ``generate_latest``."""
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")

from .core import PrometheusHandler

# Route table picked up by the scheduler's HTTP server.
routes = [("/metrics", PrometheusHandler, {})]
104 changes: 104 additions & 0 deletions distributed/http/scheduler/prometheus/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import toolz

from distributed.http.prometheus import PrometheusCollector
from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES

from .semaphore import SemaphoreMetricCollector


class SchedulerMetricCollector(PrometheusCollector):
    """Collect scheduler-level metrics; names are built via ``build_name``
    so they carry the configured namespace and the ``scheduler`` subsystem."""

    def __init__(self, server):
        super().__init__(server)
        self.subsystem = "scheduler"

    def collect(self):
        """Yield prometheus metric families describing current scheduler state."""
        # Deferred import: prometheus_client is only needed when scraping.
        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

        # "fire-and-forget" is an internal pseudo-client; leave it out.
        client_count = sum(
            1 for name in self.server.clients if name != "fire-and-forget"
        )
        yield GaugeMetricFamily(
            self.build_name("clients"),
            "Number of clients connected.",
            value=client_count,
        )

        yield GaugeMetricFamily(
            self.build_name("desired_workers"),
            "Number of workers scheduler needs for task graph.",
            value=self.server.adaptive_target(),
        )

        # One gauge with a "state" label covering connected/saturated/idle.
        workers = GaugeMetricFamily(
            self.build_name("workers"),
            "Number of workers known by scheduler.",
            labels=["state"],
        )
        for state, count in (
            ("connected", len(self.server.workers)),
            ("saturated", len(self.server.saturated)),
            ("idle", len(self.server.idle)),
        ):
            workers.add_metric([state], count)
        yield workers

        # Sum per-state task counts across all task prefixes.
        state_counts = toolz.merge_with(
            sum, (prefix.states for prefix in self.server.task_prefixes.values())
        )

        suspicious = CounterMetricFamily(
            self.build_name("tasks_suspicious"),
            "Total number of times a task has been marked suspicious",
            labels=["task_prefix_name"],
        )
        for prefix in self.server.task_prefixes.values():
            suspicious.add_metric([prefix.name], prefix.suspicious)
        yield suspicious

        yield CounterMetricFamily(
            self.build_name("tasks_forgotten"),
            (
                "Total number of processed tasks no longer in memory and already "
                "removed from the scheduler job queue. Note task groups on the "
                "scheduler which have all tasks in the forgotten state are not included."
            ),
            value=state_counts.get("forgotten", 0.0),
        )

        # States with no tasks are reported as 0.0 rather than omitted.
        tasks = GaugeMetricFamily(
            self.build_name("tasks"),
            "Number of tasks known by scheduler.",
            labels=["state"],
        )
        for state in ALL_TASK_STATES:
            tasks.add_metric([state], state_counts.get(state, 0.0))
        yield tasks


# Collector classes instantiated and registered by PrometheusHandler below.
COLLECTORS = [SchedulerMetricCollector, SemaphoreMetricCollector]


class PrometheusHandler(RequestHandler):
    """Serve scheduler metrics on ``/metrics`` in prometheus text format."""

    # Class-level so collectors are registered with the process-global
    # prometheus registry at most once, even across handler instances.
    _collectors = None

    def __init__(self, *args, dask_server=None, **kwargs):
        import prometheus_client

        super().__init__(*args, dask_server=dask_server, **kwargs)

        if PrometheusHandler._collectors:
            # Especially during testing, multiple schedulers are started
            # sequentially in the same python process: reuse the already
            # registered collectors, just repoint them at the new server.
            for existing in PrometheusHandler._collectors:
                existing.server = self.server
            return

        instantiated = []
        for collector_cls in COLLECTORS:
            instantiated.append(collector_cls(self.server))
        PrometheusHandler._collectors = tuple(instantiated)
        # First handler in the process: register with the global registry.
        for collector in PrometheusHandler._collectors:
            prometheus_client.REGISTRY.register(collector)

    def get(self):
        """Render all registered collectors via ``generate_latest``."""
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")
22 changes: 13 additions & 9 deletions distributed/http/scheduler/prometheus/semaphore.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,48 @@
class SemaphoreMetricExtension:
def __init__(self, dask_server):
self.server = dask_server
from distributed.http.prometheus import PrometheusCollector


class SemaphoreMetricCollector(PrometheusCollector):
def __init__(self, server):
super().__init__(server)
self.subsystem = "semaphore"

def collect(self):
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

sem_ext = self.server.extensions["semaphores"]

semaphore_max_leases_family = GaugeMetricFamily(
"semaphore_max_leases",
self.build_name("max_leases"),
"Maximum leases allowed per semaphore, this will be constant for each semaphore during its lifetime.",
labels=["name"],
)
semaphore_active_leases_family = GaugeMetricFamily(
"semaphore_active_leases",
self.build_name("active_leases"),
"Amount of currently active leases per semaphore.",
labels=["name"],
)
semaphore_pending_leases = GaugeMetricFamily(
"semaphore_pending_leases",
self.build_name("pending_leases"),
"Amount of currently pending leases per semaphore.",
labels=["name"],
)

semaphore_acquire_total = CounterMetricFamily(
"semaphore_acquire_total",
self.build_name("acquire_total"),
"Total number of leases acquired per semaphore.",
labels=["name"],
)

semaphore_release_total = CounterMetricFamily(
"semaphore_release_total",
self.build_name("release_total"),
"Total number of leases released per semaphore.\n"
"Note: if a semaphore is closed while there are still leases active, this count will not equal "
"`semaphore_acquired_total` after execution.",
labels=["name"],
)

semaphore_average_pending_lease_time = GaugeMetricFamily(
"semaphore_average_pending_lease_time",
self.build_name("average_pending_lease_time"),
"Exponential moving average of the time it took to acquire a lease per semaphore.\n"
"Note: this only includes time spent on scheduler side, "
"it does"
Expand Down
46 changes: 26 additions & 20 deletions distributed/http/scheduler/tests/test_semaphore_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ async def fetch_metrics():
families = {
family.name: family
for family in text_string_to_metric_families(txt)
if family.name.startswith("semaphore_")
if family.name.startswith("dask_semaphore_")
}
return families

active_metrics = await fetch_metrics()

expected_metrics = {
"semaphore_max_leases",
"semaphore_active_leases",
"semaphore_pending_leases",
"semaphore_acquire",
"semaphore_release",
"semaphore_average_pending_lease_time_s",
"dask_semaphore_max_leases",
"dask_semaphore_active_leases",
"dask_semaphore_pending_leases",
"dask_semaphore_acquire",
"dask_semaphore_release",
"dask_semaphore_average_pending_lease_time_s",
}

assert active_metrics.keys() == expected_metrics
Expand All @@ -48,28 +48,34 @@ async def fetch_metrics():
assert len(samples) == 1
sample = samples.pop()
assert sample.labels["name"] == "test"
if name == "semaphore_max_leases":
if name == "dask_semaphore_max_leases":
assert sample.value == 2
else:
assert sample.value == 0

assert await sem.acquire()
active_metrics = await fetch_metrics()
assert active_metrics["semaphore_max_leases"].samples[0].value == 2
assert active_metrics["semaphore_active_leases"].samples[0].value == 1
assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0
assert active_metrics["semaphore_acquire"].samples[0].value == 1
assert active_metrics["semaphore_release"].samples[0].value == 0
assert active_metrics["semaphore_pending_leases"].samples[0].value == 0
assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2
assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 1
assert (
active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value
> 0
)
assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1
assert active_metrics["dask_semaphore_release"].samples[0].value == 0
assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0

assert await sem.release() is True
active_metrics = await fetch_metrics()
assert active_metrics["semaphore_max_leases"].samples[0].value == 2
assert active_metrics["semaphore_active_leases"].samples[0].value == 0
assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0
assert active_metrics["semaphore_acquire"].samples[0].value == 1
assert active_metrics["semaphore_release"].samples[0].value == 1
assert active_metrics["semaphore_pending_leases"].samples[0].value == 0
assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2
assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 0
assert (
active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value
> 0
)
assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1
assert active_metrics["dask_semaphore_release"].samples[0].value == 1
assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0

await sem.close()
active_metrics = await fetch_metrics()
Expand Down
3 changes: 3 additions & 0 deletions distributed/http/worker/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .core import PrometheusHandler

# Route table picked up by the worker's HTTP server.
routes = [("/metrics", PrometheusHandler, {})]
Loading