Skip to content

Commit

Permalink
Refactor instrument
Browse files Browse the repository at this point in the history
Fixes #2294
  • Loading branch information
ocelotl committed Nov 29, 2021
1 parent 0855c9f commit 29f5752
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 166 deletions.
53 changes: 3 additions & 50 deletions opentelemetry-api/src/opentelemetry/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,12 @@
# type: ignore

from abc import ABC, abstractmethod
from collections import abc as collections_abc
from logging import getLogger
from typing import (
Callable,
Generator,
Generic,
Iterable,
Optional,
TypeVar,
Union,
)
from typing import Generic, Optional, TypeVar

# pylint: disable=unused-import; needed for typing and sphinx
from opentelemetry import _metrics as metrics
from opentelemetry._metrics.measurement import Measurement

_TInstrumentCallback = Callable[[], Iterable[Measurement]]
_TInstrumentCallbackGenerator = Generator[Iterable[Measurement], None, None]
TCallback = Union[_TInstrumentCallback, _TInstrumentCallbackGenerator]
InstrumentT = TypeVar("InstrumentT", bound="Instrument")


Expand Down Expand Up @@ -87,45 +74,11 @@ class Asynchronous(Instrument):
def __init__(
self,
name,
callback: TCallback,
*args,
callback,
unit="",
description="",
**kwargs
):
super().__init__(
name, *args, unit=unit, description=description, **kwargs
)

if isinstance(callback, collections_abc.Callable):
self._callback = callback
elif isinstance(callback, collections_abc.Generator):
self._callback = self._wrap_generator_callback(callback)
# FIXME check that callback is a callable or generator

@staticmethod
def _wrap_generator_callback(
generator_callback: _TInstrumentCallbackGenerator,
) -> _TInstrumentCallback:
"""Wraps a generator style callback into a callable one"""
has_items = True

def inner() -> Iterable[Measurement]:
nonlocal has_items
if not has_items:
return []

try:
return next(generator_callback)
except StopIteration:
has_items = False
# FIXME handle the situation where the callback generator has
# run out of measurements
return []

return inner

# FIXME check that callbacks return an iterable of Measurements
super().__init__(name, unit=unit, description=description)


class _Adding(Instrument):
Expand Down
190 changes: 81 additions & 109 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,149 +12,121 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=function-redefined
# pylint: disable=dangerous-default-value
# Classes in this module use dictionaries as default arguments. This is
# considered dangerous by pylint because the default dictionary is shared by
# all instances. Implementations of these classes must not make any change to
# this default dictionary in __init__.
# pylint: disable=too-many-ancestors


from collections.abc import Callable, Generator
from typing import Iterable, Union

from opentelemetry._metrics.instrument import Asynchronous
from opentelemetry._metrics.instrument import Counter as APICounter
from opentelemetry._metrics.instrument import Histogram as APIHistogram
from opentelemetry._metrics.instrument import (
ObservableCounter as APIObservableCounter,
)
from opentelemetry._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
ObservableGauge as APIObservableGauge,
)
from opentelemetry._metrics.instrument import (
ObservableUpDownCounter as APIObservableUpDownCounter,
)
from opentelemetry._metrics.instrument import Synchronous
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.aggregation import (
ExplicitBucketHistogramAggregation,
LastValueAggregation,
SumAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement


class _Instrument:
class _Synchronous(Synchronous):
def __init__(
self,
meter_provider,
name,
unit="",
description="",
aggregation=None,
aggregation_config={},
):
self._attributes_aggregations = {}
self._aggregation = aggregation
self._aggregation_config = aggregation_config
aggregation(self, **aggregation_config)

self._meter_provider = meter_provider

class Counter(_Instrument, Counter):
def __init__(
self,
name,
unit="",
description="",
aggregation=SumAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
)
super().__init__(name, unit=unit, description=description)


class UpDownCounter(_Instrument, UpDownCounter):
class _Asynchronous(Asynchronous):
def __init__(
self,
meter_provider,
name,
callback: Union[Callable, Generator],
unit="",
description="",
aggregation=SumAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
)
self._meter_provider = meter_provider

super().__init__(name, callback, unit=unit, description=description)

class ObservableCounter(_Instrument, ObservableCounter):
def __init__(
self,
name,
callback,
unit="",
description="",
aggregation=SumAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
)
self._callback = callback

if isinstance(callback, Generator):

class ObservableUpDownCounter(_Instrument, ObservableUpDownCounter):
def __init__(
self,
name,
callback,
unit="",
description="",
aggregation=SumAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
def inner() -> Iterable[Measurement]:
return next(callback)

self._callback = inner

@property
def callback(self) -> Union[Callable, Generator]:
return self._callback


class Counter(_Synchronous, APICounter):

_default_aggregation = SumAggregation

def add(self, amount, attributes=None):
if amount < 0:
raise Exception("amount must be non negative")

# pylint: disable=protected-access
self._meter_provider._measurement_processor.process(
self, Measurement(amount, attributes=attributes)
)


class Histogram(_Instrument, Histogram):
def __init__(
self,
name,
unit="",
description="",
aggregation=ExplicitBucketHistogramAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
class UpDownCounter(_Synchronous, APIUpDownCounter):

_default_aggregation = SumAggregation

def add(self, amount, attributes=None):
# pylint: disable=protected-access
self._meter_provider._measurement_processor.process(
self, Measurement(amount, attributes=attributes)
)


class ObservableGauge(_Instrument, ObservableGauge):
def __init__(
self,
name,
callback,
unit="",
description="",
aggregation=LastValueAggregation,
aggregation_config={},
):
super().__init__(
name,
unit=unit,
description=description,
aggregation=aggregation,
aggregation_config=aggregation_config,
class ObservableCounter(_Asynchronous, APIObservableCounter):

_default_aggregation = SumAggregation


class ObservableUpDownCounter(_Asynchronous, APIObservableUpDownCounter):

_default_aggregation = SumAggregation


class Histogram(_Synchronous, APIHistogram):

_default_aggregation = ExplicitBucketHistogramAggregation

def record(self, amount, attributes=None):
# pylint: disable=protected-access
self._meter_provider._measurement_processor.process(
self, Measurement(amount, attributes=attributes)
)


class ObservableGauge(_Asynchronous, APIObservableGauge):

_default_aggregation = LastValueAggregation
17 changes: 17 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class Measurement:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import io
from typing import Generator, Iterable
from unittest import TestCase
from unittest.mock import Mock

from opentelemetry._metrics import _DefaultMeter
from opentelemetry._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.instrument import ObservableCounter

# FIXME Test that the instrument methods can be called concurrently safely.

Expand Down Expand Up @@ -61,8 +62,6 @@ class TestCpuTimeIntegration(TestCase):
]

def test_cpu_time_callback(self):
meter = _DefaultMeter("foo")

def cpu_time_callback() -> Iterable[Measurement]:
procstat = io.StringIO(self.procstat_str)
procstat.readline() # skip the first line
Expand Down Expand Up @@ -98,7 +97,8 @@ def cpu_time_callback() -> Iterable[Measurement]:
int(states[8]) // 100, {"cpu": cpu, "state": "guest_nice"}
)

observable_counter = meter.create_observable_counter(
observable_counter = ObservableCounter(
Mock(),
"system.cpu.time",
callback=cpu_time_callback,
unit="s",
Expand All @@ -108,8 +108,6 @@ def cpu_time_callback() -> Iterable[Measurement]:
self.assertEqual(measurements, self.measurements_expected)

def test_cpu_time_generator(self):
meter = _DefaultMeter("foo")

def cpu_time_generator() -> Generator[
Iterable[Measurement], None, None
]:
Expand Down Expand Up @@ -176,7 +174,8 @@ def cpu_time_generator() -> Generator[
)
yield measurements

observable_counter = meter.create_observable_counter(
observable_counter = ObservableCounter(
Mock(),
"system.cpu.time",
callback=cpu_time_generator(),
unit="s",
Expand Down

0 comments on commit 29f5752

Please sign in to comment.