Skip to content

Commit

Permalink
Add SumObserver and UpDownSumObserver instruments (#789)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Jun 9, 2020
1 parent 5c17a06 commit 31e29fc
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 15 deletions.
1 change: 1 addition & 0 deletions docs/examples/basic_meter/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def get_ram_usage_callback(observer):
description="RAM memory usage",
unit="1",
value_type=float,
observer_type=ValueObserver,
label_keys=(),
)

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-api/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
([#552](https://github.com/open-telemetry/opentelemetry-python/pull/552))
- Rename Observer to ValueObserver
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
- Add SumObserver and UpDownSumObserver in metrics
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))

## 0.8b0

Expand Down
24 changes: 24 additions & 0 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,30 @@ def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""


class SumObserver(Observer):
"""No-op implementation of ``SumObserver``."""

def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""Captures ``value`` to the sumobserver.
Args:
value: The value to capture to this sumobserver metric.
labels: Labels associated to ``value``.
"""


class UpDownSumObserver(Observer):
"""No-op implementation of ``UpDownSumObserver``."""

def observe(self, value: ValueT, labels: Dict[str, str]) -> None:
"""Captures ``value`` to the updownsumobserver.
Args:
value: The value to capture to this updownsumobserver metric.
labels: Labels associated to ``value``.
"""


class ValueObserver(Observer):
"""No-op implementation of ``ValueObserver``."""

Expand Down
14 changes: 13 additions & 1 deletion opentelemetry-api/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ def test_bound_valuerecorder(self):
bound_valuerecorder = metrics.BoundValueRecorder()
bound_valuerecorder.record(1)

def test_observer(self):
def test_default_observer(self):
observer = metrics.DefaultObserver()
observer.observe(1, {})

def test_sum_observer(self):
observer = metrics.SumObserver()
observer.observe(1, {})

def test_updown_sum_observer(self):
observer = metrics.UpDownSumObserver()
observer.observe(1, {})

def test_value_observer(self):
observer = metrics.ValueObserver()
observer.observe(1, {})
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
([#775](https://github.com/open-telemetry/opentelemetry-python/pull/775))
- Rename Observer to ValueObserver
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
- Add SumObserver, UpDownSumObserver and LastValueAggregator in metrics
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))

## 0.8b0

Expand Down
74 changes: 60 additions & 14 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,16 @@ def record(self, value: metrics_api.ValueT) -> None:


class Metric(metrics_api.Metric):
"""Base class for all metric types.
"""Base class for all synchronous metric types.
Also known as metric instrument. This is the class that is used to
represent a metric that is to be continuously recorded and tracked. Each
metric has a set of bound metrics that are created from the metric. See
`BaseBoundInstrument` for information on bound metric instruments.
This is the class that is used to represent a metric that is to be
synchronously recorded and tracked. Synchronous instruments are called
inside a request, meaning they have an associated distributed context
(i.e. Span context, correlation context). Multiple metric events may occur
for a synchronous instrument within a give collection interval.
Each metric has a set of bound metrics that are created from the metric.
See `BaseBoundInstrument` for information on bound metric instruments.
"""

BOUND_INSTR_TYPE = BaseBoundInstrument
Expand Down Expand Up @@ -190,8 +194,14 @@ def record(
UPDATE_FUNCTION = record


class ValueObserver(metrics_api.ValueObserver):
"""See `opentelemetry.metrics.ValueObserver`."""
class Observer(metrics_api.Observer):
"""Base class for all asynchronous metric types.
Also known as Observers, observer metric instruments are asynchronous in
that they are reported by a callback, once per collection interval, and
lack context. They are permitted to report only one value per distinct
label set per period.
"""

def __init__(
self,
Expand All @@ -218,15 +228,10 @@ def __init__(
def observe(
self, value: metrics_api.ValueT, labels: Dict[str, str]
) -> None:
if not self.enabled:
return
if not isinstance(value, self.value_type):
logger.warning(
"Invalid value passed for %s.", self.value_type.__name__
)
key = get_labels_as_key(labels)
if not self._validate_observe(value, key):
return

key = get_labels_as_key(labels)
if key not in self.aggregators:
# TODO: how to cleanup aggregators?
self.aggregators[key] = self.meter.batcher.aggregator_for(
Expand All @@ -235,6 +240,20 @@ def observe(
aggregator = self.aggregators[key]
aggregator.update(value)

# pylint: disable=W0613
def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]],
) -> bool:
if not self.enabled:
return False
if not isinstance(value, self.value_type):
logger.warning(
"Invalid value passed for %s.", self.value_type.__name__
)
return False

return True

def run(self) -> bool:
try:
self.callback(self)
Expand All @@ -252,6 +271,33 @@ def __repr__(self):
)


class SumObserver(Observer, metrics_api.SumObserver):
"""See `opentelemetry.metrics.SumObserver`."""

def _validate_observe(
self, value: metrics_api.ValueT, key: Tuple[Tuple[str, str]],
) -> bool:
if not super()._validate_observe(value, key):
return False
# Must be non-decreasing because monotonic
if (
key in self.aggregators
and self.aggregators[key].current is not None
):
if value < self.aggregators[key].current:
logger.warning("Value passed must be non-decreasing.")
return False
return True


class UpDownSumObserver(Observer, metrics_api.UpDownSumObserver):
"""See `opentelemetry.metrics.UpDownSumObserver`."""


class ValueObserver(Observer, metrics_api.ValueObserver):
"""See `opentelemetry.metrics.ValueObserver`."""


class Record:
"""Container class used for processing in the `Batcher`"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,34 @@ def merge(self, other):
)


class LastValueAggregator(Aggregator):
"""Aggregator that stores last value results."""

def __init__(self):
super().__init__()
self._lock = threading.Lock()
self.last_update_timestamp = None

def update(self, value):
with self._lock:
self.current = value
self.last_update_timestamp = time_ns()

def take_checkpoint(self):
with self._lock:
self.checkpoint = self.current
self.current = None

def merge(self, other):
last = self.checkpoint.last
self.last_update_timestamp = get_latest_timestamp(
self.last_update_timestamp, other.last_update_timestamp
)
if self.last_update_timestamp == other.last_update_timestamp:
last = other.checkpoint.last
self.checkpoint = last


class ValueObserverAggregator(Aggregator):
"""Same as MinMaxSumCount but also with last value."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
from opentelemetry.metrics import (
Counter,
InstrumentT,
SumObserver,
UpDownSumObserver,
ValueObserver,
ValueRecorder,
)
from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.metrics.export.aggregate import (
Aggregator,
CounterAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
ValueObserverAggregator,
)
Expand Down Expand Up @@ -54,6 +57,8 @@ def aggregator_for(self, instrument_type: Type[InstrumentT]) -> Aggregator:
# pylint:disable=R0201
if issubclass(instrument_type, Counter):
return CounterAggregator()
if issubclass(instrument_type, (SumObserver, UpDownSumObserver)):
return LastValueAggregator()
if issubclass(instrument_type, ValueRecorder):
return MinMaxSumCountAggregator()
if issubclass(instrument_type, ValueObserver):
Expand Down
132 changes: 132 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,138 @@ def test_record(self):
)


class TestSumObserver(unittest.TestCase):
def test_observe(self):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.SumObserver(
None, "name", "desc", "unit", int, meter, ("key",), True
)
labels = {"key": "value"}
key_labels = tuple(sorted(labels.items()))
values = (37, 42, 60, 100)
for val in values:
observer.observe(val, labels)

self.assertEqual(observer.aggregators[key_labels].current, values[-1])

def test_observe_disabled(self):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.SumObserver(
None, "name", "desc", "unit", int, meter, ("key",), False
)
labels = {"key": "value"}
observer.observe(37, labels)
self.assertEqual(len(observer.aggregators), 0)

@mock.patch("opentelemetry.sdk.metrics.logger")
def test_observe_incorrect_type(self, logger_mock):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.SumObserver(
None, "name", "desc", "unit", int, meter, ("key",), True
)
labels = {"key": "value"}
observer.observe(37.0, labels)
self.assertEqual(len(observer.aggregators), 0)
self.assertTrue(logger_mock.warning.called)

@mock.patch("opentelemetry.sdk.metrics.logger")
def test_observe_non_decreasing_error(self, logger_mock):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.SumObserver(
None, "name", "desc", "unit", int, meter, ("key",), True
)
labels = {"key": "value"}
observer.observe(37, labels)
observer.observe(14, labels)
self.assertEqual(len(observer.aggregators), 1)
self.assertTrue(logger_mock.warning.called)

def test_run(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()
observer = metrics.SumObserver(
callback, "name", "desc", "unit", int, meter, (), True
)

self.assertTrue(observer.run())
callback.assert_called_once_with(observer)

@mock.patch("opentelemetry.sdk.metrics.logger")
def test_run_exception(self, logger_mock):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()
callback.side_effect = Exception("We have a problem!")

observer = metrics.SumObserver(
callback, "name", "desc", "unit", int, meter, (), True
)

self.assertFalse(observer.run())
self.assertTrue(logger_mock.warning.called)


class TestUpDownSumObserver(unittest.TestCase):
def test_observe(self):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.UpDownSumObserver(
None, "name", "desc", "unit", int, meter, ("key",), True
)
labels = {"key": "value"}
key_labels = tuple(sorted(labels.items()))
values = (37, 42, 14, 30)
for val in values:
observer.observe(val, labels)

self.assertEqual(observer.aggregators[key_labels].current, values[-1])

def test_observe_disabled(self):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.UpDownSumObserver(
None, "name", "desc", "unit", int, meter, ("key",), False
)
labels = {"key": "value"}
observer.observe(37, labels)
self.assertEqual(len(observer.aggregators), 0)

@mock.patch("opentelemetry.sdk.metrics.logger")
def test_observe_incorrect_type(self, logger_mock):
meter = metrics.MeterProvider().get_meter(__name__)
observer = metrics.UpDownSumObserver(
None, "name", "desc", "unit", int, meter, ("key",), True
)
labels = {"key": "value"}
observer.observe(37.0, labels)
self.assertEqual(len(observer.aggregators), 0)
self.assertTrue(logger_mock.warning.called)

def test_run(self):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()
observer = metrics.UpDownSumObserver(
callback, "name", "desc", "unit", int, meter, (), True
)

self.assertTrue(observer.run())
callback.assert_called_once_with(observer)

@mock.patch("opentelemetry.sdk.metrics.logger")
def test_run_exception(self, logger_mock):
meter = metrics.MeterProvider().get_meter(__name__)

callback = mock.Mock()
callback.side_effect = Exception("We have a problem!")

observer = metrics.UpDownSumObserver(
callback, "name", "desc", "unit", int, meter, (), True
)

self.assertFalse(observer.run())
self.assertTrue(logger_mock.warning.called)


class TestValueObserver(unittest.TestCase):
def test_observe(self):
meter = metrics.MeterProvider().get_meter(__name__)
Expand Down

0 comments on commit 31e29fc

Please sign in to comment.