Skip to content

Commit

Permalink
Reuse aggregation handles for delta temporality
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Feb 3, 2023
1 parent 249d097 commit 7c88492
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -49,6 +50,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final AttributesProcessor attributesProcessor;
private final ConcurrentLinkedQueue<AggregatorHandle<T, U>> pool = new ConcurrentLinkedQueue<>();

DefaultSynchronousMetricStorage(
RegisteredReader registeredReader,
Expand Down Expand Up @@ -99,7 +101,11 @@ private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Contex
+ ").");
return null;
}
AggregatorHandle<T, U> newHandle = aggregator.createHandle();
// Get handle from pool if available, else create a new one.
AggregatorHandle<T, U> newHandle = pool.poll();
if (newHandle == null) {
newHandle = aggregator.createHandle();
}
handle = aggregatorHandles.putIfAbsent(attributes, newHandle);
return handle != null ? handle : newHandle;
}
Expand All @@ -119,16 +125,24 @@ public MetricData collect(
// Grab aggregated points.
List<T> points = new ArrayList<>(aggregatorHandles.size());
for (Map.Entry<Attributes, AggregatorHandle<T, U>> entry : aggregatorHandles.entrySet()) {
T point = entry.getValue().aggregateThenMaybeReset(start, epochNanos, entry.getKey(), reset);
if (reset) {
aggregatorHandles.remove(entry.getKey(), entry.getValue());
// Return the aggregator to the pool.
pool.offer(entry.getValue());
}
T point = entry.getValue().aggregateThenMaybeReset(start, epochNanos, entry.getKey(), reset);
if (point == null) {
continue;
}
points.add(point);
}

// Trim pool down if needed
int toDelete = pool.size() - MAX_CARDINALITY;
for (int i = 0; i < toDelete; i++) {
pool.poll();
}

if (points.isEmpty()) {
return EmptyMetricData.getInstance();
}
Expand Down

0 comments on commit 7c88492

Please sign in to comment.