Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3871 from matrix-org/erikj/in_flight_block_metrics
Browse files Browse the repository at this point in the history
Add in flight real time metrics for Measure blocks
  • Loading branch information
erikjohnston authored Sep 14, 2018
2 parents ad9198c + d0f6c1c commit 8b36528
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/3871.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add in flight real time metrics for Measure blocks
9 changes: 6 additions & 3 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure

logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
Expand Down Expand Up @@ -224,9 +225,11 @@ def _request(self, destination, method, path,
reactor=self.hs.get_reactor()
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
response = yield make_deferred_yieldable(
request_deferred,
)

with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)

log_result = "%d %s" % (response.code, response.phrase,)
break
Expand Down
108 changes: 107 additions & 1 deletion synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import logging
import os
import platform
import threading
import time

import six

import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
Expand Down Expand Up @@ -68,7 +71,7 @@ def collect(self):
return

if isinstance(calls, dict):
for k, v in calls.items():
for k, v in six.iteritems(calls):
g.add_metric(k, v)
else:
g.add_metric([], calls)
Expand All @@ -87,6 +90,109 @@ def _register(self):
all_gauges[self.name] = self


class InFlightGauge(object):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Each InFlightGauge will create a metric called `<name>_total` that counts
the number of in flight blocks, as well as a metrics for each item in the
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
"""

def __init__(self, name, desc, labels, sub_metrics):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics

# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
)

# Counts number of in flight blocks for a given set of label values
self._registrations = {}

# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()

def register(self, key, callback):
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
value must also be given to `unregister`.
`callback` gets called with an object that has an attribute per
sub_metric, which should be updated with the necessary values. Note that
the metrics object is shared between all callbacks registered with the
same key.
Note that `callback` may be called on a separate thread.
"""
with self._lock:
self._registrations.setdefault(key, set()).add(callback)

def unregister(self, key, callback):
"""Registers that we've exited a block with labels `key`.
"""

with self._lock:
self._registrations.setdefault(key, set()).discard(callback)

def collect(self):
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
"""
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)

metrics_by_key = {}

# We copy so that we don't mutate the list while iterating
with self._lock:
keys = list(self._registrations)

for key in keys:
with self._lock:
callbacks = set(self._registrations[key])

in_flight.add_metric(key, len(callbacks))

metrics = self._metrics_class()
metrics_by_key[key] = metrics
for callback in callbacks:
callback(metrics)

yield in_flight

for name in self.sub_metrics:
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge

def _register_with_collector(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


#
# Detailed CPU metrics
#
Expand Down
22 changes: 22 additions & 0 deletions synapse/util/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from twisted.internet import defer

from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext

logger = logging.getLogger(__name__)
Expand All @@ -45,6 +46,13 @@
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])

# Tracks the number of blocks currently active
in_flight = InFlightGauge(
"synapse_util_metrics_block_in_flight", "",
labels=["block_name"],
sub_metrics=["real_time_max", "real_time_sum"],
)


def measure_func(name):
def wrapper(func):
Expand Down Expand Up @@ -82,10 +90,14 @@ def __enter__(self):

self.start_usage = self.start_context.get_resource_usage()

in_flight.register((self.name,), self._update_in_flight)

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return

in_flight.unregister((self.name,), self._update_in_flight)

duration = self.clock.time() - self.start

block_counter.labels(self.name).inc()
Expand Down Expand Up @@ -120,3 +132,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):

if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)

def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics
"""
duration = self.clock.time() - self.start

metrics.real_time_max = max(metrics.real_time_max, duration)
metrics.real_time_sum += duration

# TODO: Add other in flight metrics.
81 changes: 81 additions & 0 deletions tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from synapse.metrics import InFlightGauge

from tests import unittest


class TestMauLimit(unittest.TestCase):
def test_basic(self):
gauge = InFlightGauge(
"test1", "",
labels=["test_label"],
sub_metrics=["foo", "bar"],
)

def handle1(metrics):
metrics.foo += 2
metrics.bar = max(metrics.bar, 5)

def handle2(metrics):
metrics.foo += 3
metrics.bar = max(metrics.bar, 7)

gauge.register(("key1",), handle1)

self.assert_dict({
"test1_total": {("key1",): 1},
"test1_foo": {("key1",): 2},
"test1_bar": {("key1",): 5},
}, self.get_metrics_from_gauge(gauge))

gauge.unregister(("key1",), handle1)

self.assert_dict({
"test1_total": {("key1",): 0},
"test1_foo": {("key1",): 0},
"test1_bar": {("key1",): 0},
}, self.get_metrics_from_gauge(gauge))

gauge.register(("key1",), handle1)
gauge.register(("key2",), handle2)

self.assert_dict({
"test1_total": {("key1",): 1, ("key2",): 1},
"test1_foo": {("key1",): 2, ("key2",): 3},
"test1_bar": {("key1",): 5, ("key2",): 7},
}, self.get_metrics_from_gauge(gauge))

gauge.unregister(("key2",), handle2)
gauge.register(("key1",), handle2)

self.assert_dict({
"test1_total": {("key1",): 2, ("key2",): 0},
"test1_foo": {("key1",): 5, ("key2",): 0},
"test1_bar": {("key1",): 7, ("key2",): 0},
}, self.get_metrics_from_gauge(gauge))

def get_metrics_from_gauge(self, gauge):
results = {}

for r in gauge.collect():
results[r.name] = {
tuple(labels[x] for x in gauge.labels): value
for _, labels, value in r.samples
}

return results

0 comments on commit 8b36528

Please sign in to comment.