Skip to content

Commit

Permalink
Implement MetricReader temporality controls
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl committed Apr 23, 2022
1 parent 24ac96e commit 0d06123
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def consume_measurement(self, measurement: Measurement) -> None:

self._attributes_aggregation[attributes].aggregate(measurement)

def collect(self, temporality: int) -> Iterable[Metric]:
def collect(self, instrument_class_temporality: int) -> Iterable[Metric]:

with self._lock:
for (
Expand Down Expand Up @@ -106,13 +106,17 @@ def collect(self, temporality: int) -> Iterable[Metric]:
self._view._description
or self._instrument.description
),
instrumentation_scope=self._instrument.instrumentation_scope,
instrumentation_scope=(
self._instrument.instrumentation_scope
),
name=self._view._name or self._instrument.name,
resource=self._sdk_config.resource,
unit=self._instrument.unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
temporality,
instrument_class_temporality[
self._instrument.__class__
],
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from os import environ, linesep
from sys import stdout
from threading import Event, RLock, Thread
from typing import IO, Callable, Iterable, List, Optional, Sequence
from typing import IO, Callable, Dict, Iterable, List, Optional, Sequence

from opentelemetry.context import (
_SUPPRESS_INSTRUMENTATION_KEY,
Expand All @@ -46,10 +46,6 @@ class MetricExporter(ABC):
in their own format.
"""

@property
def preferred_temporality(self) -> AggregationTemporality:
return AggregationTemporality.CUMULATIVE

@abstractmethod
def export(self, metrics: Sequence[Metric]) -> "MetricExportResult":
"""Exports a batch of telemetry data.
Expand Down Expand Up @@ -103,8 +99,7 @@ class InMemoryMetricReader(MetricReader):
"""

def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
self, preferred_temporality: Dict[type, AggregationTemporality] = None
) -> None:
super().__init__(preferred_temporality=preferred_temporality)
self._lock = RLock()
Expand Down Expand Up @@ -135,10 +130,11 @@ class PeriodicExportingMetricReader(MetricReader):
def __init__(
self,
exporter: MetricExporter,
preferred_temporality: Dict[type, AggregationTemporality] = None,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
super().__init__(preferred_temporality=exporter.preferred_temporality)
super().__init__(preferred_temporality=preferred_temporality)
self._exporter = exporter
if export_interval_millis is None:
try:
Expand Down
12 changes: 8 additions & 4 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# pylint: disable=too-many-ancestors

import logging
from typing import Dict, Generator, Iterable, Optional, Union
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union

from opentelemetry._metrics.instrument import CallbackT
from opentelemetry._metrics.instrument import Counter as APICounter
Expand All @@ -31,9 +31,13 @@
)
from opentelemetry._metrics.instrument import UpDownCounter as APIUpDownCounter
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.measurement_consumer import MeasurementConsumer
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

if TYPE_CHECKING:
from opentelemetry.sdk._metrics.measurement_consumer import (
MeasurementConsumer,
)

_logger = logging.getLogger(__name__)


Expand All @@ -42,7 +46,7 @@ def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
unit: str = "",
description: str = "",
):
Expand All @@ -59,7 +63,7 @@ def __init__(
self,
name: str,
instrumentation_scope: InstrumentationScope,
measurement_consumer: MeasurementConsumer,
measurement_consumer: "MeasurementConsumer",
callbacks: Optional[Iterable[CallbackT]] = None,
unit: str = "",
description: str = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from abc import ABC, abstractmethod
from threading import Lock
from typing import TYPE_CHECKING, Iterable, List, Mapping
from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping

from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.measurement import Measurement
Expand All @@ -40,7 +40,9 @@ def register_asynchronous_instrument(self, instrument: "_Asynchronous"):

@abstractmethod
def collect(
self, metric_reader: MetricReader, temporality: AggregationTemporality
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
pass

Expand All @@ -67,11 +69,15 @@ def register_asynchronous_instrument(
self._async_instruments.append(instrument)

def collect(
self, metric_reader: MetricReader, temporality: AggregationTemporality
self,
metric_reader: MetricReader,
instrument_type_temporality: Dict[type, AggregationTemporality],
) -> Iterable[Metric]:
with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
for async_instrument in self._async_instruments:
for measurement in async_instrument.callback():
metric_reader_storage.consume_measurement(measurement)
return self._reader_storages[metric_reader].collect(temporality)
return self._reader_storages[metric_reader].collect(
instrument_type_temporality
)
70 changes: 63 additions & 7 deletions opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,85 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Callable, Iterable
from logging import getLogger
from os import environ
from typing import Callable, Dict, Iterable

from typing_extensions import final

from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.sdk.environment_variables import (
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
)

_logger = logging.getLogger(__name__)
_logger = getLogger(__name__)


class MetricReader(ABC):
"""
Base class for all metric readers
Args:
preferred_temporality: A mapping between instrument classes and
aggregation temporality. By default uses CUMULATIVE for all instrument
classes. This mapping will be used to define the default aggregation
temporality of every instrument class. If the user wants to make a
change in the default aggregation temporality of an instrument class,
it is enough to pass here a dictionary whose keys are the instrument
classes and the values are the corresponding desired aggregation
temporalities of the classes that the user wants to change, not all of
them. The classes not included in the passed dictionary will retain
their association to their default aggregation temporalities.
.. document protected _receive_metrics which is a intended to be overriden by subclass
.. automethod:: _receive_metrics
"""

def __init__(
self,
preferred_temporality: AggregationTemporality = AggregationTemporality.CUMULATIVE,
self, preferred_temporality: Dict[type, AggregationTemporality] = None
) -> None:
self._collect: Callable[
["MetricReader", AggregationTemporality], Iterable[Metric]
] = None
self._preferred_temporality = preferred_temporality

if (
environ.get(
_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
"CUMULATIVE",
)
.upper()
.strip()
== "DELTA"
):
self._instrument_class_temporality = {
Counter: AggregationTemporality.DELTA,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.DELTA,
ObservableCounter: AggregationTemporality.DELTA,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

else:
self._instrument_class_temporality = {
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
Histogram: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
}

self._instrument_class_temporality.update(preferred_temporality or {})

@final
def collect(self) -> None:
Expand All @@ -48,7 +102,9 @@ def collect(self) -> None:
"Cannot call collect on a MetricReader until it is registered on a MeterProvider"
)
return
self._receive_metrics(self._collect(self, self._preferred_temporality))
self._receive_metrics(
self._collect(self, self._instrument_class_temporality)
)

@final
def _set_collect_callback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,30 @@ def consume_measurement(self, measurement: Measurement) -> None:
):
view_instrument_match.consume_measurement(measurement)

def collect(self, temporality: AggregationTemporality) -> Iterable[Metric]:
# use a list instead of yielding to prevent a slow reader from holding SDK locks
def collect(
self, instrument_type_temporality: Dict[type, AggregationTemporality]
) -> Iterable[Metric]:
# Use a list instead of yielding to prevent a slow reader from holding
# SDK locks
metrics: List[Metric] = []

# While holding the lock, new _ViewInstrumentMatch can't be added from another thread (so we are
# sure we collect all existing view). However, instruments can still send measurements
# that will make it into the individual aggregations; collection will acquire those
# locks iteratively to keep locking as fine-grained as possible. One side effect is
# that end times can be slightly skewed among the metric streams produced by the SDK,
# but we still align the output timestamps for a single instrument.
# While holding the lock, new _ViewInstrumentMatch can't be added from
# another thread (so we are sure we collect all existing view).
# However, instruments can still send measurements that will make it
# into the individual aggregations; collection will acquire those locks
# iteratively to keep locking as fine-grained as possible. One side
# effect is that end times can be slightly skewed among the metric
# streams produced by the SDK, but we still align the output timestamps
# for a single instrument.
with self._lock:
for (
view_instrument_matches
) in self._view_instrument_match.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(view_instrument_match.collect(temporality))
metrics.extend(
view_instrument_match.collect(
instrument_type_temporality
)
)

return metrics
15 changes: 15 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,18 @@
provide the entry point for loading the log emitter provider. If not specified, SDK
LogEmitterProvider is used.
"""

_OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE = (
"OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE"
)
"""
.. envvar:: OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE
The :envvar:`OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment
variable allows users to set the default aggregation temporality policy to use
on the basis of instrument kind. The valid (case-insensitive) values are:
``CUMULATIVE``: Choose ``CUMULATIVE`` aggregation temporality for all instrument kinds.
``DELTA``: Choose ``DELTA`` aggregation temporality for ``Counter``, ``Asynchronous Counter`` and ``Histogram``.
Choose ``CUMULATIVE`` aggregation temporality for ``UpDownCounter`` and ``Asynchronous UpDownCounter``.
"""
Loading

0 comments on commit 0d06123

Please sign in to comment.