Skip to content

Commit

Permalink
Change temporality for Counter and UpDownCounter to CUMULATIVE (#1384)
Browse files Browse the repository at this point in the history
Fixes #1383
  • Loading branch information
ocelotl authored Nov 18, 2020
1 parent a085c10 commit bcf7a2f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 29 deletions.
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Change temporality for Counter and UpDownCounter
([#1384](https://github.com/open-telemetry/opentelemetry-python/pull/1384))
- Add Gzip compression for exporter
([#1141](https://github.com/open-telemetry/opentelemetry-python/pull/1141))
- OTLP exporter: Handle error case when no credentials supplied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
"""OTLP Metrics Exporter"""

import logging
import os
from typing import List, Optional, Sequence, Type, TypeVar, Union
from typing import List, Optional, Sequence, Type, TypeVar

from grpc import ChannelCredentials

Expand Down Expand Up @@ -71,7 +70,9 @@


def _get_data_points(
export_record: ExportRecord, data_point_class: Type[DataPointT]
export_record: ExportRecord,
data_point_class: Type[DataPointT],
aggregation_temporality: int,
) -> List[DataPointT]:

if isinstance(export_record.aggregator, SumAggregator):
Expand All @@ -91,16 +92,23 @@ def _get_data_points(
elif isinstance(export_record.aggregator, ValueObserverAggregator):
value = export_record.aggregator.checkpoint.last

if aggregation_temporality == (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
):
start_time_unix_nano = export_record.aggregator.first_timestamp
else:
start_time_unix_nano = (
export_record.aggregator.initial_checkpoint_timestamp
)

return [
data_point_class(
labels=[
StringKeyValue(key=str(label_key), value=str(label_value))
for label_key, label_value in export_record.labels
],
value=value,
start_time_unix_nano=(
export_record.aggregator.initial_checkpoint_timestamp
),
start_time_unix_nano=start_time_unix_nano,
time_unix_nano=(export_record.aggregator.last_update_timestamp),
)
]
Expand Down Expand Up @@ -215,25 +223,35 @@ def _translate_data(
data_point_class = type_class[value_type]["data_point_class"]

if isinstance(export_record.instrument, Counter):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, UpDownCounter):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]
Expand All @@ -243,33 +261,45 @@ def _translate_data(
continue

elif isinstance(export_record.instrument, SumObserver):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=True,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, UpDownSumObserver):

aggregation_temporality = (
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
)

otlp_metric_data = sum_class(
data_points=_get_data_points(
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
export_record,
data_point_class,
aggregation_temporality,
),
aggregation_temporality=aggregation_temporality,
is_monotonic=False,
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(export_record.instrument, (ValueObserver)):
otlp_metric_data = gauge_class(
data_points=_get_data_points(
export_record, data_point_class
export_record,
data_point_class,
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA,
)
)
argument = type_class[value_type]["gauge"]["argument"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@


class TestOTLPMetricExporter(TestCase):
def setUp(self):
@patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def setUp(self, mock_time_ns): # pylint: disable=arguments-differ
mock_time_ns.configure_mock(**{"return_value": 1})
self.exporter = OTLPMetricsExporter(insecure=True)
resource = SDKResource(OrderedDict([("a", 1), ("b", False)]))

Expand Down Expand Up @@ -95,12 +97,9 @@ def test_no_credentials_error(self):
with self.assertRaises(ValueError):
OTLPMetricsExporter()

@patch("opentelemetry.sdk.metrics.export.aggregate.time_ns")
def test_translate_metrics(self, mock_time_ns):
def test_translate_metrics(self):
# pylint: disable=no-member

mock_time_ns.configure_mock(**{"return_value": 1})

self.counter_export_record.aggregator.checkpoint = 1
self.counter_export_record.aggregator.initial_checkpoint_timestamp = 1
self.counter_export_record.aggregator.last_update_timestamp = 1
Expand Down Expand Up @@ -137,7 +136,7 @@ def test_translate_metrics(self, mock_time_ns):
)
],
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
),
is_monotonic=True,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(self, config=None):
self._lock = threading.Lock()
self.last_update_timestamp = 0
self.initial_checkpoint_timestamp = 0
self.first_timestamp = time_ns()
self.checkpointed = True
if config is not None:
self.config = config
Expand Down

0 comments on commit bcf7a2f

Please sign in to comment.