Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add in flight real time metrics for Measure blocks #3871

Merged
merged 5 commits into from
Sep 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3871.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add in flight real time metrics for Measure blocks
9 changes: 6 additions & 3 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
Expand Down Expand Up @@ -224,9 +225,11 @@ def _request(self, destination, method, path,
reactor=self.hs.get_reactor()
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
response = yield make_deferred_yieldable(
request_deferred,
)

with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)

log_result = "%d %s" % (response.code, response.phrase,)
break
Expand Down
108 changes: 107 additions & 1 deletion synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import logging
import os
import platform
import threading
import time

import six

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
Expand Down Expand Up @@ -68,7 +71,7 @@ def collect(self):
return

if isinstance(calls, dict):
for k, v in calls.items():
for k, v in six.iteritems(calls):
g.add_metric(k, v)
else:
g.add_metric([], calls)
Expand All @@ -87,6 +90,109 @@ def _register(self):
all_gauges[self.name] = self


class InFlightGauge(object):
    """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
    at any given time.

    Each InFlightGauge will create a metric called `<name>_total` that counts
    the number of in flight blocks, as well as a metric for each item in the
    given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
    callbacks.

    Args:
        name (str)
        desc (str)
        labels (list[str])
        sub_metrics (list[str]): A list of sub metrics that the callbacks
            will update.
    """

    def __init__(self, name, desc, labels, sub_metrics):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.sub_metrics = sub_metrics

        # Create a class which has the sub_metrics values as attributes, which
        # default to 0 on initialization. Used to pass to registered callbacks.
        self._metrics_class = attr.make_class(
            "_MetricsEntry",
            attrs={x: attr.ib(0) for x in sub_metrics},
            slots=True,
        )

        # Maps each tuple of label values to the set of callbacks registered
        # for the blocks currently in flight with those label values.
        self._registrations = {}

        # Protects access to _registrations
        self._lock = threading.Lock()

        self._register_with_collector()

    def register(self, key, callback):
        """Registers that we've entered a new block with labels `key`.

        `callback` gets called each time the metrics are collected. The same
        value must also be given to `unregister`.

        `callback` gets called with an object that has an attribute per
        sub_metric, which should be updated with the necessary values. Note that
        the metrics object is shared between all callbacks registered with the
        same key.

        Note that `callback` may be called on a separate thread.

        Args:
            key (tuple[str]): The label values, in the same order as `labels`.
            callback (callable): Called with the shared metrics object.
        """
        with self._lock:
            self._registrations.setdefault(key, set()).add(callback)

    def unregister(self, key, callback):
        """Registers that we've exited a block with labels `key`.

        The entry for `key` is kept even once its last callback has gone, so
        the key continues to be reported with a count of zero.

        Args:
            key (tuple[str]): The label values given to `register`.
            callback (callable): The callback given to `register`. Unknown
                callbacks are silently ignored.
        """
        with self._lock:
            self._registrations.setdefault(key, set()).discard(callback)

    def collect(self):
        """Called by prometheus client when it reads metrics.

        Note: may be called by a separate thread.

        Yields:
            GaugeMetricFamily: first the `<name>_total` in flight count, then
            one `<name>_<sub_metric>` gauge per configured sub metric.
        """
        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)

        # Take one consistent copy of every registration under a single lock
        # acquisition, so that the callbacks (which may be slow) run without
        # the lock held and cannot observe the dict being mutated.
        with self._lock:
            snapshot = [
                (key, set(callbacks))
                for key, callbacks in six.iteritems(self._registrations)
            ]

        metrics_by_key = {}
        for key, callbacks in snapshot:
            in_flight.add_metric(key, len(callbacks))

            # One metrics object per key, shared by all of that key's
            # callbacks so they can aggregate (sum, max, ...) into it.
            metrics = self._metrics_class()
            metrics_by_key[key] = metrics
            for callback in callbacks:
                callback(metrics)

        yield in_flight

        for name in self.sub_metrics:
            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
            for key, metrics in six.iteritems(metrics_by_key):
                gauge.add_metric(key, getattr(metrics, name))
            yield gauge

    def _register_with_collector(self):
        """Registers this gauge with the prometheus REGISTRY, replacing any
        gauge previously registered under the same name.
        """
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self


#
# Detailed CPU metrics
#
Expand Down
22 changes: 22 additions & 0 deletions synapse/util/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from twisted.internet import defer

from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext

logger = logging.getLogger(__name__)
Expand All @@ -45,6 +46,13 @@
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])

# Tracks the Measure blocks that are currently active (entered but not yet
# exited), labelled by block name. Besides the in flight count, each active
# block reports how long it has been running so far, which is aggregated per
# block name into the `real_time_max` and `real_time_sum` sub metrics.
in_flight = InFlightGauge(
"synapse_util_metrics_block_in_flight", "",
labels=["block_name"],
sub_metrics=["real_time_max", "real_time_sum"],
)


def measure_func(name):
def wrapper(func):
Expand Down Expand Up @@ -82,10 +90,14 @@ def __enter__(self):

self.start_usage = self.start_context.get_resource_usage()

in_flight.register((self.name,), self._update_in_flight)

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return

in_flight.unregister((self.name,), self._update_in_flight)

duration = self.clock.time() - self.start

block_counter.labels(self.name).inc()
Expand Down Expand Up @@ -120,3 +132,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)

def _update_in_flight(self, metrics):
    """Callback invoked whenever the in flight metrics are collected.

    Folds the time this block has been running so far into the metrics
    object shared by every in flight block registered under the same key.

    Args:
        metrics: object with `real_time_max` and `real_time_sum` attributes,
            updated in place.
    """
    elapsed = self.clock.time() - self.start

    # Keep the longest running time seen for this key...
    if elapsed > metrics.real_time_max:
        metrics.real_time_max = elapsed
    # ...and accumulate the total running time across all blocks.
    metrics.real_time_sum = metrics.real_time_sum + elapsed

    # TODO: Add other in flight metrics.
81 changes: 81 additions & 0 deletions tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from synapse.metrics import InFlightGauge

from tests import unittest


class TestMauLimit(unittest.TestCase):
    """Checks the metrics an InFlightGauge reports as callbacks come and go."""

    def test_basic(self):
        """Registers and unregisters callbacks under two keys, verifying the
        collected in flight counts and sub metrics after every step.
        """
        in_flight_gauge = InFlightGauge(
            "test1", "",
            labels=["test_label"],
            sub_metrics=["foo", "bar"],
        )

        first_key = ("key1",)
        second_key = ("key2",)

        def bump_by_two(entry):
            entry.foo += 2
            entry.bar = max(entry.bar, 5)

        def bump_by_three(entry):
            entry.foo += 3
            entry.bar = max(entry.bar, 7)

        # A single block in flight under the first key.
        in_flight_gauge.register(first_key, bump_by_two)

        expected = {
            "test1_total": {first_key: 1},
            "test1_foo": {first_key: 2},
            "test1_bar": {first_key: 5},
        }
        self.assert_dict(expected, self.get_metrics_from_gauge(in_flight_gauge))

        # Once it exits, the key is still reported but everything is zero.
        in_flight_gauge.unregister(first_key, bump_by_two)

        expected = {
            "test1_total": {first_key: 0},
            "test1_foo": {first_key: 0},
            "test1_bar": {first_key: 0},
        }
        self.assert_dict(expected, self.get_metrics_from_gauge(in_flight_gauge))

        # One block in flight under each key.
        in_flight_gauge.register(first_key, bump_by_two)
        in_flight_gauge.register(second_key, bump_by_three)

        expected = {
            "test1_total": {first_key: 1, second_key: 1},
            "test1_foo": {first_key: 2, second_key: 3},
            "test1_bar": {first_key: 5, second_key: 7},
        }
        self.assert_dict(expected, self.get_metrics_from_gauge(in_flight_gauge))

        # Both callbacks now share the first key, so they aggregate into the
        # same metrics object; the second key drops back to zero.
        in_flight_gauge.unregister(second_key, bump_by_three)
        in_flight_gauge.register(first_key, bump_by_three)

        expected = {
            "test1_total": {first_key: 2, second_key: 0},
            "test1_foo": {first_key: 5, second_key: 0},
            "test1_bar": {first_key: 7, second_key: 0},
        }
        self.assert_dict(expected, self.get_metrics_from_gauge(in_flight_gauge))

    def get_metrics_from_gauge(self, gauge):
        """Collects `gauge` and flattens the result for easy comparison.

        Returns:
            dict[str, dict[tuple[str], object]]: metric name -> mapping from
            the tuple of label values (ordered as `gauge.labels`) to the
            sample value.
        """
        collected = {}

        for family in gauge.collect():
            by_label_values = {}
            for _, sample_labels, sample_value in family.samples:
                label_values = tuple(sample_labels[name] for name in gauge.labels)
                by_label_values[label_values] = sample_value
            collected[family.name] = by_label_values

        return collected