From a24901bd9274e3aca212a65f71e84bb82b2983df Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Thu, 21 Apr 2022 20:37:06 -0600 Subject: [PATCH] Implement MetricReader temporality controls Fixes #2627 Fixes #2636 --- .../sdk/_metrics/_view_instrument_match.py | 10 ++- .../sdk/_metrics/export/__init__.py | 12 ++-- .../opentelemetry/sdk/_metrics/instrument.py | 12 ++-- .../sdk/_metrics/measurement_consumer.py | 14 ++-- .../sdk/_metrics/metric_reader.py | 70 +++++++++++++++++-- .../sdk/_metrics/metric_reader_storage.py | 27 ++++--- .../sdk/environment_variables.py | 15 ++++ .../tests/metrics/test_metrics.py | 4 +- .../test_periodic_exporting_metric_reader.py | 6 +- .../metrics/test_view_instrument_match.py | 8 ++- 10 files changed, 137 insertions(+), 41 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py index af1d9eb3513..7d3f83d95af 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_view_instrument_match.py @@ -75,7 +75,7 @@ def consume_measurement(self, measurement: Measurement) -> None: self._attributes_aggregation[attributes].aggregate(measurement) - def collect(self, temporality: int) -> Iterable[Metric]: + def collect(self, instrument_class_temporality: int) -> Iterable[Metric]: with self._lock: for ( @@ -106,13 +106,17 @@ def collect(self, temporality: int) -> Iterable[Metric]: self._view._description or self._instrument.description ), - instrumentation_scope=self._instrument.instrumentation_scope, + instrumentation_scope=( + self._instrument.instrumentation_scope + ), name=self._view._name or self._instrument.name, resource=self._sdk_config.resource, unit=self._instrument.unit, point=_convert_aggregation_temporality( previous_point, current_point, - temporality, + instrument_class_temporality[ + self._instrument.__class__ + ], ), ) diff --git 
a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py index ff505ed4dfc..f9f7b96e3cb 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/export/__init__.py @@ -19,7 +19,7 @@ from os import environ, linesep from sys import stdout from threading import Event, RLock, Thread -from typing import IO, Callable, Iterable, List, Optional, Sequence +from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence from opentelemetry.context import ( _SUPPRESS_INSTRUMENTATION_KEY, @@ -46,10 +46,6 @@ class MetricExporter(ABC): in their own format. """ - @property - def preferred_temporality(self) -> AggregationTemporality: - return AggregationTemporality.CUMULATIVE - @abstractmethod def export(self, metrics: Sequence[Metric]) -> "MetricExportResult": """Exports a batch of telemetry data. @@ -103,8 +99,7 @@ class InMemoryMetricReader(MetricReader): """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: super().__init__(preferred_temporality=preferred_temporality) self._lock = RLock() @@ -135,10 +130,11 @@ class PeriodicExportingMetricReader(MetricReader): def __init__( self, exporter: MetricExporter, + preferred_temporality: Dict[type, AggregationTemporality] = None, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, ) -> None: - super().__init__(preferred_temporality=exporter.preferred_temporality) + super().__init__(preferred_temporality=preferred_temporality) self._exporter = exporter if export_interval_millis is None: try: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py index b637bba9f08..9b74c064ad4 100644 --- 
a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py @@ -15,7 +15,7 @@ # pylint: disable=too-many-ancestors import logging -from typing import Dict, Generator, Iterable, Optional, Union +from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union from opentelemetry._metrics.instrument import CallbackT from opentelemetry._metrics.instrument import Counter as APICounter @@ -31,9 +31,13 @@ ) from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter from opentelemetry.sdk._metrics.measurement import Measurement -from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer from opentelemetry.sdk.util.instrumentation import InstrumentationScope +if TYPE_CHECKING: + from opentelemetry.sdk._metrics.measurement_consumer import ( + MeasurementConsumer, + ) + _logger = logging.getLogger(__name__) @@ -42,7 +46,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", unit: str = "", description: str = "", ): @@ -59,7 +63,7 @@ def __init__( self, name: str, instrumentation_scope: InstrumentationScope, - measurement_consumer: MeasurementConsumer, + measurement_consumer: "MeasurementConsumer", callbacks: Optional[Iterable[CallbackT]] = None, unit: str = "", description: str = "", diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py index c4b67702760..7ee0c4ea855 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/measurement_consumer.py @@ -14,7 +14,7 @@ from abc import ABC, abstractmethod from threading import Lock -from typing import TYPE_CHECKING, Iterable, List, Mapping +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping 
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality from opentelemetry.sdk._metrics.measurement import Measurement @@ -40,7 +40,9 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"): @abstractmethod def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: pass @@ -67,11 +69,15 @@ def register_asynchronous_instrument( self._async_instruments.append(instrument) def collect( - self, metric_reader: MetricReader, temporality: AggregationTemporality + self, + metric_reader: MetricReader, + instrument_type_temporality: Dict[type, AggregationTemporality], ) -> Iterable[Metric]: with self._lock: metric_reader_storage = self._reader_storages[metric_reader] for async_instrument in self._async_instruments: for measurement in async_instrument.callback(): metric_reader_storage.consume_measurement(measurement) - return self._reader_storages[metric_reader].collect(temporality) + return self._reader_storages[metric_reader].collect( + instrument_type_temporality + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index e14877d87da..cb065a7d95d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -12,31 +12,85 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging from abc import ABC, abstractmethod -from typing import Callable, Iterable +from logging import getLogger +from os import environ +from typing import Callable, Dict, Iterable from typing_extensions import final +from opentelemetry.sdk._metrics.instrument import ( + Counter, + Histogram, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +) from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.environment_variables import ( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, +) -_logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class MetricReader(ABC): """ + Base class for all metric readers + + Args: + preferred_temporality: A mapping between instrument classes and + aggregation temporality. By default uses CUMULATIVE for all instrument + classes, unless the OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE environment variable is set to DELTA (then Counter, Histogram and ObservableCounter default to DELTA). This mapping will be used to define the default aggregation + temporality of every instrument class. If the user wants to make a + change in the default aggregation temporality of an instrument class, + it is enough to pass here a dictionary whose keys are the instrument + classes and the values are the corresponding desired aggregation + temporalities of the classes that the user wants to change, not all of + them. The classes not included in the passed dictionary will retain + their association to their default aggregation temporalities. + .. document protected _receive_metrics which is intended to be overridden by subclass ..
automethod:: _receive_metrics """ def __init__( - self, - preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE, + self, preferred_temporality: Dict[type, AggregationTemporality] = None ) -> None: self._collect: Callable[ ["MetricReader", AggregationTemporality], Iterable[Metric] ] = None - self._preferred_temporality = preferred_temporality + + if ( + environ.get( + _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, + "CUMULATIVE", + ) + .upper() + .strip() + == "DELTA" + ): + self._instrument_class_temporality = { + Counter: AggregationTemporality.DELTA, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.DELTA, + ObservableCounter: AggregationTemporality.DELTA, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + else: + self._instrument_class_temporality = { + Counter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableCounter: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + ObservableGauge: AggregationTemporality.CUMULATIVE, + } + + self._instrument_class_temporality.update(preferred_temporality or {}) @final def collect(self) -> None: @@ -48,7 +102,9 @@ def collect(self) -> None: "Cannot call collect on a MetricReader until it is registered on a MeterProvider" ) return - self._receive_metrics(self._collect(self, self._preferred_temporality)) + self._receive_metrics( + self._collect(self, self._instrument_class_temporality) + ) @final def _set_collect_callback( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py index 45c4d4dd793..7835bf5858d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py +++ 
b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader_storage.py @@ -83,21 +83,30 @@ def consume_measurement(self, measurement: Measurement) -> None: ): view_instrument_match.consume_measurement(measurement) - def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]: - # use a list instead of yielding to prevent a slow reader from holding SDK locks + def collect( + self, instrument_type_temporality: Dict[type, AggregationTemporality] + ) -> Iterable[Metric]: + # Use a list instead of yielding to prevent a slow reader from holding + # SDK locks metrics: List[Metric] = [] - # While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are - # sure we collect all existing view). However, instruments can still send measurements - # that will make it into the individual aggregations; collection will acquire those - # locks iteratively to keep locking as fine-grained as possible. One side effect is - # that end times can be slightly skewed among the metric streams produced by the SDK, - # but we still align the output timestamps for a single instrument. + # While holding the lock, new _ViewInstrumentMatch can't be added from + # another thread (so we are sure we collect all existing view). + # However, instruments can still send measurements that will make it + # into the individual aggregations; collection will acquire those locks + # iteratively to keep locking as fine-grained as possible. One side + # effect is that end times can be slightly skewed among the metric + # streams produced by the SDK, but we still align the output timestamps + # for a single instrument. 
with self._lock: for ( view_instrument_matches ) in self._view_instrument_match.values(): for view_instrument_match in view_instrument_matches: - metrics.extend(view_instrument_match.collect(temporality)) + metrics.extend( + view_instrument_match.collect( + instrument_type_temporality + ) + ) return metrics diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py index d9bab1bb169..3e632bc521f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py @@ -406,3 +406,18 @@ provide the entry point for loading the log emitter provider. If not specified, SDK LogEmitterProvider is used. """ + +_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = ( + "OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE" +) +""" +.. envvar:: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE + +The :envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment +variable allows users to set the default aggregation temporality policy to use +on the basis of instrument kind. The valid (case-insensitive) values are: + +``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds. +``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``. +Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``. 
+""" diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 0279dfbe618..81d1068fb80 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -36,7 +36,7 @@ UpDownCounter, ) from opentelemetry.sdk._metrics.metric_reader import MetricReader -from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.point import Metric from opentelemetry.sdk._metrics.view import View from opentelemetry.sdk.resources import Resource from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc @@ -44,7 +44,7 @@ class DummyMetricReader(MetricReader): def __init__(self): - super().__init__(AggregationTemporality.CUMULATIVE) + super().__init__() def _receive_metrics(self, metrics): pass diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index 728e2911800..ff67e848afe 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -84,7 +84,9 @@ def _create_periodic_reader( self, metrics, exporter, collect_wait=0, interval=60000 ): - pmr = PeriodicExportingMetricReader(exporter, interval) + pmr = PeriodicExportingMetricReader( + exporter, export_interval_millis=interval + ) def _collect(reader, temp): time.sleep(collect_wait) @@ -95,7 +97,7 @@ def _collect(reader, temp): def test_ticker_called(self): collect_mock = Mock() - pmr = PeriodicExportingMetricReader(Mock(), 1) + pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) self.assertTrue(collect_mock.assert_called_once) diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 
20b1a041698..9ccdd90933c 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -13,7 +13,7 @@ # limitations under the License. from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock from opentelemetry.sdk._metrics._view_instrument_match import ( _ViewInstrumentMatch, @@ -185,7 +185,11 @@ def test_collect(self): self.assertEqual( next( view_instrument_match.collect( - AggregationTemporality.CUMULATIVE + MagicMock( + **{ + "__getitem__.return_value": AggregationTemporality.CUMULATIVE + } + ) ) ), Metric(