From dec91b45d7a2bbe34202e5d869a13769e7a85de0 Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Mon, 19 Jul 2021 13:37:17 +0200 Subject: [PATCH] Prometheus histograms (#1165) * catch and log exceptions in the interval timer function Instead of letting the thread die, we log the exception with a stack trace to ease debugging. * implement histogram metrics and a prometheus histogram bridge The spec for histograms has been merged in https://github.com/elastic/apm-server/pull/5360 * trying to debug failure that only happens on CI... * adapt prometheus histograms to use centroids instead of upper limit buckets * move midpoint calculation into base metrics --- .../contrib/django/middleware/__init__.py | 2 +- elasticapm/metrics/base_metrics.py | 156 ++++++++++++++---- elasticapm/metrics/sets/prometheus.py | 43 ++++- tests/metrics/base_tests.py | 19 +++ tests/metrics/prometheus_tests.py | 43 ++++- 5 files changed, 218 insertions(+), 45 deletions(-) diff --git a/elasticapm/contrib/django/middleware/__init__.py b/elasticapm/contrib/django/middleware/__init__.py index e270caa8d..55a2b8b3d 100644 --- a/elasticapm/contrib/django/middleware/__init__.py +++ b/elasticapm/contrib/django/middleware/__init__.py @@ -67,7 +67,7 @@ class ElasticAPMClientMiddlewareMixin(object): @property def client(self): try: - app = apps.get_app_config("elasticapm.contrib.django") + app = apps.get_app_config("elasticapm") return app.client except LookupError: return get_client() diff --git a/elasticapm/metrics/base_metrics.py b/elasticapm/metrics/base_metrics.py index 393d4c8be..c3320bd8a 100644 --- a/elasticapm/metrics/base_metrics.py +++ b/elasticapm/metrics/base_metrics.py @@ -121,6 +121,7 @@ def __init__(self, registry): self._counters = {} self._gauges = {} self._timers = {} + self._histograms = {} self._registry = registry self._label_limit_logged = False @@ -155,7 +156,10 @@ def timer(self, name, reset_on_collect=False, unit=None, **labels): """ return self._metric(self._timers, Timer, name, reset_on_collect, labels, unit) - def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None): + def histogram(self, name, reset_on_collect=False, unit=None, buckets=None, **labels): + return self._metric(self._histograms, Histogram, name, reset_on_collect, labels, unit, buckets=buckets) + + def _metric(self, container, metric_class, name, reset_on_collect, labels, unit=None, **kwargs): """ Returns an existing or creates and returns a metric :param container: the container for the metric @@ -172,7 +176,10 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit= if key not in container: if any(pattern.match(name) for pattern in self._registry.ignore_patterns): metric = noop_metric - elif len(self._gauges) + len(self._counters) + len(self._timers) >= DISTINCT_LABEL_LIMIT: + elif ( + len(self._gauges) + len(self._counters) + len(self._timers) + len(self._histograms) + >= DISTINCT_LABEL_LIMIT + ): if not self._label_limit_logged: self._label_limit_logged = True logger.warning( @@ -181,7 +188,7 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels, unit= ) metric = noop_metric else: - metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit) + metric = metric_class(name, reset_on_collect=reset_on_collect, unit=unit, **kwargs) container[key] = metric return container[key] @@ -202,33 +209,65 @@ def collect(self): samples = defaultdict(dict) if self._counters: # iterate over a copy of the dict to avoid threading issues, see #717 - for (name, labels), c in compat.iteritems(self._counters.copy()): - if c is not noop_metric: - val = c.val - if val or not c.reset_on_collect: + for (name, labels), counter in compat.iteritems(self._counters.copy()): + if counter is not noop_metric: + val = counter.val + if val or not counter.reset_on_collect: samples[labels].update({name: {"value": val}}) - if c.reset_on_collect: - c.reset() + if counter.reset_on_collect: + counter.reset() if self._gauges: - for (name, labels), g in compat.iteritems(self._gauges.copy()): - if g is not noop_metric: - val = g.val - if val or not g.reset_on_collect: - samples[labels].update({name: {"value": val}}) - if g.reset_on_collect: - g.reset() + for (name, labels), gauge in compat.iteritems(self._gauges.copy()): + if gauge is not noop_metric: + val = gauge.val + if val or not gauge.reset_on_collect: + samples[labels].update({name: {"value": val, "type": "gauge"}}) + if gauge.reset_on_collect: + gauge.reset() if self._timers: - for (name, labels), t in compat.iteritems(self._timers.copy()): - if t is not noop_metric: - val, count = t.val - if val or not t.reset_on_collect: + for (name, labels), timer in compat.iteritems(self._timers.copy()): + if timer is not noop_metric: + val, count = timer.val + if val or not timer.reset_on_collect: sum_name = ".sum" - if t._unit: - sum_name += "." + t._unit + if timer._unit: + sum_name += "." + timer._unit samples[labels].update({name + sum_name: {"value": val}}) samples[labels].update({name + ".count": {"value": count}}) - if t.reset_on_collect: - t.reset() + if timer.reset_on_collect: + timer.reset() + if self._histograms: + for (name, labels), histo in compat.iteritems(self._histograms.copy()): + if histo is not noop_metric: + counts = histo.val + if counts or not histo.reset_on_collect: + # For the bucket values, we follow the approach described by Prometheus's + # histogram_quantile function + # (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile) + # to achieve consistent percentile aggregation results: + # + # "The histogram_quantile() function interpolates quantile values by assuming a linear + # distribution within a bucket. (...) If a quantile is located in the highest bucket, + # the upper bound of the second highest bucket is returned. A lower limit of the lowest + # bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that + # case, the usual linear interpolation is applied within that bucket. Otherwise, the upper + # bound of the lowest bucket is returned for quantiles located in the lowest bucket." + bucket_midpoints = [] + for i, bucket_le in enumerate(histo.buckets): + if i == 0: + if bucket_le > 0: + bucket_le /= 2.0 + elif i == len(histo.buckets) - 1: + bucket_le = histo.buckets[i - 1] + else: + bucket_le = histo.buckets[i - 1] + (bucket_le - histo.buckets[i - 1]) / 2.0 + bucket_midpoints.append(bucket_le) + samples[labels].update( + {name: {"counts": counts, "values": bucket_midpoints, "type": "histogram"}} + ) + if histo.reset_on_collect: + histo.reset() + if samples: for labels, sample in compat.iteritems(samples): result = {"samples": sample, "timestamp": timestamp} @@ -263,8 +302,16 @@ def before_yield(self, data): return data -class Counter(object): - __slots__ = ("name", "_lock", "_initial_value", "_val", "reset_on_collect") +class BaseMetric(object): + __slots__ = ("name", "reset_on_collect") + + def __init__(self, name, reset_on_collect=False, **kwargs): + self.name = name + self.reset_on_collect = reset_on_collect + + +class Counter(BaseMetric): + __slots__ = BaseMetric.__slots__ + ("_lock", "_initial_value", "_val") def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None): """ @@ -273,10 +320,9 @@ def __init__(self, name, initial_value=0, reset_on_collect=False, unit=None): :param initial_value: initial value of the counter, defaults to 0 :param unit: unit of the observed counter. Unused for counters """ - self.name = name self._lock = threading.Lock() self._val = self._initial_value = initial_value - self.reset_on_collect = reset_on_collect + super(Counter, self).__init__(name, reset_on_collect=reset_on_collect) def inc(self, delta=1): """ @@ -318,8 +364,8 @@ def val(self, value): self._val = value -class Gauge(object): - __slots__ = ("name", "_val", "reset_on_collect") +class Gauge(BaseMetric): + __slots__ = BaseMetric.__slots__ + ("_val",) def __init__(self, name, reset_on_collect=False, unit=None): """ @@ -327,9 +373,8 @@ def __init__(self, name, reset_on_collect=False, unit=None): :param name: label of the gauge :param unit of the observed gauge. Unused for gauges """ - self.name = name self._val = None - self.reset_on_collect = reset_on_collect + super(Gauge, self).__init__(name, reset_on_collect=reset_on_collect) @property def val(self): @@ -343,16 +388,15 @@ def reset(self): self._val = 0 -class Timer(object): - __slots__ = ("name", "_val", "_count", "_lock", "_unit", "reset_on_collect") +class Timer(BaseMetric): + __slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit") def __init__(self, name=None, reset_on_collect=False, unit=None): - self.name = name self._val = 0 self._count = 0 self._unit = unit self._lock = threading.Lock() - self.reset_on_collect = reset_on_collect + super(Timer, self).__init__(name, reset_on_collect=reset_on_collect) def update(self, duration, count=1): with self._lock: @@ -375,6 +419,46 @@ def val(self, value): self._val, self._count = value +class Histogram(BaseMetric): + DEFAULT_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, float("inf")) + + __slots__ = BaseMetric.__slots__ + ("_lock", "_buckets", "_counts", "_lock", "_unit") + + def __init__(self, name=None, reset_on_collect=False, unit=None, buckets=None): + self._lock = threading.Lock() + self._buckets = buckets or Histogram.DEFAULT_BUCKETS + if self._buckets[-1] != float("inf"): + self._buckets.append(float("inf")) + self._counts = [0] * len(self._buckets) + self._unit = unit + super(Histogram, self).__init__(name, reset_on_collect=reset_on_collect) + + def update(self, value, count=1): + pos = 0 + while value > self._buckets[pos]: + pos += 1 + with self._lock: + self._counts[pos] += count + + @property + def val(self): + with self._lock: + return self._counts + + @val.setter + def val(self, value): + with self._lock: + self._counts = value + + @property + def buckets(self): + return self._buckets + + def reset(self): + with self._lock: + self._counts = [0] * len(self._buckets) + + class NoopMetric(object): """ A no-op metric that implements the "interface" of both Counter and Gauge. diff --git a/elasticapm/metrics/sets/prometheus.py b/elasticapm/metrics/sets/prometheus.py index 894de159c..2dffce812 100644 --- a/elasticapm/metrics/sets/prometheus.py +++ b/elasticapm/metrics/sets/prometheus.py @@ -46,9 +46,9 @@ def before_collect(self): metric_type = self.METRIC_MAP.get(metric.type, None) if not metric_type: continue - metric_type(self, metric.name, metric.samples) + metric_type(self, metric.name, metric.samples, metric.unit) - def _prom_counter_handler(self, name, samples): + def _prom_counter_handler(self, name, samples, unit): # Counters can be converted 1:1 from Prometheus to our # format. Each pair of samples represents a distinct labelset for a # given name. The pair consists of the value, and a "created" timestamp. @@ -58,7 +58,7 @@ def _prom_counter_handler(self, name, samples): self._registry.client.config.prometheus_metrics_prefix + name, **total_sample.labels ).val = total_sample.value - def _prom_gauge_handler(self, name, samples): + def _prom_gauge_handler(self, name, samples, unit): # Counters can be converted 1:1 from Prometheus to our # format. Each sample represents a distinct labelset for a # given name @@ -67,7 +67,7 @@ def _prom_gauge_handler(self, name, samples): self._registry.client.config.prometheus_metrics_prefix + name, **sample.labels ).val = sample.value - def _prom_summary_handler(self, name, samples): + def _prom_summary_handler(self, name, samples, unit): # Prometheus Summaries are analogous to our Timers, having # a count and a sum. A prometheus summary is represented by # three values. The list of samples for a given name can be @@ -79,7 +79,40 @@ def _prom_summary_handler(self, name, samples): count_sample.value, ) - METRIC_MAP = {"counter": _prom_counter_handler, "gauge": _prom_gauge_handler, "summary": _prom_summary_handler} + def _prom_histogram_handler(self, name, samples, unit): + # Prometheus histograms are structured as a series of counts + # with an "le" label. The count of each label signifies all + # observations with a lower-or-equal value with respect to + # the "le" label value. + # After the le-samples, 3 more samples follow, with an overall + # count, overall sum, and creation timestamp. + sample_pos = 0 + prev_val = 0 + counts = [] + values = [] + name = self._registry.client.config.prometheus_metrics_prefix + name + while sample_pos < len(samples): + sample = samples[sample_pos] + if "le" in sample.labels: + values.append(float(sample.labels["le"])) + counts.append(sample.value - prev_val) + prev_val = sample.value + sample_pos += 1 + + else: + # we reached the end of one set of buckets/values, this is the "count" sample + self.histogram(name, unit=unit, buckets=values, **sample.labels).val = counts + prev_val = 0 + counts = [] + values = [] + sample_pos += 3 # skip sum/created samples + + METRIC_MAP = { + "counter": _prom_counter_handler, + "gauge": _prom_gauge_handler, + "summary": _prom_summary_handler, + "histogram": _prom_histogram_handler, + } def grouper(iterable, n, fillvalue=None): diff --git a/tests/metrics/base_tests.py b/tests/metrics/base_tests.py index 36e4b3223..36c925bec 100644 --- a/tests/metrics/base_tests.py +++ b/tests/metrics/base_tests.py @@ -86,6 +86,25 @@ def test_metrics_counter(elasticapm_client): assert data["samples"]["x"]["value"] == 0 +def test_metrics_histogram(elasticapm_client): + metricset = MetricsSet(MetricsRegistry(elasticapm_client)) + hist = metricset.histogram("x", buckets=[1, 10, 100]) + assert len(hist.buckets) == 4 + + hist.update(0.3) + hist.update(1) + hist.update(5) + hist.update(20) + hist.update(100) + hist.update(1000) + + data = list(metricset.collect()) + assert len(data) == 1 + d = data[0] + assert d["samples"]["x"]["counts"] == [2, 1, 2, 1] + assert d["samples"]["x"]["values"] == [0.5, 5.5, 55.0, 100] + + def test_metrics_labels(elasticapm_client): metricset = MetricsSet(MetricsRegistry(elasticapm_client)) metricset.counter("x", mylabel="a").inc() diff --git a/tests/metrics/prometheus_tests.py b/tests/metrics/prometheus_tests.py index 756465e21..0308c2cf9 100644 --- a/tests/metrics/prometheus_tests.py +++ b/tests/metrics/prometheus_tests.py @@ -45,7 +45,14 @@ prometheus_client.REGISTRY.unregister(prometheus_client.GC_COLLECTOR) -def test_counter(elasticapm_client): +@pytest.fixture() +def prometheus(): + # reset registry + prometheus_client.REGISTRY._collector_to_names = {} + prometheus_client.REGISTRY._names_to_collectors = {} + + +def test_counter(elasticapm_client, prometheus): metricset = PrometheusMetrics(MetricsRegistry(elasticapm_client)) counter = prometheus_client.Counter("a_bare_counter", "Bare counter") counter_with_labels = prometheus_client.Counter( @@ -64,7 +71,7 @@ def test_counter(elasticapm_client): assert data[2]["tags"] == {"alabel": "bar", "anotherlabel": "bazzinga"} -def test_gauge(elasticapm_client): +def test_gauge(elasticapm_client, prometheus): metricset = PrometheusMetrics(MetricsRegistry(elasticapm_client)) gauge = prometheus_client.Gauge("a_bare_gauge", "Bare gauge") gauge_with_labels = prometheus_client.Gauge("gauge_with_labels", "Gauge with labels", ["alabel", "anotherlabel"]) @@ -81,7 +88,7 @@ def test_gauge(elasticapm_client): assert data[2]["tags"] == {"alabel": "bar", "anotherlabel": "bazzinga"} -def test_summary(elasticapm_client): +def test_summary(elasticapm_client, prometheus): metricset = PrometheusMetrics(MetricsRegistry(elasticapm_client)) summary = prometheus_client.Summary("a_bare_summary", "Bare summary") summary_with_labels = prometheus_client.Summary( @@ -104,3 +111,33 @@ def test_summary(elasticapm_client): assert data[2]["samples"]["prometheus.metrics.summary_with_labels.count"]["value"] == 1.0 assert data[2]["samples"]["prometheus.metrics.summary_with_labels.sum"]["value"] == 11.0 assert data[2]["tags"] == {"alabel": "bar", "anotherlabel": "bazzinga"} + + +def test_histogram(elasticapm_client, prometheus): + metricset = PrometheusMetrics(MetricsRegistry(elasticapm_client)) + histo = prometheus_client.Histogram("histo", "test histogram", buckets=[1, 10, 100, float("inf")]) + histo_with_labels = prometheus_client.Histogram( + "histowithlabel", "test histogram with labels", ["alabel", "anotherlabel"], buckets=[1, 10, 100, float("inf")] + ) + histo.observe(0.5) + histo.observe(0.6) + histo.observe(1.5) + histo.observe(26) + histo.observe(42) + histo.observe(12) + histo.observe(105) + histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(1) + histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(10) + histo_with_labels.labels(alabel="foo", anotherlabel="baz").observe(100) + histo_with_labels.labels(alabel="foo", anotherlabel="bazzinga").observe(1000) + data = list(metricset.collect()) + assert data[0]["samples"]["prometheus.metrics.histo"]["values"] == [0.5, 5.5, 55.0, 100.0] + assert data[0]["samples"]["prometheus.metrics.histo"]["counts"] == [2, 1, 3, 1] + + assert data[1]["samples"]["prometheus.metrics.histowithlabel"]["values"] == [0.5, 5.5, 55.0, 100.0] + assert data[1]["samples"]["prometheus.metrics.histowithlabel"]["counts"] == [1, 1, 1, 0] + assert data[1]["tags"] == {"alabel": "foo", "anotherlabel": "baz"} + + assert data[2]["samples"]["prometheus.metrics.histowithlabel"]["values"] == [0.5, 5.5, 55.0, 100.0] + assert data[2]["samples"]["prometheus.metrics.histowithlabel"]["counts"] == [0, 0, 0, 1] + assert data[2]["tags"] == {"alabel": "foo", "anotherlabel": "bazzinga"}