Add adaptable circular buffer implementation for ExponentialCounter. #4087

Merged
Commits (20)
c650018
Add adaptable circular buffer implementation for ExponentialCounter a…
jsuereth Jan 12, 2022
0ed0b6b
Clean up some adapting circular buffer code.
jsuereth Jan 13, 2022
dd011bf
Fix style issues.
jsuereth Jan 17, 2022
110d897
Apply spotless.
jsuereth Jan 17, 2022
42ce93b
Add tests for adapting integer array.
jsuereth Jan 17, 2022
6fef5df
Finish wiring ability to remember previous integer cell size and expa…
jsuereth Jan 19, 2022
c2effc5
Update array copy from code review.
jsuereth Jan 26, 2022
d31a643
Fixes/cleanups from review.
jsuereth Jan 26, 2022
242e01f
Improve testing of copy behavior across exponential-counter implement…
jsuereth Jan 26, 2022
040655a
Last fix/cleanup for PR. Remove remaining TODO around preserving run…
jsuereth Jan 27, 2022
ffde91a
Fixes from review.
jsuereth Jan 28, 2022
f07dba5
Merge remote-tracking branch 'otel/main' into jsuereth/wip-expontenti…
jsuereth Jan 31, 2022
49785d6
Add test to ensure 0 is returned from exponential counters outside po…
jsuereth Jan 31, 2022
5e5a391
Add a bunch of extra equality tests.
jsuereth Jan 31, 2022
2abffed
run spotless.
jsuereth Feb 1, 2022
0315f68
Add note about equality.
jsuereth Feb 1, 2022
4019280
Merge remote-tracking branch 'otel/main' into jsuereth/wip-expontenti…
jsuereth Feb 1, 2022
c1439e0
Add copy() method to AdaptingIntegerArray, update tests.
jsuereth Feb 1, 2022
7e2e1c9
Fix checkstyle.
jsuereth Feb 1, 2022
bd8a251
Add internal disclaimer, reduce visibility of test classes
jack-berg Feb 4, 2022
HistogramAggregationParam.java
@@ -6,6 +6,7 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import java.util.Collections;

/** The types of histogram aggregation to benchmark. */
@@ -20,7 +21,20 @@ public enum HistogramAggregationParam {
new DoubleHistogramAggregator(
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::noSamples)),
EXPONENTIAL(new DoubleExponentialHistogramAggregator(ExemplarReservoir::noSamples));
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(
20, 20, ExponentialCounterFactory.circularBufferCounter()))),
EXPONENTIAL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(
20, 320, ExponentialCounterFactory.circularBufferCounter()))),
EXPONENTIAL_MAP_COUNTER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::noSamples,
ExponentialBucketStrategy.newStrategy(20, 320, ExponentialCounterFactory.mapCounter())));

private final Aggregator<?> aggregator;

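The circularBufferCounter() and mapCounter() factories benchmarked above select between the two ExponentialCounter implementations this PR compares: the existing sparse MapCounter and the new adapting circular buffer. The AdaptingIntegerArray behind the circular buffer is not part of this diff, so the following is only a simplified, hypothetical sketch of the widening idea named in the commit log (the real class also has short and int steps, and sits behind a ring buffer so the bucket window can slide):

```java
/**
 * Simplified illustration only, not the SDK class: a counter array that
 * starts with 1-byte cells and widens the first time a count overflows,
 * so hundreds of mostly-small buckets stay cheap in memory.
 */
final class AdaptingArraySketch {
  private byte[] narrow; // initial 1-byte-per-cell storage
  private long[] wide;   // widened storage (real code also has short/int steps)

  AdaptingArraySketch(int size) {
    narrow = new byte[size];
  }

  void increment(int idx, long by) {
    if (wide != null) {
      wide[idx] += by;
      return;
    }
    long result = narrow[idx] + by;
    if (result > Byte.MAX_VALUE) {
      // Overflow: copy every cell into wider storage, then write there.
      wide = new long[narrow.length];
      for (int i = 0; i < narrow.length; i++) {
        wide[i] = narrow[i];
      }
      narrow = null;
      wide[idx] = result;
    } else {
      narrow[idx] = (byte) result;
    }
  }

  long get(int idx) {
    return wide != null ? wide[idx] : narrow[idx];
  }
}
```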
DoubleExponentialHistogramAggregator.java
@@ -13,6 +13,7 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import java.util.Map;
@@ -22,14 +23,24 @@ final class DoubleExponentialHistogramAggregator
implements Aggregator<ExponentialHistogramAccumulation> {

private final Supplier<ExemplarReservoir> reservoirSupplier;
private final ExponentialBucketStrategy bucketStrategy;

DoubleExponentialHistogramAggregator(Supplier<ExemplarReservoir> reservoirSupplier) {
Contributor:

What do you foresee the instrument / user API looking like for this? Something like new ExponentialHistogramInstrument(maxbuckets, startingScale)?

jsuereth (Contributor, Author):

There's a discussion on the specification right now, but I'll give you my thoughts:

  1. If we make a user specify anything by default, we've failed (fundamentally) on the goal of these histograms.
  2. If users want to specify something to optimise for their use case, it should match their expectations of the data. Specifically, we can allow:
    • Users may give us a dynamic (base) range, e.g. "I want to record between (0.0, 4000.0)".
    • Users may give us the maximum number of buckets (or RAM) they want to spend storing histograms.
    • The SDK would calculate the initial scale + bucket maximum from these values (see the sketch below).
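To illustrate that last point, here is a minimal sketch (hypothetical, not SDK API) of how an SDK could derive the initial scale from a user-supplied range and bucket budget. It uses the base-2 exponential-histogram index mapping, index(v) = floor(log2(v) * 2^scale), and the MAX_SCALE of 20 used elsewhere in this PR:

```java
// Hypothetical helper: pick the finest scale at or below MAX_SCALE whose
// bucket count for the range (min, max], with min > 0, fits in maxBuckets.
static int chooseInitialScale(double min, double max, int maxBuckets) {
  for (int scale = 20; scale > -10; scale--) {
    double indexFactor = Math.scalb(1.0, scale); // 2^scale
    long low = (long) Math.floor(Math.log(min) / Math.log(2.0) * indexFactor);
    long high = (long) Math.floor(Math.log(max) / Math.log(2.0) * indexFactor);
    if (high - low + 1 <= maxBuckets) {
      return scale; // each smaller scale halves the buckets needed
    }
  }
  return -10;
}

// e.g. chooseInitialScale(0.5, 4000.0, 320) == 4: roughly 13 powers of two
// lie between 0.5 and 4000, and 13 * 2^4, about 208 buckets, fits in 320.
```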

this(
reservoirSupplier,
ExponentialBucketStrategy.newStrategy(
20, 320, ExponentialCounterFactory.circularBufferCounter()));
}

DoubleExponentialHistogramAggregator(
Supplier<ExemplarReservoir> reservoirSupplier, ExponentialBucketStrategy bucketStrategy) {
this.reservoirSupplier = reservoirSupplier;
this.bucketStrategy = bucketStrategy;
}

@Override
public AggregatorHandle<ExponentialHistogramAccumulation> createHandle() {
return new Handle(reservoirSupplier.get());
return new Handle(reservoirSupplier.get(), this.bucketStrategy);
}

/**
@@ -132,32 +143,36 @@ public MetricData toMetricData(
}

static final class Handle extends AggregatorHandle<ExponentialHistogramAccumulation> {

private int scale;
private DoubleExponentialHistogramBuckets positiveBuckets;
private DoubleExponentialHistogramBuckets negativeBuckets;
private final ExponentialBucketStrategy bucketStrategy;
private final DoubleExponentialHistogramBuckets positiveBuckets;
private final DoubleExponentialHistogramBuckets negativeBuckets;
private long zeroCount;
private double sum;

Handle(ExemplarReservoir reservoir) {
Handle(ExemplarReservoir reservoir, ExponentialBucketStrategy bucketStrategy) {
super(reservoir);
this.sum = 0;
this.zeroCount = 0;
this.scale = DoubleExponentialHistogramBuckets.MAX_SCALE;
this.positiveBuckets = new DoubleExponentialHistogramBuckets();
this.negativeBuckets = new DoubleExponentialHistogramBuckets();
this.bucketStrategy = bucketStrategy;
this.positiveBuckets = this.bucketStrategy.newBuckets();
this.negativeBuckets = this.bucketStrategy.newBuckets();
}

@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
List<ExemplarData> exemplars) {
ExponentialHistogramAccumulation acc =
ExponentialHistogramAccumulation.create(
scale, sum, positiveBuckets, negativeBuckets, zeroCount, exemplars);
this.positiveBuckets.getScale(),
sum,
positiveBuckets.copy(),
negativeBuckets.copy(),
zeroCount,
exemplars);
this.sum = 0;
this.zeroCount = 0;
this.positiveBuckets = new DoubleExponentialHistogramBuckets();
this.negativeBuckets = new DoubleExponentialHistogramBuckets();
this.positiveBuckets.clear();
this.negativeBuckets.clear();
return acc;
}

@@ -180,6 +195,8 @@ protected synchronized void doRecordDouble(double value) {
// Record; If recording fails, calculate scale reduction and scale down to fit new value.
// 2nd attempt at recording should work with new scale
DoubleExponentialHistogramBuckets buckets = (c > 0) ? positiveBuckets : negativeBuckets;
// TODO: We should experiment with downscale on demand during sync execution and only
// unifying scale factor between positive/negative at collection time (doAccumulate).
if (!buckets.record(value)) {
// getScaleReduction() used with downScale() will scale down as required to record value,
// fit inside max allowed buckets, and make sure index can be represented by int.
@@ -196,7 +213,6 @@ protected void doRecordLong(long value) {
void downScale(int by) {
positiveBuckets.downscale(by);
negativeBuckets.downscale(by);
this.scale -= by;
}
}
}
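As context for downScale above: reducing the scale by `by` right-shifts every bucket index by `by` bits (see the halving loop in getScaleReduction in DoubleExponentialHistogramBuckets below), which merges runs of 2^by adjacent buckets while preserving the total count. A self-contained illustration, plain Java rather than SDK code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: downscaling by 1 maps bucket index i to i >> 1,
// merging adjacent bucket pairs and preserving the total count.
public class DownscaleDemo {
  public static void main(String[] args) {
    Map<Integer, Long> counts = new HashMap<>();
    counts.put(4, 1L);
    counts.put(5, 2L);
    counts.put(6, 3L);
    counts.put(7, 4L);

    Map<Integer, Long> downscaled = new TreeMap<>();
    counts.forEach((i, c) -> downscaled.merge(i >> 1, c, Long::sum));

    System.out.println(downscaled); // {2=3, 3=7}: total count 10 preserved
  }
}
```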
DoubleExponentialHistogramBuckets.java
@@ -8,7 +8,7 @@
import io.opentelemetry.sdk.internal.PrimitiveLongList;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounter;
import io.opentelemetry.sdk.metrics.internal.state.MapCounter;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
@@ -23,27 +23,37 @@
*/
final class DoubleExponentialHistogramBuckets implements ExponentialHistogramBuckets {

public static final int MAX_SCALE = 20;

private static final int MAX_BUCKETS = MapCounter.MAX_SIZE;

private final ExponentialCounterFactory counterFactory;
private ExponentialCounter counts;
private BucketMapper bucketMapper;
private int scale;

DoubleExponentialHistogramBuckets() {
this.counts = new MapCounter();
this.bucketMapper = new LogarithmMapper(MAX_SCALE);
this.scale = MAX_SCALE;
DoubleExponentialHistogramBuckets(
int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
this.counterFactory = counterFactory;
this.counts = counterFactory.newCounter(maxBuckets);
this.bucketMapper = new LogarithmMapper(scale);
this.scale = scale;
}

// For copying
DoubleExponentialHistogramBuckets(DoubleExponentialHistogramBuckets buckets) {
this.counts = new MapCounter(buckets.counts); // copy counts
this.counterFactory = buckets.counterFactory;
this.counts = counterFactory.copy(buckets.counts);
this.bucketMapper = new LogarithmMapper(buckets.scale);
this.scale = buckets.scale;
}

/** Returns a copy of this bucket. */
DoubleExponentialHistogramBuckets copy() {
return new DoubleExponentialHistogramBuckets(this);
}

/** Resets all counters in this bucket set to zero, but preserves scale. */
public void clear() {
this.counts.clear();
}

boolean record(double value) {
if (value == 0.0) {
// Guarded by caller. If passed 0 it would be a bug in the SDK.
@@ -55,6 +65,12 @@ boolean record(double value) {

@Override
public int getOffset() {
// We need to unify the behavior of empty buckets.
// Unfortunately, getIndexStart is not meaningful for empty counters, so we default to
// returning 0 for offset and an empty list.
if (counts.isEmpty()) {
return 0;
}
return counts.getIndexStart();
}

@@ -74,6 +90,9 @@ public List<Long> getBucketCounts() {

@Override
public long getTotalCount() {
if (counts.isEmpty()) {
return 0;
}
long totalCount = 0;
for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
totalCount += counts.get(i);
@@ -90,7 +109,11 @@ void downscale(int by) {
}

if (!counts.isEmpty()) {
ExponentialCounter newCounts = new MapCounter();
// We want to preserve other optimisations here as well, e.g. integer size.
// Instead of creating a new counter, we copy the existing one (for bucket size
// optimisations), and clear the values before writing the new ones.
ExponentialCounter newCounts = counterFactory.copy(counts);
newCounts.clear();

for (int i = counts.getIndexStart(); i <= counts.getIndexEnd(); i++) {
long count = counts.get(i);
@@ -117,7 +140,7 @@ void downscale(int by) {
*/
static DoubleExponentialHistogramBuckets diff(
DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
DoubleExponentialHistogramBuckets copy = a.copy();
copy.mergeWith(b, /* additive= */ false);
return copy;
}
@@ -133,11 +156,11 @@ static DoubleExponentialHistogramBuckets diff(
static DoubleExponentialHistogramBuckets merge(
DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
if (b.counts.isEmpty()) {
return new DoubleExponentialHistogramBuckets(a);
return a;
} else if (a.counts.isEmpty()) {
return new DoubleExponentialHistogramBuckets(b);
return b;
}
DoubleExponentialHistogramBuckets copy = new DoubleExponentialHistogramBuckets(a);
DoubleExponentialHistogramBuckets copy = a.copy();
copy.mergeWith(b, /* additive= */ true);
return copy;
}
@@ -218,7 +241,7 @@ int getScaleReduction(double value) {
int getScaleReduction(long newStart, long newEnd) {
int scaleReduction = 0;

while (newEnd - newStart + 1 > MAX_BUCKETS) {
while (newEnd - newStart + 1 > counts.getMaxSize()) {
newStart >>= 1;
newEnd >>= 1;
scaleReduction++;
@@ -234,19 +257,44 @@ public boolean equals(@Nullable Object obj) {
DoubleExponentialHistogramBuckets other = (DoubleExponentialHistogramBuckets) obj;
// Don't need to compare getTotalCount() because equivalent bucket counts
// imply equivalent overall count.
return getBucketCounts().equals(other.getBucketCounts())
&& this.getOffset() == other.getOffset()
&& this.scale == other.scale;
return this.scale == other.scale && sameBucketCounts(other);
jamesmoessis (Contributor), Jan 30, 2022:

Why have you omitted the offset from hashCode() and equals()? I would think offset needs to be the same for two bucket sets to be strictly equal.

Edit: oh I see, you are comparing across an extended window of both buckets. I suppose two bucket sets can have different offsets but represent the same measurements. The bucket set with the lower offset must have all zeroes up until the first non-zero bucket of the other bucket set for this to be the case.

jsuereth (Contributor, Author):

Exactly the case. Additionally, the other bucket set needs to have ONLY zeros after the first bucket set ends.

We could fix this in the diff method with a "normalize" type method that ensures the first index is populated. I'm fine going that route if you think it's cleaner.

Contributor:

Definitely worth a comment in here, either way.

Contributor:

Yeah agreed, could use a comment.

jsuereth (Contributor, Author):

Great point. I had documented the other method, but in-context, it wasn't very good. Added here.

}

/**
* Tests if two bucket counts are equivalent semantically.
*
* <p>Semantic equivalence means:
*
* <ul>
* <li>All counts are stored between indexStart/indexEnd.
* <li>Offset does NOT need to be the same
* </ul>
*/
private boolean sameBucketCounts(DoubleExponentialHistogramBuckets other) {
int min = Math.min(this.counts.getIndexStart(), other.counts.getIndexStart());
Contributor:

Can you confirm this logic? Shouldn't it be something like a = this.start, b = other.start, a <= this.end && b <= other.end; a++ && b++)?

It looks like we could refer to an index that is below start for one of the two buckets right now.

jsuereth (Contributor, Author):

Yes, actually that's the point.

What can happen right now (with diff) is that we have a bucket, e.g. at index 2, which "diffs" to 0. In the unit tests, we instantiate the histogram buckets by saying "You should see count 5 at index 3", which means we have one bucket with index start @ 3 and another with index start @ 2. They're semantically equivalent counts.

The previous MapCounter implementation actually did GC so any count that reached zero was removed from the Map. Unfortunately, that's not really feasible in the circular buffer implementation.

jamesmoessis (Contributor), Jan 30, 2022:

This seems to be what you are saying:

bucket counts:
0,0,5,1,2,0,0 == 5,1,2

Makes sense to me.

Contributor:

Ah I didn't notice the change from throwing to returning 0 for those indices. Can you confirm there is a unit test for this case?

jsuereth (Contributor, Author):

Added.

int max = Math.max(this.counts.getIndexEnd(), other.counts.getIndexEnd());
for (int idx = min; idx <= max; idx++) {
if (this.counts.get(idx) != other.counts.get(idx)) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
int hash = 1;
hash *= 1000003;
hash ^= getOffset();
hash *= 1000003;
hash ^= getBucketCounts().hashCode();
hash *= 1000003;
// We need a new algorithm here that lines up w/ equals, so we only use non-zero counts.
for (int idx = this.counts.getIndexStart(); idx <= this.counts.getIndexEnd(); idx++) {
long count = this.counts.get(idx);
if (count != 0) {
hash ^= idx;
hash *= 1000003;
hash = (int) (hash ^ count);
hash *= 1000003;
}
}
hash ^= scale;
// Don't need to hash getTotalCount() because equivalent bucket
// counts imply equivalent overall count.
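To make the semantic-equality rule from the threads above concrete, here is a standalone illustration (hypothetical helper, not SDK code) of comparing two sparse bucket windows where absent indices read as zero, matching how ExponentialCounter.get() now returns 0 outside the populated range:

```java
import java.util.Map;
import java.util.TreeMap;

public class SemanticEqualityDemo {
  // Compare across the union of both windows; absent indices count as 0.
  static boolean sameBucketCounts(Map<Integer, Long> a, Map<Integer, Long> b) {
    TreeMap<Integer, Long> union = new TreeMap<>();
    union.putAll(a);
    union.putAll(b);
    for (int idx : union.keySet()) {
      long left = a.getOrDefault(idx, 0L);
      long right = b.getOrDefault(idx, 0L);
      if (left != right) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // 0,0,5,1,2,0,0 starting at index 1 vs 5,1,2 starting at index 3:
    // different offsets, same measurements.
    Map<Integer, Long> padded =
        Map.of(1, 0L, 2, 0L, 3, 5L, 4, 1L, 5, 2L, 6, 0L, 7, 0L);
    Map<Integer, Long> trimmed = Map.of(3, 5L, 4, 1L, 5, 2L);
    System.out.println(sameBucketCounts(padded, trimmed)); // true
  }
}
```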
ExponentialBucketStrategy.java (new file)
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;

/** The configuration for how to create exponential histogram buckets. */
final class ExponentialBucketStrategy {
/** Starting scale of exponential buckets. */
private final int scale;
/** The maximum number of buckets that will be used for positive or negative recordings. */
private final int maxBuckets;
/** The mechanism of constructing and copying buckets. */
private final ExponentialCounterFactory counterFactory;

private ExponentialBucketStrategy(
int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
this.scale = scale;
this.maxBuckets = maxBuckets;
this.counterFactory = counterFactory;
}

/** Constructs fresh new buckets with default settings. */
DoubleExponentialHistogramBuckets newBuckets() {
return new DoubleExponentialHistogramBuckets(scale, maxBuckets, counterFactory);
}

/** Create a new strategy for generating Exponential Buckets. */
static ExponentialBucketStrategy newStrategy(
int scale, int maxBuckets, ExponentialCounterFactory counterFactory) {
return new ExponentialBucketStrategy(scale, maxBuckets, counterFactory);
}
}
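A usage sketch pieced together from calls that appear elsewhere in this diff (the API is package-private and internal, so this is only callable from within the aggregator package):

```java
// Mirrors the wiring in DoubleExponentialHistogramAggregator.Handle:
// starting scale 20, at most 320 buckets per sign, circular-buffer counters.
ExponentialBucketStrategy strategy =
    ExponentialBucketStrategy.newStrategy(
        20, 320, ExponentialCounterFactory.circularBufferCounter());
DoubleExponentialHistogramBuckets positiveBuckets = strategy.newBuckets();
DoubleExponentialHistogramBuckets negativeBuckets = strategy.newBuckets();
```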