Reuse aggregation handles for delta temporality #5176

Merged · 2 commits · Feb 5, 2023
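
Summary: with delta temporality, every collection resets and discards the per-attributes aggregator handles, so each new recording had to allocate a fresh handle. This change returns reset handles to a ConcurrentLinkedQueue pool, reuses pooled handles in getAggregatorHandle(), and trims the pool to MAX_CARDINALITY after each collection.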
DefaultSynchronousMetricStorage.java
@@ -24,7 +24,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
@@ -49,6 +51,8 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extends ExemplarData>
   private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
       new ConcurrentHashMap<>();
   private final AttributesProcessor attributesProcessor;
+  private final ConcurrentLinkedQueue<AggregatorHandle<T, U>> aggregatorHandlePool =
+      new ConcurrentLinkedQueue<>();

   DefaultSynchronousMetricStorage(
       RegisteredReader registeredReader,
@@ -65,6 +69,11 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extends ExemplarData>
     this.attributesProcessor = attributesProcessor;
   }

+  // Visible for testing
+  Queue<AggregatorHandle<T, U>> getAggregatorHandlePool() {
+    return aggregatorHandlePool;
+  }
+
   @Override
   public void recordLong(long value, Attributes attributes, Context context) {
     AggregatorHandle<T, U> handle = getAggregatorHandle(attributes, context);
@@ -99,7 +108,11 @@ private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Context context) {
               + ").");
       return null;
     }
-    AggregatorHandle<T, U> newHandle = aggregator.createHandle();
+    // Get handle from pool if available, else create a new one.
+    AggregatorHandle<T, U> newHandle = aggregatorHandlePool.poll();
+    if (newHandle == null) {
+      newHandle = aggregator.createHandle();
+    }
     handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
     return handle != null ? handle : newHandle;
   }
@@ -119,16 +132,25 @@ public MetricData collect(
     // Grab aggregated points.
     List<T> points = new ArrayList<>(aggregatorHandles.size());
     for (Map.Entry<Attributes, AggregatorHandle<T, U>> entry : aggregatorHandles.entrySet()) {
-      T point = entry.getValue().aggregateThenMaybeReset(start, epochNanos, entry.getKey(), reset);
       if (reset) {
         aggregatorHandles.remove(entry.getKey(), entry.getValue());
+        // Return the aggregator to the pool.
+        aggregatorHandlePool.offer(entry.getValue());
       }
+      T point = entry.getValue().aggregateThenMaybeReset(start, epochNanos, entry.getKey(), reset);
       if (point == null) {
         continue;
       }
       points.add(point);
     }

+    // Trim pool down if needed. pool.size() will only exceed MAX_CARDINALITY if new handles are
+    // created during collection.
+    int toDelete = aggregatorHandlePool.size() - MAX_CARDINALITY;
+    for (int i = 0; i < toDelete; i++) {
+      aggregatorHandlePool.poll();
+    }
+
     if (points.isEmpty()) {
       return EmptyMetricData.getInstance();
     }
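
The change above amounts to a small lock-free object pool wrapped around the handle map: getAggregatorHandle() polls the pool before allocating, collect() offers handles back when delta temporality resets them, and the pool is trimmed so it never retains more than MAX_CARDINALITY handles. A minimal self-contained sketch of the same poll-or-create / offer-on-reset pattern (the HandlePool class and its names are illustrative, not part of the SDK):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Illustrative pool: reuse handles across delta resets instead of reallocating.
final class HandlePool<K, H> {
  private final ConcurrentHashMap<K, H> active = new ConcurrentHashMap<>();
  private final ConcurrentLinkedQueue<H> pool = new ConcurrentLinkedQueue<>();
  private final Supplier<H> factory;
  private final int maxCardinality;

  HandlePool(Supplier<H> factory, int maxCardinality) {
    this.factory = factory;
    this.maxCardinality = maxCardinality;
  }

  // Poll-or-create: prefer a recycled handle over a fresh allocation.
  H acquire(K key) {
    H existing = active.get(key);
    if (existing != null) {
      return existing;
    }
    H candidate = pool.poll();
    if (candidate == null) {
      candidate = factory.get();
    }
    // Another thread may have registered a handle for this key concurrently.
    H raced = active.putIfAbsent(key, candidate);
    return raced != null ? raced : candidate;
  }

  // Offer-on-reset: on a delta collection, recycle every active handle.
  void resetAndRecycle() {
    for (Map.Entry<K, H> entry : active.entrySet()) {
      active.remove(entry.getKey(), entry.getValue());
      pool.offer(entry.getValue());
    }
    // The pool can only exceed the cap if acquire() ran during the loop above.
    int toDelete = pool.size() - maxCardinality;
    for (int i = 0; i < toDelete; i++) {
      pool.poll();
    }
  }
}

Note that ConcurrentLinkedQueue.poll() returns null when the queue is empty rather than blocking, which is what makes the poll-or-create fallback work without locks.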
SynchronousMetricStorageTest.java
@@ -7,6 +7,9 @@

 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
 import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;

 import io.github.netmikey.logunit.api.LogCapturer;
 import io.opentelemetry.api.common.AttributeKey;
@@ -33,7 +36,6 @@
 import io.opentelemetry.sdk.testing.time.TestClock;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.mockito.Mockito;

 @SuppressLogger(DefaultSynchronousMetricStorage.class)
 public class SynchronousMetricStorageTest {
@@ -55,16 +57,17 @@ public class SynchronousMetricStorageTest {
       RegisteredReader.create(InMemoryMetricReader.create(), ViewRegistry.create());
   private final TestClock testClock = TestClock.create();
   private final Aggregator<LongPointData, LongExemplarData> aggregator =
-      ((AggregatorFactory) Aggregation.sum())
-          .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff());
+      spy(
+          ((AggregatorFactory) Aggregation.sum())
+              .createAggregator(DESCRIPTOR, ExemplarFilter.alwaysOff()));
   private final AttributesProcessor attributesProcessor = AttributesProcessor.noop();

   @Test
   void attributesProcessor_applied() {
     Attributes attributes = Attributes.builder().put("K", "V").build();
     AttributesProcessor attributesProcessor =
         AttributesProcessor.append(Attributes.builder().put("modifiedK", "modifiedV").build());
-    AttributesProcessor spyAttributesProcessor = Mockito.spy(attributesProcessor);
+    AttributesProcessor spyAttributesProcessor = spy(attributesProcessor);
     SynchronousMetricStorage storage =
         new DefaultSynchronousMetricStorage<>(
             cumulativeReader, METRIC_DESCRIPTOR, aggregator, spyAttributesProcessor);
@@ -80,13 +83,15 @@ void attributesProcessor_applied() {
   }

   @Test
-  void recordAndcollect_CumulativeDoesNotReset() {
-    SynchronousMetricStorage storage =
+  void recordAndCollect_CumulativeDoesNotReset() {
+    DefaultSynchronousMetricStorage<?, ?> storage =
         new DefaultSynchronousMetricStorage<>(
             cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);

     // Record measurement and collect at time 10
     storage.recordDouble(3, Attributes.empty(), Context.current());
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
         .hasDoubleSumSatisfying(
             sum ->
@@ -97,6 +102,8 @@ void recordAndcollect_CumulativeDoesNotReset() {

     // Record measurement and collect at time 30
     storage.recordDouble(3, Attributes.empty(), Context.current());
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
         .hasDoubleSumSatisfying(
             sum ->
@@ -107,6 +114,8 @@ void recordAndcollect_CumulativeDoesNotReset() {

     // Record measurement and collect at time 35
     storage.recordDouble(2, Attributes.empty(), Context.current());
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
         .hasDoubleSumSatisfying(
             sum ->
@@ -116,44 +125,55 @@ void recordAndcollect_CumulativeDoesNotReset() {
   }

   @Test
-  void recordAndcollect_DeltaResets() {
-    SynchronousMetricStorage storage =
+  void recordAndCollect_DeltaResets() {
+    DefaultSynchronousMetricStorage<?, ?> storage =
         new DefaultSynchronousMetricStorage<>(
             deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);

     // Record measurement and collect at time 10
     storage.recordDouble(3, Attributes.empty(), Context.current());
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
         .hasDoubleSumSatisfying(
             sum ->
                 sum.isDelta()
                     .hasPointsSatisfying(
                         point -> point.hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3)));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(1);
     deltaReader.setLastCollectEpochNanos(10);

     // Record measurement and collect at time 30
     storage.recordDouble(3, Attributes.empty(), Context.current());
+    // AggregatorHandle should be returned to the pool on reset so shouldn't create additional
+    // handles
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
         .hasDoubleSumSatisfying(
             sum ->
                 sum.isDelta()
                     .hasPointsSatisfying(
                         point -> point.hasStartEpochNanos(10).hasEpochNanos(30).hasValue(3)));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(1);
     deltaReader.setLastCollectEpochNanos(30);

     // Record measurement and collect at time 35
     storage.recordDouble(2, Attributes.empty(), Context.current());
+    verify(aggregator, times(1)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 35))
         .hasDoubleSumSatisfying(
             sum ->
                 sum.isDelta()
                     .hasPointsSatisfying(
                         point -> point.hasStartEpochNanos(30).hasEpochNanos(35).hasValue(2)));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(1);
   }

   @Test
   void recordAndCollect_CumulativeAtLimit() {
-    SynchronousMetricStorage storage =
+    DefaultSynchronousMetricStorage<?, ?> storage =
         new DefaultSynchronousMetricStorage<>(
             cumulativeReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);

@@ -162,6 +182,8 @@ void recordAndCollect_CumulativeAtLimit() {
       storage.recordDouble(
           3, Attributes.builder().put("key", "value" + i).build(), Context.current());
     }
+    verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
         .hasDoubleSumSatisfying(
             sum ->
@@ -175,6 +197,7 @@ void recordAndCollect_CumulativeAtLimit() {
                       assertThat(point.getEpochNanos()).isEqualTo(10);
                       assertThat(point.getValue()).isEqualTo(3);
                     })));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(logs.getEvents()).isEmpty();
     cumulativeReader.setLastCollectEpochNanos(10);

@@ -183,6 +206,9 @@ void recordAndCollect_CumulativeAtLimit() {
         3,
         Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
         Context.current());
+    // Should not create additional handles after MAX_CARDINALITY is reached
+    verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
         .hasDoubleSumSatisfying(
             sum ->
@@ -202,12 +228,13 @@ void recordAndCollect_CumulativeAtLimit() {
                                 .getAttributes()
                                 .get(AttributeKey.stringKey("key"))
                                 .equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
   }

   @Test
   void recordAndCollect_DeltaAtLimit() {
-    SynchronousMetricStorage storage =
+    DefaultSynchronousMetricStorage<?, ?> storage =
         new DefaultSynchronousMetricStorage<>(
             deltaReader, METRIC_DESCRIPTOR, aggregator, attributesProcessor);

@@ -216,6 +243,8 @@ void recordAndCollect_DeltaAtLimit() {
       storage.recordDouble(
           3, Attributes.builder().put("key", "value" + i).build(), Context.current());
     }
+    verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 10))
         .hasDoubleSumSatisfying(
             sum ->
@@ -229,6 +258,7 @@ void recordAndCollect_DeltaAtLimit() {
                       assertThat(point.getEpochNanos()).isEqualTo(10);
                       assertThat(point.getValue()).isEqualTo(3);
                     })));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
     assertThat(logs.getEvents()).isEmpty();
     deltaReader.setLastCollectEpochNanos(10);

@@ -237,6 +267,9 @@ void recordAndCollect_DeltaAtLimit() {
         3,
         Attributes.builder().put("key", "value" + MetricStorage.MAX_CARDINALITY + 1).build(),
         Context.current());
+    // Should use handle returned to pool instead of creating new ones
+    verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY - 1);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 20))
         .hasDoubleSumSatisfying(
             sum ->
@@ -251,6 +284,7 @@ void recordAndCollect_DeltaAtLimit() {
                             Attributes.builder()
                                 .put("key", "value" + MetricStorage.MAX_CARDINALITY + 1)
                                 .build())));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
     assertThat(logs.getEvents()).isEmpty();
     deltaReader.setLastCollectEpochNanos(20);

@@ -259,6 +293,9 @@ void recordAndCollect_DeltaAtLimit() {
       storage.recordDouble(
           3, Attributes.builder().put("key", "value" + i).build(), Context.current());
     }
+    // Should use handles returned to pool instead of creating new ones
+    verify(aggregator, times(MetricStorage.MAX_CARDINALITY)).createHandle();
+    assertThat(storage.getAggregatorHandlePool()).hasSize(0);
     assertThat(storage.collect(RESOURCE, INSTRUMENTATION_SCOPE_INFO, 0, 30))
         .hasDoubleSumSatisfying(
             sum ->
@@ -278,6 +315,7 @@ void recordAndCollect_DeltaAtLimit() {
                                 .getAttributes()
                                 .get(AttributeKey.stringKey("key"))
                                 .equals("value" + MetricStorage.MAX_CARDINALITY + 1))));
+    assertThat(storage.getAggregatorHandlePool()).hasSize(MetricStorage.MAX_CARDINALITY);
     logs.assertContains("Instrument name has exceeded the maximum allowed cardinality");
   }
 }