Skip to content

Commit

Permalink
Implement metric reader default aggregation controls
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Apr 26, 2022
1 parent ebdc101 commit 4b0317e
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from typing import TYPE_CHECKING, Dict, Iterable

from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
_Aggregation,
_AggregationFactory,
_convert_aggregation_temporality,
_PointVarT,
)
Expand All @@ -39,13 +41,15 @@ def __init__(
view: View,
instrument: "_Instrument",
sdk_config: SdkConfiguration,
instrument_class_aggregation: Dict[type, _AggregationFactory],
):
self._view = view
self._instrument = instrument
self._sdk_config = sdk_config
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_aggregation = instrument_class_aggregation

# pylint: disable=protected-access
def consume_measurement(self, measurement: Measurement) -> None:
Expand All @@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None:
if attributes not in self._attributes_aggregation:
with self._lock:
if attributes not in self._attributes_aggregation:
self._attributes_aggregation[
attributes
] = self._view._aggregation._create_aggregation(
self._instrument
)
if not isinstance(
self._view._aggregation, DefaultAggregation
):
aggregation = (
self._view._aggregation._create_aggregation(
self._instrument
)
)
else:
aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument)
self._attributes_aggregation[attributes] = aggregation

self._attributes_aggregation[attributes].aggregate(measurement)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
from os import environ, linesep
from sys import stdout
from threading import Event, RLock, Thread
from typing import IO, Callable, Iterable, List, Optional, Sequence
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
Expand Down Expand Up @@ -108,9 +109,13 @@ class InMemoryMetricReader(MetricReader):

def __init__(
self,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
) -> None:
super().__init__(preferred_temporality=preferred_temporality)
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._lock = RLock()
self._metrics: List[Metric] = []

Expand Down Expand Up @@ -139,10 +144,14 @@ class PeriodicExportingMetricReader(MetricReader):
def __init__(
self,
exporter: MetricExporter,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
super().__init__(preferred_temporality=exporter.preferred_temporality)
super().__init__(
preferred_temporality=exporter.preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._exporter = exporter
if export_interval_millis is None:
try:
Expand Down
17 changes: 11 additions & 6 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

# pylint: disable=too-many-ancestors

import logging
from typing import Dict, Generator, Iterable, Optional, Union
from logging import getLogger
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union

from opentelemetry._metrics.instrument import CallbackT
from opentelemetry._metrics.instrument import Counter as APICounter
Expand All @@ -31,18 +31,23 @@
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

_logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from opentelemetry.sdk._metrics.measurement_consumer import (
MeasurementConsumer,
)


_logger = getLogger(__name__)


class _Synchronous:
def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
unit: str = "",
description: str = "",
):
Expand All @@ -59,7 +64,7 @@ def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
callbacks: Optional[Iterable[CallbackT]] = None,
unit: str = "",
description: str = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
self._sdk_config = sdk_config
# should never be mutated
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
reader: MetricReaderStorage(sdk_config)
reader: MetricReaderStorage(
sdk_config, reader._instrument_class_aggregation
)
for reader in sdk_config.metric_readers
}
self._async_instruments: List["_Asynchronous"] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,70 @@

import logging
from abc import ABC, abstractmethod
from typing import Callable, Iterable
from typing import Callable, Dict, Iterable

from typing_extensions import final

from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric

_logger = logging.getLogger(__name__)


class MetricReader(ABC):
"""
Base class for all metric readers
Args:
preferred_aggregation: A mapping between instrument classes and
aggregation instances. By default maps all instrument classes to an
instance of `DefaultAggregation`. This mapping will be used to
define the default aggregation of every instrument class. If the
user wants to make a change in the default aggregation of an
instrument class, it is enough to pass here a dictionary whose keys
are the instrument classes and the values are the corresponding
desired aggregation for the instrument classes that the user wants
to change, not necessarily all of them. The classes not included in
the passed dictionary will retain their association to their
default aggregations. The aggregation defined here will be
overriden by an aggregation defined by a view that is not
`DefaultAggregation`.
.. document protected _receive_metrics which is a intended to be overriden by subclass
.. automethod:: _receive_metrics
"""

def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
) -> None:
self._collect: Callable[
["MetricReader", AggregationTemporality], Iterable[Metric]
] = None

self._preferred_temporality = preferred_temporality
self._instrument_class_aggregation = {
Counter: DefaultAggregation(),
UpDownCounter: DefaultAggregation(),
Histogram: DefaultAggregation(),
ObservableCounter: DefaultAggregation(),
ObservableUpDownCounter: DefaultAggregation(),
ObservableGauge: DefaultAggregation(),
}

self._instrument_class_aggregation.update(preferred_aggregation or {})

@final
def collect(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from opentelemetry.sdk._metrics._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
Expand All @@ -31,12 +34,17 @@
class MetricReaderStorage:
"""The SDK's storage for a given reader"""

def __init__(self, sdk_config: SdkConfiguration) -> None:
def __init__(
self,
sdk_config: SdkConfiguration,
instrument_class_aggregation: Dict[type, _AggregationFactory],
) -> None:
self._lock = RLock()
self._sdk_config = sdk_config
self._view_instrument_match: Dict[
Instrument, List[_ViewInstrumentMatch]
] = {}
self._instrument_class_aggregation = instrument_class_aggregation

def _get_or_init_view_instrument_match(
self, instrument: Instrument
Expand All @@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)
)

Expand All @@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match(
view=_DEFAULT_VIEW,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)
)
self._view_instrument_match[instrument] = view_instrument_matches
Expand Down
95 changes: 95 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict
from unittest import TestCase

from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
DefaultAggregation,
LastValueAggregation,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.metric_reader import MetricReader


class DummyMetricReader(MetricReader):
def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
) -> None:
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)

def _receive_metrics(self, metrics):
pass

def shutdown(self):
return True


class TestMetricReader(TestCase):
def test_default_temporality(self):

dummy_metric_reader = DummyMetricReader()

self.assertEqual(
dummy_metric_reader._instrument_class_aggregation.keys(),
set(
[
Counter,
UpDownCounter,
Histogram,
ObservableCounter,
ObservableUpDownCounter,
ObservableGauge,
]
),
)
for (
value
) in dummy_metric_reader._instrument_class_aggregation.values():
self.assertIsInstance(value, DefaultAggregation)

dummy_metric_reader = DummyMetricReader(
preferred_aggregation={Counter: LastValueAggregation()}
)
self.assertEqual(
dummy_metric_reader._instrument_class_aggregation.keys(),
set(
[
Counter,
UpDownCounter,
Histogram,
ObservableCounter,
ObservableUpDownCounter,
ObservableGauge,
]
),
)
self.assertIsInstance(
dummy_metric_reader._instrument_class_aggregation[Counter],
LastValueAggregation,
)
Loading

0 comments on commit 4b0317e

Please sign in to comment.