Skip to content

Commit

Permalink
Add adaptable circular buffer implementation for ExponentialCounter. (#…
Browse files Browse the repository at this point in the history
…4087)

* Add adaptable circular buffer implementation for ExponentialCounter and expose hooks to test its use in Exponential Histogram aggregator.

* Clean up some adapting circular buffer code.

* Fix style issues.

* Apply spotless.

* Add tests for adapting integer array.

* Finish wiring ability to remember previous integer cell size and expand testing.

* Update array copy from code review.

* Fixes/cleanups from review.

- Fix a bug in equality where it was forcing ExponentialCounter to have
  the same offset, even if it had stored 0 counts in all buckets. This
  interacts negatively with merge/diff tests where creating a fresh
  exponential bucket would have a different indexStart than when diff-ing
  another.
- Modify default exponential bucket counter to be adapting circular
  buffer.
- Remove some not-well-thought-out methods (like zeroOf, zeroFrom) in
  favor of a "clear" method on ExponentialCounter
- Modify ExponentialBucketStrategy to be an actual implementation.

* Improve testing of copy behavior across exponential-counter implementations.

* Last fix/cleanup for PR.  Remove remaining TODO around preserving runtime optimisations.

* Fixes from review.

* Add test to ensure 0 is returned from exponential counters outside populated range.

* Add a bunch of extra equality tests.

* run spotless.

* Add note about equality.

* Add copy() method to AdaptingIntegerArray, update tests.

* Fix checkstyle.

* Add internal disclaimer, reduce visibility of test classes

Co-authored-by: jack-berg <[email protected]>
  • Loading branch information
jsuereth and jack-berg authored Feb 4, 2022
1 parent 0ed4967 commit 5c1bd6c
Show file tree
Hide file tree
Showing 15 changed files with 892 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import java.util.Collections;

/** The types of histogram aggregation to benchmark. */
Expand All @@ -20,7 +21,20 @@ public enum HistogramAggregationParam {
new DoubleHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::noSamples)),
EXPONENTIAL(new DoubleExponentialHistogramAggregator(ExemplarReservoir::noSamples));
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(
20, 20, ExponentialCounterFactory.circularBufferCounter()))),
EXPONENTIAL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(
20, 320, ExponentialCounterFactory.circularBufferCounter()))),
EXPONENTIAL_MAP_COUNTER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.mapCounter())));

private final Aggregator<?> aggregator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import java.util.Map;
Expand All @@ -22,14 +23,24 @@ final class DoubleExponentialHistogramAggregator
implements Aggregator<ExponentialHistogramAccumulation> {

private final Supplier<ExemplarReservoir> reservoirSupplier;
private final ExponentialBucketStrategy bucketStrategy;

DoubleExponentialHistogramAggregator(Supplier<ExemplarReservoir> reservoirSupplier) {
this(
reservoirSupplier,
ExponentialBucketStrategy.newStrategy(
20, 320, ExponentialCounterFactory.circularBufferCounter()));
}

DoubleExponentialHistogramAggregator(
Supplier<ExemplarReservoir> reservoirSupplier, ExponentialBucketStrategy bucketStrategy) {
this.reservoirSupplier = reservoirSupplier;
this.bucketStrategy = bucketStrategy;
}

@Override
public AggregatorHandle<ExponentialHistogramAccumulation> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), this.bucketStrategy);
}

/**
Expand Down Expand Up @@ -132,32 +143,36 @@ public MetricData toMetricData(
}

static final class Handle extends AggregatorHandle<ExponentialHistogramAccumulation> {

private int scale;
private DoubleExponentialHistogramBuckets positiveBuckets;
private DoubleExponentialHistogramBuckets negativeBuckets;
private final ExponentialBucketStrategy bucketStrategy;
private final DoubleExponentialHistogramBuckets positiveBuckets;
private final DoubleExponentialHistogramBuckets negativeBuckets;
private long zeroCount;
private double sum;

Handle(ExemplarReservoir reservoir) {
Handle(ExemplarReservoir reservoir, ExponentialBucketStrategy bucketStrategy) {
super(reservoir);
this.sum = 0;
this.zeroCount = 0;
this.scale = DoubleExponentialHistogramBuckets.MAX_SCALE;
this.positiveBuckets = new DoubleExponentialHistogramBuckets();
this.negativeBuckets = new DoubleExponentialHistogramBuckets();
this.bucketStrategy = bucketStrategy;
this.positiveBuckets = this.bucketStrategy.newBuckets();
this.negativeBuckets = this.bucketStrategy.newBuckets();
}

@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
List<ExemplarData> exemplars) {
ExponentialHistogramAccumulation acc =
ExponentialHistogramAccumulation.create(
scale, sum, positiveBuckets, negativeBuckets, zeroCount, exemplars);
this.positiveBuckets.getScale(),
sum,
positiveBuckets.copy(),
negativeBuckets.copy(),
zeroCount,
exemplars);
this.sum = 0;
this.zeroCount = 0;
this.positiveBuckets = new DoubleExponentialHistogramBuckets();
this.negativeBuckets = new DoubleExponentialHistogramBuckets();
this.positiveBuckets.clear();
this.negativeBuckets.clear();
return acc;
}

Expand All @@ -180,6 +195,8 @@ protected synchronized void doRecordDouble(double value) {
// Record; If recording fails, calculate scale reduction and scale down to fit new value.
// 2nd attempt at recording should work with new scale
DoubleExponentialHistogramBuckets buckets = (c > 0) ? positiveBuckets : negativeBuckets;
// TODO: We should experiment with downscale on demand during sync execution and only
// unifying scale factor between positive/negative at collection time (doAccumulate).
if (!buckets.record(value)) {
// getScaleReduction() used with downScale() will scale down as required to record value,
// fit inside max allowed buckets, and make sure index can be represented by int.
Expand All @@ -196,7 +213,6 @@ protected void doRecordLong(long value) {
void downScale(int by) {
positiveBuckets.downscale(by);
negativeBuckets.downscale(by);
this.scale -= by;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounter;
import io.opentelemetry.sdk.metrics.internal.state.MapCounter;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
Expand All @@ -23,27 +23,37 @@
*/
final class DoubleExponentialHistogramBuckets implements ExponentialHistogramBuckets {

public static final int MAX_SCALE = 20;

private static final int MAX_BUCKETS = MapCounter.MAX_SIZE;

private final ExponentialCounterFactory counterFactory;
private ExponentialCounter counts;
private BucketMapper bucketMapper;
private int scale;

DoubleExponentialHistogramBuckets() {
this.counts = new MapCounter();
this.bucketMapper = new LogarithmMapper(MAX_SCALE);
this.scale = MAX_SCALE;
DoubleExponentialHistogramBuckets(
int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
this.counterFactory = counterFactory;
this.counts = counterFactory.newCounter(maxBuckets);
this.bucketMapper = new LogarithmMapper(scale);
this.scale = scale;
}

// For copying
DoubleExponentialHistogramBuckets(DoubleExponentialHistogramBuckets buckets) {
this.counts = new MapCounter(buckets.counts); // copy counts
this.counterFactory = buckets.counterFactory;
this.counts = counterFactory.copy(buckets.counts);
this.bucketMapper = new LogarithmMapper(buckets.scale);
this.scale = buckets.scale;
}

/** Returns a copy of this bucket. */
DoubleExponentialHistogramBuckets copy() {
return new DoubleExponentialHistogramBuckets(this);
}

/** Resets all counters in this bucket set to zero, but preserves scale. */
public void clear() {
this.counts.clear();
}

boolean record(double value) {
if (value == 0.0) {
// Guarded by caller. If passed 0 it would be a bug in the SDK.
Expand All @@ -55,6 +65,12 @@ boolean record(double value) {

@Override
public int getOffset() {
// We need to unify the behavior of empty buckets.
// Unfortunately, getIndexStart is not meaningful for empty counters, so we default to
// returning 0 for offset and an empty list.
if (counts.isEmpty()) {
return 0;
}
return counts.getIndexStart();
}

Expand All @@ -74,6 +90,9 @@ public List<Long> getBucketCounts() {

@Override
public long getTotalCount() {
if (counts.isEmpty()) {
return 0;
}
long totalCount = 0;
for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
totalCount += counts.get(i);
Expand All @@ -90,7 +109,11 @@ void downscale(int by) {
}

if (!counts.isEmpty()) {
ExponentialCounter newCounts = new MapCounter();
// We want to preserve other optimisations here as well, e.g. integer size.
// Instead of creating a new counter, we copy the existing one (for bucket size
// optimisations), and clear the values before writing the new ones.
ExponentialCounter newCounts = counterFactory.copy(counts);
newCounts.clear();

for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
long count = counts.get(i);
Expand All @@ -117,7 +140,7 @@ void downscale(int by) {
*/
static DoubleExponentialHistogramBuckets diff(
DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
DoubleExponentialHistogramBuckets copy = a.copy();
copy.mergeWith(b, /* additive= */ false);
return copy;
}
Expand All @@ -133,11 +156,11 @@ static DoubleExponentialHistogramBuckets diff(
static DoubleExponentialHistogramBuckets merge(
DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
if (b.counts.isEmpty()) {
return new DoubleExponentialHistogramBuckets(a);
return a;
} else if (a.counts.isEmpty()) {
return new DoubleExponentialHistogramBuckets(b);
return b;
}
DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
DoubleExponentialHistogramBuckets copy = a.copy();
copy.mergeWith(b, /* additive= */ true);
return copy;
}
Expand Down Expand Up @@ -218,7 +241,7 @@ int getScaleReduction(double value) {
int getScaleReduction(long newStart, long newEnd) {
int scaleReduction = 0;

while (newEnd - newStart + 1 > MAX_BUCKETS) {
while (newEnd - newStart + 1 > counts.getMaxSize()) {
newStart >>= 1;
newEnd >>= 1;
scaleReduction++;
Expand All @@ -234,19 +257,48 @@ public boolean equals(@Nullable Object obj) {
DoubleExponentialHistogramBuckets other = (DoubleExponentialHistogramBuckets) obj;
// Don't need to compare getTotalCount() because equivalent bucket counts
// imply equivalent overall count.
return getBucketCounts().equals(other.getBucketCounts())
&& this.getOffset() == other.getOffset()
&& this.scale == other.scale;
// Additionally, we compare the "semantics" of bucket counts, that is
// it's ok for getOffset() to diverge as long as the populated counts remain
// the same. This is because we don't "normalize" buckets after doing
// difference/subtraction operations.
return this.scale == other.scale && sameBucketCounts(other);
}

/**
* Tests if two bucket counts are equivalent semantically.
*
* <p>Semantic equivalence means:
*
* <ul>
* <li>All counts are stored between indexStart/indexEnd.
* <li>Offset does NOT need to be the same
* </ul>
*/
private boolean sameBucketCounts(DoubleExponentialHistogramBuckets other) {
int min = Math.min(this.counts.getIndexStart(), other.counts.getIndexStart());
int max = Math.max(this.counts.getIndexEnd(), other.counts.getIndexEnd());
for (int idx = min; idx <= max; idx++) {
if (this.counts.get(idx) != other.counts.get(idx)) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
int hash = 1;
hash *= 1000003;
hash ^= getOffset();
hash *= 1000003;
hash ^= getBucketCounts().hashCode();
hash *= 1000003;
// We need a new algorithm here that lines up w/ equals, so we only use non-zero counts.
for (int idx = this.counts.getIndexStart(); idx <= this.counts.getIndexEnd(); idx++) {
long count = this.counts.get(idx);
if (count != 0) {
hash ^= idx;
hash *= 1000003;
hash = (int) (hash ^ count);
hash *= 1000003;
}
}
hash ^= scale;
// Don't need to hash getTotalCount() because equivalent bucket
// counts imply equivalent overall count.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;

/**
 * Describes how exponential histogram buckets should be constructed.
 *
 * <p>A strategy bundles together the starting scale, the bucket-count limit, and the
 * {@link ExponentialCounterFactory} used to allocate (and copy) the underlying counters.
 */
final class ExponentialBucketStrategy {

  /** The scale that freshly created buckets start at. */
  private final int scale;
  /** Upper bound on the number of buckets used for positive or negative recordings. */
  private final int maxBuckets;
  /** Factory responsible for constructing and copying the backing counters. */
  private final ExponentialCounterFactory counterFactory;

  /** Creates a new strategy for generating exponential buckets. */
  static ExponentialBucketStrategy newStrategy(
      int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
    return new ExponentialBucketStrategy(scale, maxBuckets, counterFactory);
  }

  private ExponentialBucketStrategy(
      int startingScale, int bucketLimit, ExponentialCounterFactory factory) {
    this.scale = startingScale;
    this.maxBuckets = bucketLimit;
    this.counterFactory = factory;
  }

  /** Constructs a fresh, empty set of buckets using this strategy's settings. */
  DoubleExponentialHistogramBuckets newBuckets() {
    return new DoubleExponentialHistogramBuckets(scale, maxBuckets, counterFactory);
  }
}
Loading

0 comments on commit 5c1bd6c

Please sign in to comment.