Prometheus histograms (#1165)
* catch and log exceptions in the interval timer function

Instead of letting the thread die, we log the exception with
a stack trace to ease debugging.

* implement histogram metrics and a prometheus histogram bridge

The spec for histograms has been merged in
elastic/apm-server#5360

* trying to debug failure that only happens on CI...

* adapt prometheus histograms to use centroids instead of upper limit buckets

* move midpoint calculation into base metrics
beniwohli authored Jul 19, 2021
1 parent b33f449 commit dec91b4
Showing 5 changed files with 218 additions and 45 deletions.
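
For orientation before the diffs, a minimal usage sketch of the new histogram API, pieced together from the code below. The import path is assumed from the edited module, `elasticapm_client` stands in for the configured client (the tests below use a pytest fixture of that name), and the metric name, unit, and buckets are invented for illustration.

# Sketch only, assuming MetricsRegistry and MetricsSet live in the module edited below.
from elasticapm.metrics.base_metrics import MetricsRegistry, MetricsSet

# `elasticapm_client` is a stand-in for a configured Client instance,
# mirroring the fixture used in tests/metrics/base_tests.py below.
metricset = MetricsSet(MetricsRegistry(elasticapm_client))

# Buckets are upper bounds; a +inf bucket is appended automatically if missing.
hist = metricset.histogram("request.duration", unit="s", buckets=[0.1, 0.5, 1, 5])

hist.update(0.05)  # lands in the <= 0.1 bucket
hist.update(0.7)   # lands in the <= 1 bucket
hist.update(30)    # lands in the +inf bucket

# collect() reports per-bucket counts plus bucket midpoints ("values"),
# computed with the centroid rule described in base_metrics.py below.
for labeled_samples in metricset.collect():
    print(labeled_samples["samples"]["request.duration"])
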
2 changes: 1 addition & 1 deletion elasticapm/contrib/django/middleware/__init__.py
@@ -67,7 +67,7 @@ class ElasticAPMClientMiddlewareMixin(object):
@property
def client(self):
try:
app = apps.get_app_config("elasticapm.contrib.django")
app = apps.get_app_config("elasticapm")
return app.client
except LookupError:
return get_client()
156 changes: 120 additions & 36 deletions elasticapm/metrics/base_metrics.py
@@ -121,6 +121,7 @@ def __init__(self, registry):
self._counters = {}
self._gauges = {}
self._timers = {}
self._histograms = {}
self._registry = registry
self._label_limit_logged = False

@@ -155,7 +156,10 @@ def timer(self, name, reset_on_collect=False, unit=None, **labels):
"""
return self._metric(self._timers, Timer, name, reset_on_collect, labels, unit)

def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None):
def histogram(self, name, reset_on_collect=False, unit=None, buckets=None, **labels):
return self._metric(self._histograms, Histogram, name, reset_on_collect, labels, unit, buckets=buckets)

def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None, **kwargs):
"""
Returns an existing or creates and returns a metric
:param container: the container for the metric
@@ -172,7 +176,10 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=
if key not in container:
if any(pattern.match(name) for pattern in self._registry.ignore_patterns):
metric = noop_metric
elif len(self._gauges) + len(self._counters) + len(self._timers) >= DISTINCT_LABEL_LIMIT:
elif (
len(self._gauges) + len(self._counters) + len(self._timers) + len(self._histograms)
>= DISTINCT_LABEL_LIMIT
):
if not self._label_limit_logged:
self._label_limit_logged = True
logger.warning(
@@ -181,7 +188,7 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=
)
metric = noop_metric
else:
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit)
metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit, **kwargs)
container[key] = metric
return container[key]

@@ -202,33 +209,65 @@ def collect(self):
samples = defaultdict(dict)
if self._counters:
# iterate over a copy of the dict to avoid threading issues, see #717
for (name, labels), c in compat.iteritems(self._counters.copy()):
if c is not noop_metric:
val = c.val
if val or not c.reset_on_collect:
for (name, labels), counter in compat.iteritems(self._counters.copy()):
if counter is not noop_metric:
val = counter.val
if val or not counter.reset_on_collect:
samples[labels].update({name: {"value": val}})
if c.reset_on_collect:
c.reset()
if counter.reset_on_collect:
counter.reset()
if self._gauges:
for (name, labels), g in compat.iteritems(self._gauges.copy()):
if g is not noop_metric:
val = g.val
if val or not g.reset_on_collect:
samples[labels].update({name: {"value": val}})
if g.reset_on_collect:
g.reset()
for (name, labels), gauge in compat.iteritems(self._gauges.copy()):
if gauge is not noop_metric:
val = gauge.val
if val or not gauge.reset_on_collect:
samples[labels].update({name: {"value": val, "type": "gauge"}})
if gauge.reset_on_collect:
gauge.reset()
if self._timers:
for (name, labels), t in compat.iteritems(self._timers.copy()):
if t is not noop_metric:
val, count = t.val
if val or not t.reset_on_collect:
for (name, labels), timer in compat.iteritems(self._timers.copy()):
if timer is not noop_metric:
val, count = timer.val
if val or not timer.reset_on_collect:
sum_name = ".sum"
if t._unit:
sum_name += "." + t._unit
if timer._unit:
sum_name += "." + timer._unit
samples[labels].update({name + sum_name: {"value": val}})
samples[labels].update({name + ".count": {"value": count}})
if t.reset_on_collect:
t.reset()
if timer.reset_on_collect:
timer.reset()
if self._histograms:
for (name, labels), histo in compat.iteritems(self._histograms.copy()):
if histo is not noop_metric:
counts = histo.val
if counts or not histo.reset_on_collect:
# For the bucket values, we follow the approach described by Prometheus's
# histogram_quantile function
# (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)
# to achieve consistent percentile aggregation results:
#
# "The histogram_quantile() function interpolates quantile values by assuming a linear
# distribution within a bucket. (...) If a quantile is located in the highest bucket,
# the upper bound of the second highest bucket is returned. A lower limit of the lowest
# bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that
# case, the usual linear interpolation is applied within that bucket. Otherwise, the upper
# bound of the lowest bucket is returned for quantiles located in the lowest bucket."
bucket_midpoints = []
for i, bucket_le in enumerate(histo.buckets):
if i == 0:
if bucket_le > 0:
bucket_le /= 2.0
elif i == len(histo.buckets) - 1:
bucket_le = histo.buckets[i - 1]
else:
bucket_le = histo.buckets[i - 1] + (bucket_le - histo.buckets[i - 1]) / 2.0
bucket_midpoints.append(bucket_le)
samples[labels].update(
{name: {"counts": counts, "values": bucket_midpoints, "type": "histogram"}}
)
if histo.reset_on_collect:
histo.reset()

if samples:
for labels, sample in compat.iteritems(samples):
result = {"samples": sample, "timestamp": timestamp}
@@ -263,8 +302,16 @@ def before_yield(self, data):
return data


class Counter(object):
__slots__ = ("name", "_lock", "_initial_value", "_val", "reset_on_collect")
class BaseMetric(object):
__slots__ = ("name", "reset_on_collect")

def __init__(self, name, reset_on_collect=False, **kwargs):
self.name = name
self.reset_on_collect = reset_on_collect


class Counter(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_lock", "_initial_value", "_val")

def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None):
"""
@@ -273,10 +320,9 @@ def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None):
:param initial_value: initial value of the counter, defaults to 0
:param unit: unit of the observed counter. Unused for counters
"""
self.name = name
self._lock = threading.Lock()
self._val = self._initial_value = initial_value
self.reset_on_collect = reset_on_collect
super(Counter, self).__init__(name, reset_on_collect=reset_on_collect)

def inc(self, delta=1):
"""
@@ -318,18 +364,17 @@ def val(self, value):
self._val = value


class Gauge(object):
__slots__ = ("name", "_val", "reset_on_collect")
class Gauge(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_val",)

def __init__(self, name, reset_on_collect=False, unit=None):
"""
Creates a new gauge
:param name: label of the gauge
:param unit: unit of the observed gauge. Unused for gauges
"""
self.name = name
self._val = None
self.reset_on_collect = reset_on_collect
super(Gauge, self).__init__(name, reset_on_collect=reset_on_collect)

@property
def val(self):
@@ -343,16 +388,15 @@ def reset(self):
self._val = 0


class Timer(object):
__slots__ = ("name", "_val", "_count", "_lock", "_unit", "reset_on_collect")
class Timer(BaseMetric):
__slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit")

def __init__(self, name=None, reset_on_collect=False, unit=None):
self.name = name
self._val = 0
self._count = 0
self._unit = unit
self._lock = threading.Lock()
self.reset_on_collect = reset_on_collect
super(Timer, self).__init__(name, reset_on_collect=reset_on_collect)

def update(self, duration, count=1):
with self._lock:
@@ -375,6 +419,46 @@ def val(self, value):
self._val, self._count = value


class Histogram(BaseMetric):
DEFAULT_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, float("inf"))

__slots__ = BaseMetric.__slots__ + ("_lock", "_buckets", "_counts", "_unit")

def __init__(self, name=None, reset_on_collect=False, unit=None, buckets=None):
self._lock = threading.Lock()
self._buckets = buckets or Histogram.DEFAULT_BUCKETS
if self._buckets[-1] != float("inf"):
self._buckets.append(float("inf"))
self._counts = [0] * len(self._buckets)
self._unit = unit
super(Histogram, self).__init__(name, reset_on_collect=reset_on_collect)

def update(self, value, count=1):
pos = 0
while value > self._buckets[pos]:
pos += 1
with self._lock:
self._counts[pos] += count

@property
def val(self):
with self._lock:
return self._counts

@val.setter
def val(self, value):
with self._lock:
self._counts = value

@property
def buckets(self):
return self._buckets

def reset(self):
with self._lock:
self._counts = [0] * len(self._buckets)


class NoopMetric(object):
"""
A no-op metric that implements the "interface" of both Counter and Gauge.
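For reference, a standalone sketch of the centroid (bucket-midpoint) rule implemented in collect() above. The bucket boundaries [1, 10, 100] (plus the implicit +inf bucket) are taken from the test further below; everything else is plain Python for illustration.

# Sketch of the bucket-midpoint (centroid) computation from collect() above.
buckets = [1, 10, 100, float("inf")]
midpoints = []
for i, le in enumerate(buckets):
    if i == 0:
        # lowest bucket: a lower bound of 0 is assumed, so halve the upper bound
        le = le / 2.0 if le > 0 else le
    elif i == len(buckets) - 1:
        # highest (+inf) bucket: report the upper bound of the second-highest bucket
        le = buckets[i - 1]
    else:
        # interior buckets: midpoint between the two adjacent upper bounds
        le = buckets[i - 1] + (le - buckets[i - 1]) / 2.0
    midpoints.append(le)
assert midpoints == [0.5, 5.5, 55.0, 100]
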
43 changes: 38 additions & 5 deletions elasticapm/metrics/sets/prometheus.py
@@ -46,9 +46,9 @@ def before_collect(self):
metric_type = self.METRIC_MAP.get(metric.type, None)
if not metric_type:
continue
metric_type(self, metric.name, metric.samples)
metric_type(self, metric.name, metric.samples, metric.unit)

def _prom_counter_handler(self, name, samples):
def _prom_counter_handler(self, name, samples, unit):
# Counters can be converted 1:1 from Prometheus to our
# format. Each pair of samples represents a distinct labelset for a
# given name. The pair consists of the value, and a "created" timestamp.
@@ -58,7 +58,7 @@ def _prom_counter_handler(self, name, samples):
self._registry.client.config.prometheus_metrics_prefix + name, **total_sample.labels
).val = total_sample.value

def _prom_gauge_handler(self, name, samples):
def _prom_gauge_handler(self, name, samples, unit):
# Gauges can be converted 1:1 from Prometheus to our
# format. Each sample represents a distinct labelset for a
# given name
@@ -67,7 +67,7 @@ def _prom_gauge_handler(self, name, samples):
self._registry.client.config.prometheus_metrics_prefix + name, **sample.labels
).val = sample.value

def _prom_summary_handler(self, name, samples):
def _prom_summary_handler(self, name, samples, unit):
# Prometheus Summaries are analogous to our Timers, having
# a count and a sum. A prometheus summary is represented by
# three values. The list of samples for a given name can be
@@ -79,7 +79,40 @@ def _prom_summary_handler(self, name, samples):
count_sample.value,
)

METRIC_MAP = {"counter": _prom_counter_handler, "gauge": _prom_gauge_handler, "summary": _prom_summary_handler}
def _prom_histogram_handler(self, name, samples, unit):
# Prometheus histograms are structured as a series of counts
# with an "le" label. The count of each label signifies all
# observations with a lower-or-equal value with respect to
# the "le" label value.
# After the le-samples, 3 more samples follow, with an overall
# count, overall sum, and creation timestamp.
sample_pos = 0
prev_val = 0
counts = []
values = []
name = self._registry.client.config.prometheus_metrics_prefix + name
while sample_pos < len(samples):
sample = samples[sample_pos]
if "le" in sample.labels:
values.append(float(sample.labels["le"]))
counts.append(sample.value - prev_val)
prev_val = sample.value
sample_pos += 1

else:
# we reached the end of one set of buckets/values, this is the "count" sample
self.histogram(name, unit=unit, buckets=values, **sample.labels).val = counts
prev_val = 0
counts = []
values = []
sample_pos += 3 # skip sum/created samples

METRIC_MAP = {
"counter": _prom_counter_handler,
"gauge": _prom_gauge_handler,
"summary": _prom_summary_handler,
"histogram": _prom_histogram_handler,
}


def grouper(iterable, n, fillvalue=None):
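To make the de-cumulation in _prom_histogram_handler above concrete, a small standalone sketch of how cumulative "le" samples become per-bucket counts; the sample values are invented for illustration.

# Prometheus exposes histogram buckets as cumulative counts keyed by "le";
# the handler above derives per-bucket counts by subtracting the previous
# cumulative value. Illustrative (le, cumulative count) pairs:
le_samples = [(0.1, 3), (0.5, 7), (1.0, 7), (float("inf"), 10)]

prev_val = 0
values, counts = [], []
for le, cumulative in le_samples:
    values.append(le)
    counts.append(cumulative - prev_val)
    prev_val = cumulative

assert values == [0.1, 0.5, 1.0, float("inf")]
assert counts == [3, 4, 0, 3]
# These feed metricset.histogram(name, buckets=values, ...).val = counts,
# matching the assignment made when the "count" sample is reached above.
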
19 changes: 19 additions & 0 deletions tests/metrics/base_tests.py
@@ -86,6 +86,25 @@ def test_metrics_counter(elasticapm_client):
assert data["samples"]["x"]["value"] == 0


def test_metrics_histogram(elasticapm_client):
metricset = MetricsSet(MetricsRegistry(elasticapm_client))
hist = metricset.histogram("x", buckets=[1, 10, 100])
assert len(hist.buckets) == 4

hist.update(0.3)
hist.update(1)
hist.update(5)
hist.update(20)
hist.update(100)
hist.update(1000)

data = list(metricset.collect())
assert len(data) == 1
d = data[0]
assert d["samples"]["x"]["counts"] == [2, 1, 2, 1]
assert d["samples"]["x"]["values"] == [0.5, 5.5, 55.0, 100]


def test_metrics_labels(elasticapm_client):
metricset = MetricsSet(MetricsRegistry(elasticapm_client))
metricset.counter("x", mylabel="a").inc()