Skip to content

Commit

Permalink
Convert histogram measurements to double before passing recording exe…
Browse files Browse the repository at this point in the history
…mplar reservoir
  • Loading branch information
jack-berg committed Nov 28, 2023
1 parent c57ca65 commit ff5595a
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
Expand All @@ -25,6 +26,19 @@
*/
public interface ExemplarReservoir<T extends ExemplarData> {

/**
* Wraps a {@link ExemplarReservoir}, casting calls to {@link
* ExemplarReservoir#offerLongMeasurement(long, Attributes, Context)} to {@link
* ExemplarReservoir#offerDoubleMeasurement(double, Attributes, Context)} such that {@link
* ExemplarReservoir#collectAndReset(Attributes)} only returns {@link DoubleExemplarData}.
*
* <p>This is used for {@link Aggregation#explicitBucketHistogram()} and {@link
* Aggregation#base2ExponentialBucketHistogram()} which only support double measurements.
*/
static <T extends ExemplarData> ExemplarReservoir<T> longToDouble(ExemplarReservoir<T> delegate) {
return new LongToDoubleExemplarReservoir<>(delegate);
}

/** Wraps a {@link ExemplarReservoir} with a measurement pre-filter. */
static <T extends ExemplarData> ExemplarReservoir<T> filtered(
ExemplarFilter filter, ExemplarReservoir<T> original) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.exemplar;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import java.util.List;

class LongToDoubleExemplarReservoir<T extends ExemplarData> implements ExemplarReservoir<T> {

private final ExemplarReservoir<T> delegate;

LongToDoubleExemplarReservoir(ExemplarReservoir<T> delegate) {
this.delegate = delegate;
}

@Override
public void offerDoubleMeasurement(double value, Attributes attributes, Context context) {
delegate.offerDoubleMeasurement(value, attributes, context);
}

@Override
public void offerLongMeasurement(long value, Attributes attributes, Context context) {
offerDoubleMeasurement((double) value, attributes, context);
}

@Override
public List<T> collectAndReset(Attributes pointAttributes) {
return delegate.collectAndReset(pointAttributes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,11 @@ public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggr
() ->
ExemplarReservoir.filtered(
exemplarFilter,
ExemplarReservoir.doubleFixedSizeReservoir(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault())),
ExemplarReservoir.longToDouble(
ExemplarReservoir.doubleFixedSizeReservoir(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault()))),
maxBuckets,
maxScale);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggr
() ->
ExemplarReservoir.filtered(
exemplarFilter,
ExemplarReservoir.histogramBucketReservoir(
Clock.getDefault(), bucketBoundaries)));
ExemplarReservoir.longToDouble(
ExemplarReservoir.histogramBucketReservoir(
Clock.getDefault(), bucketBoundaries))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.internal.state.DefaultSynchronousMetricStorage;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.time.TestClock;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -275,6 +279,97 @@ void doubleHistogramRecord_NaN() {
assertThat(sdkMeterReader.collectAllMetrics()).hasSize(0);
}

@Test
void collectMetrics_ExemplarsWithExponentialHistogram() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder()
.setClock(testClock)
.setResource(RESOURCE)
.registerView(
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
View.builder()
.setAggregation(Aggregation.base2ExponentialBucketHistogram())
.setAttributeFilter(Collections.emptySet())
.build())
.registerMetricReader(reader)
.build();
Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
DoubleHistogram histogram = sdkMeter.histogramBuilder("testHistogram").build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
Tracer tracer = tracerProvider.get("foo");

Span span = tracer.spanBuilder("span").startSpan();
try (Scope unused = span.makeCurrent()) {
histogram.record(10, Attributes.builder().put("key", "value").build());
}

assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
assertThat(metric)
.hasExponentialHistogramSatisfying(
exponentialHistogram ->
exponentialHistogram.hasPointsSatisfying(
point ->
point
.hasSum(10.0)
.hasAttributes(Attributes.empty())
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasValue(10.0)
.hasFilteredAttributes(
Attributes.builder()
.put("key", "value")
.build())))));
}

@Test
void collectMetrics_ExemplarsWithExplicitBucketHistogram() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder()
.setClock(testClock)
.setResource(RESOURCE)
.registerView(
InstrumentSelector.builder().setName("*").build(),
View.builder().setAttributeFilter(Collections.emptySet()).build())
.registerMetricReader(reader)
.build();
Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
DoubleHistogram histogram = sdkMeter.histogramBuilder("testHistogram").build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
Tracer tracer = tracerProvider.get("foo");

Span span = tracer.spanBuilder("span").startSpan();
try (Scope unused = span.makeCurrent()) {
histogram.record(10, Attributes.builder().put("key", "value").build());
}

assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
assertThat(metric)
.hasHistogramSatisfying(
explicitHistogram ->
explicitHistogram.hasPointsSatisfying(
point ->
point
.hasSum(10)
.hasAttributes(Attributes.empty())
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasValue(10.0)
.hasFilteredAttributes(
Attributes.builder()
.put("key", "value")
.build())))));
}

@Test
void stressTest() {
DoubleHistogram doubleHistogram = sdkMeter.histogramBuilder("testHistogram").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.time.TestClock;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -435,6 +439,97 @@ void longHistogramRecord_NonNegativeCheck() {
"Histograms can only record non-negative values. Instrument testHistogram has recorded a negative value.");
}

@Test
void collectMetrics_ExemplarsWithExponentialHistogram() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder()
.setClock(testClock)
.setResource(RESOURCE)
.registerView(
InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(),
View.builder()
.setAggregation(Aggregation.base2ExponentialBucketHistogram())
.setAttributeFilter(Collections.emptySet())
.build())
.registerMetricReader(reader)
.build();
Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
LongHistogram histogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
Tracer tracer = tracerProvider.get("foo");

Span span = tracer.spanBuilder("span").startSpan();
try (Scope unused = span.makeCurrent()) {
histogram.record(10, Attributes.builder().put("key", "value").build());
}

assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
assertThat(metric)
.hasExponentialHistogramSatisfying(
exponentialHistogram ->
exponentialHistogram.hasPointsSatisfying(
point ->
point
.hasSum(10.0)
.hasAttributes(Attributes.empty())
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasValue(10.0)
.hasFilteredAttributes(
Attributes.builder()
.put("key", "value")
.build())))));
}

@Test
void collectMetrics_ExemplarsWithExplicitBucketHistogram() {
InMemoryMetricReader reader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder()
.setClock(testClock)
.setResource(RESOURCE)
.registerView(
InstrumentSelector.builder().setName("*").build(),
View.builder().setAttributeFilter(Collections.emptySet()).build())
.registerMetricReader(reader)
.build();
Meter sdkMeter = sdkMeterProvider.get(getClass().getName());
LongHistogram histogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build();

SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
Tracer tracer = tracerProvider.get("foo");

Span span = tracer.spanBuilder("span").startSpan();
try (Scope unused = span.makeCurrent()) {
histogram.record(10, Attributes.builder().put("key", "value").build());
}

assertThat(reader.collectAllMetrics())
.satisfiesExactly(
metric ->
assertThat(metric)
.hasHistogramSatisfying(
explicitHistogram ->
explicitHistogram.hasPointsSatisfying(
point ->
point
.hasSum(10)
.hasAttributes(Attributes.empty())
.hasExemplarsSatisfying(
exemplar ->
exemplar
.hasValue(10.0)
.hasFilteredAttributes(
Attributes.builder()
.put("key", "value")
.build())))));
}

@Test
void stressTest() {
LongHistogram longHistogram = sdkMeter.histogramBuilder("testHistogram").ofLongs().build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.exemplar;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.context.Context;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class LongToDoubleExemplarReservoirTest {
@Mock ExemplarReservoir<?> delegate;

@Test
void offerDoubleMeasurement() {
ExemplarReservoir<?> filtered = new LongToDoubleExemplarReservoir<>(delegate);
filtered.offerDoubleMeasurement(1.0, Attributes.empty(), Context.root());
verify(delegate).offerDoubleMeasurement(1.0, Attributes.empty(), Context.root());
verify(delegate, never()).offerLongMeasurement(anyLong(), any(), any());
}

@Test
void offerLongMeasurement() {
ExemplarReservoir<?> filtered = new LongToDoubleExemplarReservoir<>(delegate);
filtered.offerLongMeasurement(1L, Attributes.empty(), Context.root());
verify(delegate).offerDoubleMeasurement(1.0, Attributes.empty(), Context.root());
verify(delegate, never()).offerLongMeasurement(anyLong(), any(), any());
}
}

0 comments on commit ff5595a

Please sign in to comment.