diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index af1d9eb3513..7d3f83d95af 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -75,7 +75,7 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self, temporality: int) -> Iterable[Metric]: + def collect(self, instrument_class_temporality: int) -> Iterable[Metric]: with self._lock: for ( @@ -106,13 +106,17 @@ def collect(self, temporality: int) -> Iterable[Metric]: self._view._description or self._instrument.description ), - instrumentation_scope=self._instrument.instrumentation_scope, + instrumentation_scope=( + self._instrument.instrumentation_scope + ), name=self._view._name or self._instrument.name, resource=self._sdk_config.resource, unit=self._instrument.unit, point=_convert_aggregation_temporality( previous_point, current_point, - temporality, + instrument_class_temporality[ + self._instrument.__class__ + ], ), ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index ff505ed4dfc..f9f7b96e3cb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -46,10 +46,6 @@ class MetricExporter(ABC): in their own format. """ - @property - def preferred_temporality(self) -> AggregationTemporality: - return AggregationTemporality.CUMULATIVE - @abstractmethod def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": """Exports a batch of telemetry data. @@ -103,8 +99,7 @@ class InMemoryMetricReader(MetricReader): """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: super().__init__(preferred_temporality=preferred_temporality) self._lock = RLock() @@ -135,10 +130,11 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, + preferred_temporality: Dict[type, AggregationTemporality] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: - super().__init__(preferred_temporality=exporter.preferred_temporality) + super().__init__(preferred_temporality=preferred_temporality) self._exporter = exporter if export_interval_millis is None: try: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index b637bba9f08..9b74c064ad4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -15,7 +15,7 @@ # pylint: disable=too-many-ancestors import logging -from typing import Dict, Generator, Iterable, Optional, Union +from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union from opentelemetry._metrics.instrument import CallbackT from opentelemetry._metrics.instrument import Counter as APICounter @@ -31,9 +31,13 @@ ) from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer from opentelemetry.sdk.util.instrumentation import InstrumentationScope +if TYPE_CHECKING: + from opentelemetry.sdk._metrics.measurement_consumer import ( + MeasurementConsumer, + ) + _logger = logging.getLogger(__name__) @@ -42,7 +46,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", unit: str = "", description: str = "", ): @@ -59,7 +63,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", callbacks: Optional[Iterable[CallbackT]] = None, unit: str = "", description: str = "", diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index c4b67702760..7ee0c4ea855 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod from threading import Lock -from typing import TYPE_CHECKING, Iterable, List, Mapping +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping from opentelemetry.sdk._metrics.aggregation import AggregationTemporality from opentelemetry.sdk._metrics.measurement import Measurement @@ -40,7 +40,9 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"): @abstractmethod def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: pass @@ -67,11 +69,15 @@ def register_asynchronous_instrument( self._async_instruments.append(instrument) def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] for async_instrument in self._async_instruments: for measurement in async_instrument.callback(): metric_reader_storage.consume_measurement(measurement) - return self._reader_storages[metric_reader].collect(temporality) + return self._reader_storages[metric_reader].collect( + instrument_type_temporality + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index e14877d87da..c101f6b6fde 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -12,31 +12,85 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from abc import ABC, abstractmethod -from typing import Callable, Iterable +from logging import getLogger +from os import environ +from typing import Callable, Dict, Iterable from typing_extensions import final +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.environment_variables import ( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, +) -_logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class MetricReader(ABC): """ + Base class for all metric readers + + Args: + preferred_temporality: A mapping between instrument classes and + aggregation temporality. By default uses CUMULATIVE for all instrument + classes. This mapping will be used to define the default aggregation + temporality of every instrument class. If the user wants to make a + change in the default aggregation temporality of an instrument class, + it is enough to pass here a dictionary whose keys are the instrument + classes and the values are the corresponding desired aggregation + temporalities of the classes that the user wants to change, not all of + them. The classes not included in the passed dictionary will retain + their association to their default aggregation temporalities. + .. document protected _receive_metrics which is a intended to be overriden by subclass .. automethod:: _receive_metrics """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: self._collect: Callable[ ["MetricReader", AggregationTemporality], Iterable[Metric] ] = None - self._preferred_temporality = preferred_temporality + + if ( + environ.get( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + "CUMULATIVE", + ) + .upper() + .strip() + == "DELTA" + ): + self._instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + else: + self._instrument_class_temporality = { + Counter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + self._instrument_class_temporality.update(preferred_temporality or {}) @final def collect(self) -> None: @@ -48,7 +102,9 @@ def collect(self) -> None: "Cannot call collect on a MetricReader until it is registered on a MeterProvider" ) return - self._receive_metrics(self._collect(self, self._preferred_temporality)) + self._receive_metrics( + self._collect(self, self._instrument_class_temporality) + ) @final def _set_collect_callback( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 45c4d4dd793..7835bf5858d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -83,21 +83,30 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]: - # use a list instead of yielding to prevent a slow reader from holding SDK locks + def collect( + self, instrument_type_temporality: Dict[type, AggregationTemporality] + ) -> Iterable[Metric]: + # Use a list instead of yielding to prevent a slow reader from holding + # SDK locks metrics: List[Metric] = [] - # While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are - # sure we collect all existing view). However, instruments can still send measurements - # that will make it into the individual aggregations; collection will acquire those - # locks iteratively to keep locking as fine-grained as possible. One side effect is - # that end times can be slightly skewed among the metric streams produced by the SDK, - # but we still align the output timestamps for a single instrument. + # While holding the lock, new _ViewInstrumentMatch can't be added from + # another thread (so we are sure we collect all existing view). + # However, instruments can still send measurements that will make it + # into the individual aggregations; collection will acquire those locks + # iteratively to keep locking as fine-grained as possible. One side + # effect is that end times can be slightly skewed among the metric + # streams produced by the SDK, but we still align the output timestamps + # for a single instrument. with self._lock: for ( view_instrument_matches ) in self._view_instrument_match.values(): for view_instrument_match in view_instrument_matches: - metrics.extend(view_instrument_match.collect(temporality)) + metrics.extend( + view_instrument_match.collect( + instrument_type_temporality + ) + ) return metrics diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index d9bab1bb169..3e632bc521f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -406,3 +406,18 @@ provide the entry point for loading the log emitter provider. If not specified, SDK LogEmitterProvider is used. """ + +_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = ( + "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE" +) +""" +.. envvar:: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE + +The :envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment +variable allows users to set the default aggregation temporality policy to use +on the basis of instrument kind. The valid (case-insensitive) values are: + +``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds. +``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``. +Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``. +""" diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_metric_reader.py new file mode 100644 index 00000000000..0f9792313ae --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader.py @@ -0,0 +1,175 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from os import environ +from typing import Dict +from unittest import TestCase +from unittest.mock import patch + +from opentelemetry.sdk._metrics.aggregation import AggregationTemporality +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk.environment_variables import ( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, +) + + +class DummyMetricReader(MetricReader): + def __init__( + self, preferred_temporality: Dict[type, AggregationTemporality] = None + ) -> None: + super().__init__( + preferred_temporality=preferred_temporality, + ) + + def _receive_metrics(self, metrics): + pass + + def shutdown(self): + return True + + +class TestMetricReader(TestCase): + @patch.dict( + environ, + {_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "CUMULATIVE"}, + ) + def test_configure_temporality_cumulative(self): + + dummy_metric_reader = DummyMetricReader() + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + for ( + value + ) in dummy_metric_reader._instrument_class_temporality.values(): + self.assertEqual(value, AggregationTemporality.CUMULATIVE) + + @patch.dict( + environ, {_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "DELTA"} + ) + def test_configure_temporality_delta(self): + + dummy_metric_reader = DummyMetricReader() + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Counter], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableCounter + ], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ObservableGauge], + AggregationTemporality.CUMULATIVE, + ) + + def test_configure_temporality_parameter(self): + + dummy_metric_reader = DummyMetricReader( + preferred_temporality={ + Histogram: AggregationTemporality.DELTA, + ObservableGauge: AggregationTemporality.DELTA, + } + ) + + self.assertEqual( + dummy_metric_reader._instrument_class_temporality.keys(), + set( + [ + Counter, + UpDownCounter, + Histogram, + ObservableCounter, + ObservableUpDownCounter, + ObservableGauge, + ] + ), + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Counter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[UpDownCounter], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[Histogram], + AggregationTemporality.DELTA, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ + ObservableUpDownCounter + ], + AggregationTemporality.CUMULATIVE, + ) + self.assertEqual( + dummy_metric_reader._instrument_class_temporality[ObservableGauge], + AggregationTemporality.DELTA, + ) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 0279dfbe618..81d1068fb80 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -36,7 +36,7 @@ UpDownCounter, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader -from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.view import View from opentelemetry.sdk.resources import Resource from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc @@ -44,7 +44,7 @@ class DummyMetricReader(MetricReader): def __init__(self): - super().__init__(AggregationTemporality.CUMULATIVE) + super().__init__() def _receive_metrics(self, metrics): pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 728e2911800..ff67e848afe 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -84,7 +84,9 @@ def _create_periodic_reader( self, metrics, exporter, collect_wait=0, interval=60000 ): - pmr = PeriodicExportingMetricReader(exporter, interval) + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=interval + ) def _collect(reader, temp): time.sleep(collect_wait) @@ -95,7 +97,7 @@ def _collect(reader, temp): def test_ticker_called(self): collect_mock = Mock() - pmr = PeriodicExportingMetricReader(Mock(), 1) + pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) self.assertTrue(collect_mock.assert_called_once) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 20b1a041698..9ccdd90933c 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -13,7 +13,7 @@ # limitations under the License. from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, @@ -185,7 +185,11 @@ def test_collect(self): self.assertEqual( next( view_instrument_match.collect( - AggregationTemporality.CUMULATIVE + MagicMock( + **{ + "__getitem__.return_value": AggregationTemporality.CUMULATIVE + } + ) ) ), Metric(