diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index af1d9eb3513..ed9bc680e2f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -18,7 +18,9 @@ from typing import TYPE_CHECKING, Dict, Iterable from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, _Aggregation, + _AggregationFactory, _convert_aggregation_temporality, _PointVarT, ) @@ -39,6 +41,7 @@ def __init__( view: View, instrument: "_Instrument", sdk_config: SdkConfiguration, + instrument_class_aggregation: Dict[type, _AggregationFactory], ): self._view = view self._instrument = instrument @@ -46,6 +49,7 @@ def __init__( self._attributes_aggregation: Dict[frozenset, _Aggregation] = {} self._attributes_previous_point: Dict[frozenset, _PointVarT] = {} self._lock = Lock() + self._instrument_class_aggregation = instrument_class_aggregation # pylint: disable=protected-access def consume_measurement(self, measurement: Measurement) -> None: @@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None: if attributes not in self._attributes_aggregation: with self._lock: if attributes not in self._attributes_aggregation: - self._attributes_aggregation[ - attributes - ] = self._view._aggregation._create_aggregation( - self._instrument - ) + if not isinstance( + self._view._aggregation, DefaultAggregation + ): + aggregation = ( + self._view._aggregation._create_aggregation( + self._instrument + ) + ) + else: + aggregation = self._instrument_class_aggregation[ + self._instrument.__class__ + ]._create_aggregation(self._instrument) + self._attributes_aggregation[attributes] = aggregation self._attributes_aggregation[attributes].aggregate(measurement) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index 88171f975f2..3d4df9ed69e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -27,6 +27,7 @@ detach, set_value, ) +from opentelemetry.sdk._metrics.aggregation import _AggregationFactory from opentelemetry.sdk._metrics.metric_reader import MetricReader from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.util._once import Once @@ -108,9 +109,13 @@ class InMemoryMetricReader(MetricReader): def __init__( self, + preferred_aggregation: Dict[type, _AggregationFactory] = None, preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, ) -> None: - super().__init__(preferred_temporality=preferred_temporality) + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) self._lock = RLock() self._metrics: List[Metric] = [] @@ -139,10 +144,14 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, + preferred_aggregation: Dict[type, _AggregationFactory] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: - super().__init__(preferred_temporality=exporter.preferred_temporality) + super().__init__( + preferred_temporality=exporter.preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) self._exporter = exporter if export_interval_millis is None: try: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index b637bba9f08..92de8775506 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -14,8 +14,8 @@ # pylint: disable=too-many-ancestors -import logging -from typing import Dict, Generator, Iterable, Optional, Union +from logging import getLogger +from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union from opentelemetry._metrics.instrument import CallbackT from opentelemetry._metrics.instrument import Counter as APICounter @@ -31,10 +31,15 @@ ) from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer from opentelemetry.sdk.util.instrumentation import InstrumentationScope -_logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from opentelemetry.sdk._metrics.measurement_consumer import ( + MeasurementConsumer, + ) + + +_logger = getLogger(__name__) class _Synchronous: @@ -42,7 +47,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", unit: str = "", description: str = "", ): @@ -59,7 +64,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", callbacks: Optional[Iterable[CallbackT]] = None, unit: str = "", description: str = "", diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index c4b67702760..43c370930f1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -51,7 +51,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None: self._sdk_config = sdk_config # should never be mutated self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = { - reader: MetricReaderStorage(sdk_config) + reader: MetricReaderStorage( + sdk_config, reader._instrument_class_aggregation + ) for reader in sdk_config.metric_readers } self._async_instruments: List["_Asynchronous"] = [] diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index e14877d87da..417a2fb9b16 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -14,10 +14,22 @@ import logging from abc import ABC, abstractmethod -from typing import Callable, Iterable +from typing import Callable, Dict, Iterable from typing_extensions import final +from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, + _AggregationFactory, +) +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric _logger = logging.getLogger(__name__) @@ -25,6 +37,23 @@ class MetricReader(ABC): """ + Base class for all metric readers + + Args: + preferred_aggregation: A mapping between instrument classes and + aggregation instances. By default maps all instrument classes to an + instance of `DefaultAggregation`. This mapping will be used to + define the default aggregation of every instrument class. If the + user wants to make a change in the default aggregation of an + instrument class, it is enough to pass here a dictionary whose keys + are the instrument classes and the values are the corresponding + desired aggregation for the instrument classes that the user wants + to change, not necessarily all of them. The classes not included in + the passed dictionary will retain their association to their + default aggregations. The aggregation defined here will be + overriden by an aggregation defined by a view that is not + `DefaultAggregation`. + .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics """ @@ -32,11 +61,23 @@ class MetricReader(ABC): def __init__( self, preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + preferred_aggregation: Dict[type, _AggregationFactory] = None, ) -> None: self._collect: Callable[ ["MetricReader", AggregationTemporality], Iterable[Metric] ] = None + self._preferred_temporality = preferred_temporality + self._instrument_class_aggregation = { + Counter: DefaultAggregation(), + UpDownCounter: DefaultAggregation(), + Histogram: DefaultAggregation(), + ObservableCounter: DefaultAggregation(), + ObservableUpDownCounter: DefaultAggregation(), + ObservableGauge: DefaultAggregation(), + } + + self._instrument_class_aggregation.update(preferred_aggregation or {}) @final def collect(self) -> None: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 45c4d4dd793..b63d37e93fb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -19,7 +19,10 @@ from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, ) -from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, + _AggregationFactory, +) from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration @@ -31,12 +34,17 @@ class MetricReaderStorage: """The SDK's storage for a given reader""" - def __init__(self, sdk_config: SdkConfiguration) -> None: + def __init__( + self, + sdk_config: SdkConfiguration, + instrument_class_aggregation: Dict[type, _AggregationFactory], + ) -> None: self._lock = RLock() self._sdk_config = sdk_config self._view_instrument_match: Dict[ Instrument, List[_ViewInstrumentMatch] ] = {} + self._instrument_class_aggregation = instrument_class_aggregation def _get_or_init_view_instrument_match( self, instrument: Instrument @@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match( view=view, instrument=instrument, sdk_config=self._sdk_config, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), ) ) @@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match( view=_DEFAULT_VIEW, instrument=instrument, sdk_config=self._sdk_config, + instrument_class_aggregation=( + self._instrument_class_aggregation + ), ) ) self._view_instrument_match[instrument] = view_instrument_matches diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py new file mode 100644 index 00000000000..c56f5d70fa6 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -0,0 +1,95 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict +from unittest import TestCase + +from opentelemetry.sdk._metrics.aggregation import ( + AggregationTemporality, + DefaultAggregation, + LastValueAggregation, + _AggregationFactory, +) +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReader + + +class DummyMetricReader(MetricReader): + def __init__( + self, + preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + preferred_aggregation: Dict[type, _AggregationFactory] = None, + ) -> None: + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + + def _receive_metrics(self, metrics): + pass + + def shutdown(self): + return True + + +class TestMetricReader(TestCase): + def test_default_temporality(self): + + dummy_metric_reader = DummyMetricReader() + + self.assertEqual( + dummy_metric_reader._instrument_class_aggregation.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + for ( + value + ) in dummy_metric_reader._instrument_class_aggregation.values(): + self.assertIsInstance(value, DefaultAggregation) + + dummy_metric_reader = DummyMetricReader( + preferred_aggregation={Counter: LastValueAggregation()} + ) + self.assertEqual( + dummy_metric_reader._instrument_class_aggregation.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertIsInstance( + dummy_metric_reader._instrument_class_aggregation[Counter], + LastValueAggregation, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index f7ffc6993a2..d0d05854d8a 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, Mock, patch -from opentelemetry.sdk._metrics.aggregation import DropAggregation +from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, + DropAggregation, +) from opentelemetry.sdk._metrics.instrument import Counter from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.metric_reader_storage import ( @@ -56,7 +59,8 @@ def test_creates_view_instrument_matches( resource=Mock(), metric_readers=(), views=(view1, view2), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) # instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects @@ -100,7 +104,8 @@ def test_forwards_calls_to_view_instrument_match( resource=Mock(), metric_readers=(), views=(view1, view2), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) # Measurements from an instrument should be passed on to each ViewInstrumentMatch objects @@ -147,7 +152,8 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): resource=Mock(), metric_readers=(), views=(view1,), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) def send_measurement(): @@ -172,7 +178,8 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): resource=Mock(), metric_readers=(), views=(), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) storage.consume_measurement(Measurement(1, instrument1)) @@ -200,7 +207,8 @@ def test_drop_aggregation(self): instrument_name="name", aggregation=DropAggregation() ), ), - ) + ), + MagicMock(**{"__getitem__.return_value": DefaultAggregation()}), ) metric_reader_storage.consume_measurement(Measurement(1, counter)) diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 728e2911800..ff67e848afe 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -84,7 +84,9 @@ def _create_periodic_reader( self, metrics, exporter, collect_wait=0, interval=60000 ): - pmr = PeriodicExportingMetricReader(exporter, interval) + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=interval + ) def _collect(reader, temp): time.sleep(collect_wait) @@ -95,7 +97,7 @@ def _collect(reader, temp): def test_ticker_called(self): collect_mock = Mock() - pmr = PeriodicExportingMetricReader(Mock(), 1) + pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) self.assertTrue(collect_mock.assert_called_once) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 20b1a041698..bdf8ba234c9 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -13,15 +13,19 @@ # limitations under the License. from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, ) from opentelemetry.sdk._metrics.aggregation import ( + DefaultAggregation, DropAggregation, + LastValueAggregation, _DropAggregation, + _LastValueAggregation, ) +from opentelemetry.sdk._metrics.instrument import Counter from opentelemetry.sdk._metrics.measurement import Measurement from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration @@ -56,6 +60,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -95,6 +102,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -123,6 +133,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( Measurement(value=0, instrument=instrument1, attributes=None) @@ -145,6 +158,9 @@ def test_consume_measurement(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( Measurement(value=0, instrument=instrument1, attributes=None) @@ -173,6 +189,9 @@ def test_collect(self): ), instrument=instrument1, sdk_config=sdk_config, + instrument_class_aggregation=MagicMock( + **{"__getitem__.return_value": DefaultAggregation()} + ), ) view_instrument_match.consume_measurement( @@ -198,3 +217,44 @@ def test_collect(self): point=None, ), ) + + def test_setting_aggregation(self): + instrument1 = Counter( + name="instrument1", + instrumentation_scope=Mock(), + measurement_consumer=Mock(), + description="description", + unit="unit", + ) + instrument1.instrumentation_scope = self.mock_instrumentation_scope + sdk_config = SdkConfiguration( + resource=self.mock_resource, + metric_readers=[], + views=[], + ) + view_instrument_match = _ViewInstrumentMatch( + view=View( + instrument_name="instrument1", + name="name", + aggregation=DefaultAggregation(), + attribute_keys={"a", "c"}, + ), + instrument=instrument1, + sdk_config=sdk_config, + instrument_class_aggregation={Counter: LastValueAggregation()}, + ) + + view_instrument_match.consume_measurement( + Measurement( + value=0, + instrument=Mock(name="instrument1"), + attributes={"c": "d", "f": "g"}, + ) + ) + + self.assertIsInstance( + view_instrument_match._attributes_aggregation[ + frozenset({("c", "d")}) + ], + _LastValueAggregation, + )