Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete MapCounter alternative histogram bucket implementation #5047

Merged
merged 1 commit into from
Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import java.util.Collections;

/** The types of histogram aggregation to benchmark. */
Expand All @@ -22,23 +21,13 @@ public enum HistogramAggregationParam {
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
ExemplarReservoir::doubleNoSamples)),
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples,
ExponentialBucketStrategy.newStrategy(
20, ExponentialCounterFactory.circularBufferCounter(), 0))),
new DoubleExponentialHistogramAggregator(ExemplarReservoir::doubleNoSamples, 20, 0)),
EXPONENTIAL_CIRCULAR_BUFFER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples,
ExponentialBucketStrategy.newStrategy(
160, ExponentialCounterFactory.circularBufferCounter(), 0))),
EXPONENTIAL_MAP_COUNTER(
new DoubleExponentialHistogramAggregator(
ExemplarReservoir::doubleNoSamples,
ExponentialBucketStrategy.newStrategy(160, ExponentialCounterFactory.mapCounter(), 0)));
new DoubleExponentialHistogramAggregator(ExemplarReservoir::doubleNoSamples, 160, 0));

private final Aggregator<?, ?> aggregator;

private HistogramAggregationParam(Aggregator<?, ?> aggregator) {
HistogramAggregationParam(Aggregator<?, ?> aggregator) {
this.aggregator = aggregator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum HistogramValueGenerator {
private static final int INITIAL_SEED = 513423236;
private final double[] pool;

private HistogramValueGenerator(double[] pool) {
HistogramValueGenerator(double[] pool) {
this.pool = pool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;
package io.opentelemetry.sdk.metrics.internal.aggregator;

/**
* A circle-buffer-backed exponential counter.
Expand All @@ -13,56 +13,57 @@
* <p>This expand start/End index as it sees values.
*
* <p>This class is NOT thread-safe. It is expected to be behind a synchronized incrementer.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time
*/
public class AdaptingCircularBufferCounter implements ExponentialCounter {
final class AdaptingCircularBufferCounter {
private static final int NULL_INDEX = Integer.MIN_VALUE;
private int endIndex = NULL_INDEX;
private int startIndex = NULL_INDEX;
private int baseIndex = NULL_INDEX;
private final AdaptingIntegerArray backing;

/** Constructs a circular buffer that will hold at most {@code maxSize} buckets. */
public AdaptingCircularBufferCounter(int maxSize) {
AdaptingCircularBufferCounter(int maxSize) {
this.backing = new AdaptingIntegerArray(maxSize);
}

/** (Deep)-Copies the values from another exponential counter. */
public AdaptingCircularBufferCounter(ExponentialCounter toCopy) {
// If toCopy is an AdaptingCircularBuffer, just do a copy of the underlying array
// and baseIndex.
if (toCopy instanceof AdaptingCircularBufferCounter) {
this.backing = ((AdaptingCircularBufferCounter) toCopy).backing.copy();
this.startIndex = toCopy.getIndexStart();
this.endIndex = toCopy.getIndexEnd();
this.baseIndex = ((AdaptingCircularBufferCounter) toCopy).baseIndex;
} else {
// Copy values from some other implementation of ExponentialCounter.
this.backing = new AdaptingIntegerArray(toCopy.getMaxSize());
this.startIndex = NULL_INDEX;
this.baseIndex = NULL_INDEX;
this.endIndex = NULL_INDEX;
for (int i = toCopy.getIndexStart(); i <= toCopy.getIndexEnd(); i++) {
long val = toCopy.get(i);
this.increment(i, val);
}
}
AdaptingCircularBufferCounter(AdaptingCircularBufferCounter toCopy) {
this.backing = toCopy.backing.copy();
this.startIndex = toCopy.getIndexStart();
this.endIndex = toCopy.getIndexEnd();
this.baseIndex = toCopy.baseIndex;
}

@Override
public int getIndexStart() {
/**
* The first index with a recording. May be negative.
*
* <p>Note: the returned value is not meaningful when isEmpty returns true.
*
* @return the first index with a recording.
*/
int getIndexStart() {
return startIndex;
}

@Override
public int getIndexEnd() {
/**
* The last index with a recording. May be negative.
*
* <p>Note: the returned value is not meaningful when isEmpty returns true.
*
* @return The last index with a recording.
*/
int getIndexEnd() {
return endIndex;
}

@Override
public boolean increment(int index, long delta) {
/**
* Persist new data at index, incrementing by delta amount.
*
* @param index The index of where to perform the incrementation.
* @param delta How much to increment the index by.
* @return success status.
*/
boolean increment(int index, long delta) {
if (baseIndex == NULL_INDEX) {
startIndex = index;
endIndex = index;
Expand All @@ -89,26 +90,34 @@ public boolean increment(int index, long delta) {
return true;
}

@Override
public long get(int index) {
/**
* Get the number of recordings for the given index.
*
* @return the number of recordings for the index, or 0 if the index is out of bounds.
*/
long get(int index) {
if (index < startIndex || index > endIndex) {
return 0;
}
return backing.get(toBufferIndex(index));
}

@Override
public boolean isEmpty() {
/**
* Boolean denoting if the backing structure has recordings or not.
*
* @return true if no recordings, false if at least one recording.
*/
boolean isEmpty() {
return baseIndex == NULL_INDEX;
}

@Override
public int getMaxSize() {
/** Returns the maximum number of buckets allowed in this counter. */
int getMaxSize() {
return backing.length();
}

@Override
public void clear() {
/** Resets all bucket counts to zero and resets index start/end tracking. */
void clear() {
this.backing.clear();
this.baseIndex = NULL_INDEX;
this.endIndex = NULL_INDEX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;
package io.opentelemetry.sdk.metrics.internal.aggregator;

import java.util.Arrays;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -31,11 +31,8 @@
* <li>If cellSize == INT then intBacking is not null
* <li>If cellSize == LONG then longBacking is not null
* </ul>
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class AdaptingIntegerArray {
final class AdaptingIntegerArray {
@Nullable private byte[] byteBacking;
@Nullable private short[] shortBacking;
@Nullable private int[] intBacking;
Expand All @@ -52,7 +49,7 @@ private enum ArrayCellSize {
private ArrayCellSize cellSize;

/** Construct an adapting integer array of a given size. */
public AdaptingIntegerArray(int size) {
AdaptingIntegerArray(int size) {
this.cellSize = ArrayCellSize.BYTE;
this.byteBacking = new byte[size];
}
Expand All @@ -78,13 +75,13 @@ private AdaptingIntegerArray(AdaptingIntegerArray toCopy) {
}

/** Returns a deep-copy of this array, preserving cell size. */
public AdaptingIntegerArray copy() {
AdaptingIntegerArray copy() {
return new AdaptingIntegerArray(this);
}

/** Add {@code count} to the value stored at {@code index}. */
@SuppressWarnings("NullAway")
public void increment(int idx, long count) {
void increment(int idx, long count) {
// TODO - prevent bad index
long result;
switch (cellSize) {
Expand Down Expand Up @@ -124,7 +121,7 @@ public void increment(int idx, long count) {

/** Grab the value stored at {@code index}. */
@SuppressWarnings("NullAway")
public long get(int index) {
long get(int index) {
long value = 0;
switch (this.cellSize) {
case BYTE:
Expand All @@ -145,7 +142,7 @@ public long get(int index) {

/** Return the length of this integer array. */
@SuppressWarnings("NullAway")
public int length() {
int length() {
int length = 0;
switch (this.cellSize) {
case BYTE:
Expand All @@ -165,7 +162,7 @@ public int length() {
}
/** Zeroes out all counts in this array. */
@SuppressWarnings("NullAway")
public void clear() {
void clear() {
switch (this.cellSize) {
case BYTE:
Arrays.fill(this.byteBacking, (byte) 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections;
import java.util.List;
Expand All @@ -35,31 +34,26 @@ public final class DoubleExponentialHistogramAggregator
implements Aggregator<ExponentialHistogramAccumulation, DoubleExemplarData> {

private final Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier;
private final ExponentialBucketStrategy bucketStrategy;
private final int maxBuckets;
private final int startingScale;

/**
* Constructs an exponential histogram aggregator.
*
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
*/
public DoubleExponentialHistogramAggregator(
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier, int maxBuckets) {
this(
reservoirSupplier,
ExponentialBucketStrategy.newStrategy(
maxBuckets, ExponentialCounterFactory.circularBufferCounter()));
}

DoubleExponentialHistogramAggregator(
Supplier<ExemplarReservoir<DoubleExemplarData>> reservoirSupplier,
ExponentialBucketStrategy bucketStrategy) {
int maxBuckets,
int startingScale) {
this.reservoirSupplier = reservoirSupplier;
this.bucketStrategy = bucketStrategy;
this.maxBuckets = maxBuckets;
this.startingScale = startingScale;
}

@Override
public AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get(), this.bucketStrategy);
return new Handle(reservoirSupplier.get(), maxBuckets, startingScale);
}

/**
Expand Down Expand Up @@ -183,7 +177,7 @@ public MetricData toMetricData(

static final class Handle
extends AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> {
private final ExponentialBucketStrategy bucketStrategy;
private final int maxBuckets;
@Nullable private DoubleExponentialHistogramBuckets positiveBuckets;
@Nullable private DoubleExponentialHistogramBuckets negativeBuckets;
private long zeroCount;
Expand All @@ -193,16 +187,15 @@ static final class Handle
private long count;
private int scale;

Handle(
ExemplarReservoir<DoubleExemplarData> reservoir, ExponentialBucketStrategy bucketStrategy) {
Handle(ExemplarReservoir<DoubleExemplarData> reservoir, int maxBuckets, int startingScale) {
super(reservoir);
this.bucketStrategy = bucketStrategy;
this.maxBuckets = maxBuckets;
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
this.scale = bucketStrategy.getStartingScale();
this.scale = startingScale;
}

@Override
Expand Down Expand Up @@ -260,17 +253,15 @@ protected synchronized void doRecordDouble(double value) {
zeroCount++;
return;
} else if (c > 0) {
// Initialize positive buckets if needed, adjusting to current scale
// Initialize positive buckets at current scale, if needed
if (positiveBuckets == null) {
positiveBuckets = bucketStrategy.newBuckets();
positiveBuckets.downscale(positiveBuckets.getScale() - scale);
positiveBuckets = new DoubleExponentialHistogramBuckets(scale, maxBuckets);
}
buckets = positiveBuckets;
} else {
// Initialize negative buckets if needed, adjusting to current scale
// Initialize negative buckets at current scale, if needed
if (negativeBuckets == null) {
negativeBuckets = bucketStrategy.newBuckets();
negativeBuckets.downscale(negativeBuckets.getScale() - scale);
negativeBuckets = new DoubleExponentialHistogramBuckets(scale, maxBuckets);
}
buckets = negativeBuckets;
}
Expand Down
Loading