diff --git a/consistent-sampling/build.gradle.kts b/consistent-sampling/build.gradle.kts
index 1ad1faad1..bdeaaf542 100644
--- a/consistent-sampling/build.gradle.kts
+++ b/consistent-sampling/build.gradle.kts
@@ -7,6 +7,6 @@ description = "Sampler and exporter implementations for consistent sampling"
dependencies {
api("io.opentelemetry:opentelemetry-sdk-trace")
- testImplementation("org.hipparchus:hipparchus-core:2.0")
- testImplementation("org.hipparchus:hipparchus-stat:2.0")
+ testImplementation("org.hipparchus:hipparchus-core:2.1")
+ testImplementation("org.hipparchus:hipparchus-stat:2.1")
}
diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java
new file mode 100644
index 000000000..cb2ae54dc
--- /dev/null
+++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessor.java
@@ -0,0 +1,565 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.samplers;
+
+import static io.opentelemetry.api.internal.Utils.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.internal.DaemonThreadFactory;
+import io.opentelemetry.sdk.trace.ReadWriteSpan;
+import io.opentelemetry.sdk.trace.ReadableSpan;
+import io.opentelemetry.sdk.trace.SpanProcessor;
+import io.opentelemetry.sdk.trace.data.DelegatingSpanData;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link SpanProcessor} which periodically exports a fixed maximum number of spans. If the number
+ * of spans in a period exceeds the fixed reservoir (buffer) size, spans will be consistently
+ * (compare {@link ConsistentSampler}) sampled.
+ */
+public final class ConsistentReservoirSamplingSpanProcessor implements SpanProcessor {
+
+ private static final String WORKER_THREAD_NAME =
+ ConsistentReservoirSamplingSpanProcessor.class.getSimpleName() + "_WorkerThread";
+
+ private final Worker worker;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ // visible for testing
+ static final long DEFAULT_EXPORT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+
+ /**
+  * Pairs a finished span with the values the reservoir needs for consistent sampling: the span's
+  * p-value (mutable, may be raised on export), its r-value and a random tie-breaking priority.
+  */
+ private static final class ReadableSpanWithPriority {
+
+   private final ReadableSpan readableSpan;
+   private int pval;
+   private final int rval;
+   private final long priority;
+
+   /**
+    * Builds the wrapper from the span's OpenTelemetry trace state entry.
+    *
+    * <p>A valid r-value or p-value found in the trace state is taken over. A missing r-value is
+    * replaced by a random one capped at {@code OtelTraceState.getMaxR()}; a missing p-value is
+    * treated as zero.
+    *
+    * @param readableSpan the finished span
+    * @param randomGenerator source of the priority and, if needed, of the r-value
+    * @return the span together with its p-value, r-value and priority
+    */
+   public static ReadableSpanWithPriority create(
+       ReadableSpan readableSpan, RandomGenerator randomGenerator) {
+     TraceState traceState = readableSpan.getSpanContext().getTraceState();
+     OtelTraceState parsed = OtelTraceState.parse(traceState.get(OtelTraceState.TRACE_STATE_KEY));
+
+     // the priority is drawn before any random r-value, keeping the order of random draws
+     long tieBreaker = randomGenerator.nextLong();
+
+     int r =
+         parsed.hasValidR()
+             ? parsed.getR()
+             : Math.min(
+                 randomGenerator.numberOfLeadingZerosOfRandomLong(), OtelTraceState.getMaxR());
+
+     // an undefined p-value is assumed to be zero, which corresponds to an adjusted count of 1
+     int p = parsed.hasValidP() ? parsed.getP() : 0;
+
+     return new ReadableSpanWithPriority(readableSpan, p, r, tieBreaker);
+   }
+
+   private ReadableSpanWithPriority(ReadableSpan readableSpan, int pval, int rval, long priority) {
+     this.readableSpan = readableSpan;
+     this.pval = pval;
+     this.rval = rval;
+     this.priority = priority;
+   }
+
+   /** Returns the wrapped span. */
+   private ReadableSpan getReadableSpan() {
+     return readableSpan;
+   }
+
+   /** Returns the current p-value. */
+   private int getP() {
+     return pval;
+   }
+
+   /** Overwrites the p-value. */
+   private void setP(int pval) {
+     this.pval = pval;
+   }
+
+   /** Returns the r-value. */
+   private int getR() {
+     return rval;
+   }
+
+   /**
+    * Orders two spans by r-value and, for equal r-values, by priority.
+    *
+    * @return a negative value, zero or a positive value if {@code s1} sorts before, equal to or
+    *     after {@code s2}
+    */
+   private static int compareRthenPriority(
+       ReadableSpanWithPriority s1, ReadableSpanWithPriority s2) {
+     int byRValue = Integer.compare(s1.rval, s2.rval);
+     return byRValue != 0 ? byRValue : Long.compare(s1.priority, s2.priority);
+   }
+ }
+
+ /**
+  * A reservoir sampling buffer that collects a fixed number of spans.
+  *
+  * <p>Consistent sampling requires that spans are sampled only if r-value &gt;= p-value, where
+  * p-value describes which sampling rate from the discrete set of possible sampling rates is
+  * applied. Consistent sampling allows choosing the sampling rate (the p-value) individually for
+  * every span. Therefore, the number of sampled spans can be reduced by increasing the p-value of
+  * spans, such that spans for which r-value &lt; p-value get discarded. To reduce the number of
+  * sampled spans one can therefore apply the following procedure until the desired number of spans
+  * is left:
+  *
+  * <p>1) Randomly choose a span among the spans with smallest p-values
+  *
+  * <p>2) Increment its p-value by 1
+  *
+  * <p>3) Discard the span, if r-value &lt; p-value
+  *
+  * <p>4) continue with 1)
+  *
+  * <p>By always incrementing one of the smallest p-values, this approach tries to balance the
+  * sampling rates (p-values). Balanced sampling rates are better for estimation (compare VarOpt
+  * sampling).
+  *
+  * <p>This sampling approach can be implemented in a streaming fashion. In order to ensure that
+  * spans have fair chances regardless of processing order, a uniform random number (priority) is
+  * associated with every span in addition to its p-value. When choosing a span among all spans
+  * with smallest p-value, we take that with the smallest priority. For that, a priority queue is
+  * needed.
+  *
+  * <p>In the following, an equivalent and more efficient sampling approach is described, that is
+  * based on a priority queue where the minimum is the span with the smallest r-value. In this way
+  * the {@code add}-operation will have a worst case time complexity of {@code O(log n)} where
+  * {@code n} denotes the reservoir size. We use the following notation:
+  *
+  * <p>Z := {@code reservoirSize}
+  *
+  * <p>L := {@code maxDiscardedRValue}
+  *
+  * <p>R := {@code numberOfDiscardedSpansWithMaxDiscardedRValue}
+  *
+  * <p>K := {@code numSampledSpansWithGreaterRValueAndSmallPValue}
+  *
+  * <p>X := {@code numberOfSampledSpansWithMaxDiscardedRValue}
+  *
+  * <p>The sampling approach described above can be equivalently performed by keeping Z spans with
+  * largest r-values (in case of ties with highest priority) and adjusting the corresponding
+  * p-values in a finalization step. We know that the largest r-value among the dropped spans is L
+  * and that we had to discard exactly R spans with (r-value == L). This implies that their
+  * corresponding p-values were raised to (L + 1) which finally violated the sampling condition
+  * (r-value &gt;= p-value). We only raise the p-value of some span, if it belongs to the set of
+  * spans with minimum p-value. Therefore, the minimum p-value must be given by L. To determine the
+  * p-values of all finally kept spans, we consider 3 cases:
+  *
+  * <p>1) For all X kept spans with r-value == L the corresponding p-value must also be L.
+  * Otherwise, the span would have been discarded. There are R spans with (r-value == L) which have
+  * been discarded. Therefore, among the original (X + R) spans with (r-value == L) we have kept X
+  * spans.
+  *
+  * <p>2) For spans with (p-value &gt; L) the p-value will not be changed as they do not belong to
+  * the set of spans with minimal p-values.
+  *
+  * <p>3) For the remaining K spans for which (r-value &gt; L) and (p-value &lt;= L) the p-value
+  * needs to be adjusted. The new p-value will be either L or (L + 1). When starting to sample the
+  * first spans with (p-value == L), we have N = R + K + X spans which all have (r-value &gt;= L)
+  * and (p-value == L). This set can be divided into two sets of spans dependent on whether
+  * (r-value == L) or (r-value &gt; L). We know that there were (R + X) spans with (r-value == L)
+  * and K spans with (r-value &gt; L). When randomly selecting a span to increase its p-value, the
+  * span will only be discarded if the span belongs to the first set (r-value == L). We will call
+  * such an event "failure". If the selected span belongs to the second set (r-value &gt; L), its
+  * p-value will be increased by 1 to (L + 1) but the span will not be dropped. The sampling
+  * procedure will be stopped after R "failures". The number of "successes" follows a negative
+  * hypergeometric distribution. Therefore, we need to sample a random value from a negative
+  * hypergeometric distribution with N = R + X + K elements of which K are "successes" and after
+  * drawing R "failures", in order to determine how many spans out of K will get a p-value equal to
+  * (L + 1). The expected number is given by R * K / (N - K + 1) = R * K / (R + X + 1). Instead of
+  * drawing the number from the negative hypergeometric distribution we could also set it to the
+  * stochastically rounded expected value. This makes this reservoir sampling approach not fully
+  * equivalent to the approach described initially, but leads to a smaller variance when
+  * estimating.
+  */
+ private static final class Reservoir {
+   // maximum number of spans kept (Z)
+   private final int reservoirSize;
+   // largest r-value among all spans that were dropped so far (L)
+   private int maxDiscardedRValue = 0;
+   // number of dropped spans whose r-value equals maxDiscardedRValue (R)
+   private long numberOfDiscardedSpansWithMaxDiscardedRValue = 0;
+   // min-heap ordered by (r-value, priority); its head is the next candidate to be dropped
+   private final PriorityQueue<ReadableSpanWithPriority> queue;
+   private final RandomGenerator randomGenerator;
+
+   /**
+    * Creates an empty reservoir.
+    *
+    * @param reservoirSize the maximum number of spans to keep, must be at least 1
+    * @param randomGenerator the random generator used when p-values are finalized
+    * @throws IllegalArgumentException if {@code reservoirSize} is smaller than 1
+    */
+   public Reservoir(int reservoirSize, RandomGenerator randomGenerator) {
+     if (reservoirSize < 1) {
+       throw new IllegalArgumentException();
+     }
+     this.reservoirSize = reservoirSize;
+     this.queue =
+         new PriorityQueue<>(reservoirSize, ReadableSpanWithPriority::compareRthenPriority);
+     this.randomGenerator = randomGenerator;
+   }
+
+   /**
+    * Offers a span to the reservoir.
+    *
+    * <p>While the reservoir is not full the span is simply stored. Afterwards the span competes
+    * with the current minimum of the queue; the loser is dropped and recorded in the statistics
+    * about discarded r-values.
+    *
+    * @param readableSpanWithPriority the span to add
+    */
+   public void add(ReadableSpanWithPriority readableSpanWithPriority) {
+
+     if (queue.size() < reservoirSize) {
+       queue.add(readableSpanWithPriority);
+       return;
+     }
+
+     // from here on exactly one span is dropped: either the new one or the current head
+     ReadableSpanWithPriority dropped = readableSpanWithPriority;
+     ReadableSpanWithPriority head = queue.peek();
+     if (ReadableSpanWithPriority.compareRthenPriority(readableSpanWithPriority, head) > 0) {
+       queue.remove();
+       queue.add(readableSpanWithPriority);
+       dropped = head;
+     }
+
+     if (dropped.getR() > maxDiscardedRValue) {
+       maxDiscardedRValue = dropped.getR();
+       numberOfDiscardedSpansWithMaxDiscardedRValue = 1;
+     } else if (dropped.getR() == maxDiscardedRValue) {
+       numberOfDiscardedSpansWithMaxDiscardedRValue += 1;
+     }
+   }
+
+   /**
+    * Returns the kept spans as span data, with p-values adjusted for the spans that were dropped.
+    *
+    * <p>If no span was counted as discarded, the spans are returned unchanged. Otherwise p-values
+    * not greater than {@code maxDiscardedRValue} are raised to {@code maxDiscardedRValue}, and a
+    * randomly chosen subset of the spans with a greater r-value gets {@code maxDiscardedRValue +
+    * 1}. The trace state of a span is rewritten only if its final p-value differs from the p-value
+    * the span originally carried.
+    *
+    * @return a new list with the span data of all kept spans
+    */
+   public List<SpanData> getResult() {
+
+     if (numberOfDiscardedSpansWithMaxDiscardedRValue == 0) {
+       return queue.stream().map(x -> x.readableSpan.toSpanData()).collect(Collectors.toList());
+     }
+
+     // snapshot of the queue so that the counting pass and the update pass see the same order
+     List<ReadableSpanWithPriority> readableSpansWithPriority = new ArrayList<>(queue.size());
+     int numberOfSampledSpansWithMaxDiscardedRValue = 0;
+     int numSampledSpansWithGreaterRValueAndSmallPValue = 0;
+     for (ReadableSpanWithPriority readableSpanWithPriority : queue) {
+       if (readableSpanWithPriority.getR() == maxDiscardedRValue) {
+         numberOfSampledSpansWithMaxDiscardedRValue += 1;
+       } else if (readableSpanWithPriority.getP() <= maxDiscardedRValue) {
+         numSampledSpansWithGreaterRValueAndSmallPValue += 1;
+       }
+       readableSpansWithPriority.add(readableSpanWithPriority);
+     }
+
+     // K * R / (R + X + 1)
+     double expectedNumPValueIncrements =
+         numSampledSpansWithGreaterRValueAndSmallPValue
+             * (numberOfDiscardedSpansWithMaxDiscardedRValue
+                 / (double)
+                     (numberOfDiscardedSpansWithMaxDiscardedRValue
+                         + numberOfSampledSpansWithMaxDiscardedRValue
+                         + 1L));
+     int roundedExpectedNumPValueIncrements =
+         Math.toIntExact(randomGenerator.roundStochastically(expectedNumPValueIncrements));
+
+     // one bit per span counted in K; a set bit means the p-value becomes maxDiscardedRValue + 1
+     BitSet incrementIndicators =
+         randomGenerator.generateRandomBitSet(
+             numSampledSpansWithGreaterRValueAndSmallPValue, roundedExpectedNumPValueIncrements);
+
+     int incrementIndicatorIndex = 0;
+     List<SpanData> result = new ArrayList<>(queue.size());
+     for (ReadableSpanWithPriority readableSpanWithPriority : readableSpansWithPriority) {
+       if (readableSpanWithPriority.getP() <= maxDiscardedRValue) {
+         readableSpanWithPriority.setP(maxDiscardedRValue);
+         if (readableSpanWithPriority.getR() > maxDiscardedRValue) {
+           if (incrementIndicators.get(incrementIndicatorIndex)) {
+             readableSpanWithPriority.setP(maxDiscardedRValue + 1);
+           }
+           incrementIndicatorIndex += 1;
+         }
+       }
+
+       SpanData spanData = readableSpanWithPriority.getReadableSpan().toSpanData();
+       SpanContext spanContext = spanData.getSpanContext();
+       TraceState traceState = spanContext.getTraceState();
+       String otelTraceStateString = traceState.get(OtelTraceState.TRACE_STATE_KEY);
+       OtelTraceState otelTraceState = OtelTraceState.parse(otelTraceStateString);
+       // Whether the trace state must be rewritten depends on the p-value it carried, not on its
+       // r-value: a missing p-value was treated as 0, so only a final p-value > 0 is a change;
+       // a present p-value is a change whenever it differs from the final one.
+       boolean pValueChanged =
+           otelTraceState.hasValidP()
+               ? readableSpanWithPriority.getP() != otelTraceState.getP()
+               : readableSpanWithPriority.getP() > 0;
+       if (pValueChanged) {
+         otelTraceState.setP(readableSpanWithPriority.getP());
+         spanData = updateSpanDataWithOtelTraceState(spanData, otelTraceState);
+       }
+       result.add(spanData);
+     }
+
+     return result;
+   }
+
+   /** Returns {@code true} if the reservoir currently holds no span. */
+   public boolean isEmpty() {
+     return queue.isEmpty();
+   }
+ }
+
+ /**
+  * Wraps {@code spanData} so that its span context reports a trace state in which the entry
+  * {@code OtelTraceState.TRACE_STATE_KEY} is replaced by the serialized {@code otelTraceState}.
+  * Trace id, span id and trace flags are copied from the original span context.
+  *
+  * @param spanData the span data to wrap
+  * @param otelTraceState the trace state values to serialize into the wrapper's span context
+  * @return a delegating span data that only overrides {@code getSpanContext()}
+  */
+ private static SpanData updateSpanDataWithOtelTraceState(
+     SpanData spanData, OtelTraceState otelTraceState) {
+   SpanContext original = spanData.getSpanContext();
+   TraceState replacedTraceState =
+       original.getTraceState().toBuilder()
+           .put(OtelTraceState.TRACE_STATE_KEY, otelTraceState.serialize())
+           .build();
+   SpanContext replacement =
+       SpanContext.create(
+           original.getTraceId(),
+           original.getSpanId(),
+           original.getTraceFlags(),
+           replacedTraceState);
+   return new DelegatingSpanData(spanData) {
+     @Override
+     public SpanContext getSpanContext() {
+       return replacement;
+     }
+   };
+ }
+
+ // visible for testing: same as the public factory, but with an explicit random generator
+ static SpanProcessor create(
+     SpanExporter spanExporter,
+     int reservoirSize,
+     long exportPeriodNanos,
+     long exporterTimeoutNanos,
+     RandomGenerator randomGenerator) {
+   // the constructor expects the export period before the reservoir size
+   SpanProcessor processor =
+       new ConsistentReservoirSamplingSpanProcessor(
+           spanExporter, exportPeriodNanos, reservoirSize, exporterTimeoutNanos, randomGenerator);
+   return processor;
+ }
+
+ /**
+  * Creates a {@link SpanProcessor} that exports at most {@code reservoirSize} spans per export
+  * period. When more spans end within one period than fit into the reservoir, the spans are
+  * sampled consistently (compare {@link ConsistentSampler}). Random numbers are taken from
+  * {@code RandomGenerator.getDefault()}.
+  *
+  * @param spanExporter a span exporter
+  * @param reservoirSize the reservoir size
+  * @param exportPeriodNanos the export period in nanoseconds
+  * @param exporterTimeoutNanos the exporter timeout in nanoseconds
+  * @return a span processor
+  */
+ public static SpanProcessor create(
+     SpanExporter spanExporter,
+     int reservoirSize,
+     long exportPeriodNanos,
+     long exporterTimeoutNanos) {
+   RandomGenerator defaultRandomGenerator = RandomGenerator.getDefault();
+   return create(
+       spanExporter, reservoirSize, exportPeriodNanos, exporterTimeoutNanos, defaultRandomGenerator);
+ }
+
+ /**
+  * Creates a {@link SpanProcessor} that exports at most {@code reservoirSize} spans per export
+  * period, using {@code DEFAULT_EXPORT_TIMEOUT_NANOS} as exporter timeout. When more spans end
+  * within one period than fit into the reservoir, the spans are sampled consistently (compare
+  * {@link ConsistentSampler}).
+  *
+  * @param spanExporter a span exporter
+  * @param reservoirSize the reservoir size
+  * @param exportPeriodNanos the export period in nanoseconds
+  * @return a span processor
+  */
+ static SpanProcessor create(
+     SpanExporter spanExporter, int reservoirSize, long exportPeriodNanos) {
+   long exporterTimeoutNanos = DEFAULT_EXPORT_TIMEOUT_NANOS;
+   return create(spanExporter, reservoirSize, exportPeriodNanos, exporterTimeoutNanos);
+ }
+
+ /**
+  * Validates the configuration, creates the worker and starts it on a daemon thread.
+  *
+  * @throws NullPointerException if {@code spanExporter} or {@code randomGenerator} is null
+  * @throws IllegalArgumentException if the export period, the reservoir size or the exporter
+  *     timeout is not positive
+  */
+ private ConsistentReservoirSamplingSpanProcessor(
+     SpanExporter spanExporter,
+     long exportPeriodNanos,
+     int reservoirSize,
+     long exporterTimeoutNanos,
+     RandomGenerator randomGenerator) {
+   // the checks are performed in this order; callers rely on the reported message
+   requireNonNull(spanExporter, "spanExporter");
+   checkArgument(exportPeriodNanos > 0, "export period must be positive");
+   checkArgument(reservoirSize > 0, "reservoir size must be positive");
+   checkArgument(exporterTimeoutNanos > 0, "exporter timeout must be positive");
+   requireNonNull(randomGenerator, "randomGenerator");
+
+   Worker newWorker =
+       new Worker(
+           spanExporter, exportPeriodNanos, reservoirSize, exporterTimeoutNanos, randomGenerator);
+   this.worker = newWorker;
+   new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(newWorker).start();
+ }
+
+ /** Does nothing; this processor only acts on ended spans. */
+ @Override
+ public void onStart(Context parentContext, ReadWriteSpan span) {}
+
+ /** Returns {@code false}, since {@code onStart} is a no-op for this processor. */
+ @Override
+ public boolean isStartRequired() {
+ return false;
+ }
+
+ /**
+  * Hands an ended span over to the worker. {@code null} spans and spans whose span context is
+  * not sampled are ignored.
+  */
+ @Override
+ public void onEnd(ReadableSpan span) {
+   boolean accepted = span != null && span.getSpanContext().isSampled();
+   if (accepted) {
+     worker.addSpan(span);
+   }
+ }
+
+ /** Returns {@code true}, since spans are collected in {@code onEnd}. */
+ @Override
+ public boolean isEndRequired() {
+ return true;
+ }
+
+ /**
+  * Shuts down the worker on the first call. Every later call returns an already successful
+  * result without touching the worker again.
+  */
+ @Override
+ public CompletableResultCode shutdown() {
+   boolean alreadyShutDown = isShutdown.getAndSet(true);
+   return alreadyShutDown ? CompletableResultCode.ofSuccess() : worker.shutdown();
+ }
+
+ /** Delegates to the worker's {@code forceFlush()} and returns its result. */
+ @Override
+ public CompletableResultCode forceFlush() {
+ return worker.forceFlush();
+ }
+
+ // Visible for testing: reports whether the worker's current reservoir holds no span.
+ boolean isReservoirEmpty() {
+ return worker.isReservoirEmpty();
+ }
+
+ /**
+  * Background task that owns the current reservoir, swaps it for a fresh one once per export
+  * period or on a flush request, and hands the sampled spans to the exporter.
+  *
+  * <p>Spans are added from arbitrary threads under {@code reservoirLock}; exporting happens on
+  * the thread executing {@link #run()}.
+  */
+ private static final class Worker implements Runnable {
+
+   private static final Logger logger = Logger.getLogger(Worker.class.getName());
+   private final SpanExporter spanExporter;
+   private final long exportPeriodNanos;
+   private final int reservoirSize;
+   private final long exporterTimeoutNanos;
+
+   // System.nanoTime() based deadline of the next periodic export
+   private long nextExportTime;
+
+   private final RandomGenerator randomGenerator;
+   private final Object reservoirLock = new Object();
+   private Reservoir reservoir;
+   // holds at most one pending flush request; further requests share the pending one
+   private final BlockingQueue<CompletableResultCode> signal;
+   private volatile boolean continueWork = true;
+
+   private static Reservoir createReservoir(int reservoirSize, RandomGenerator randomGenerator) {
+     return new Reservoir(reservoirSize, randomGenerator);
+   }
+
+   private Worker(
+       SpanExporter spanExporter,
+       long exportPeriodNanos,
+       int reservoirSize,
+       long exporterTimeoutNanos,
+       RandomGenerator randomGenerator) {
+     this.spanExporter = spanExporter;
+     this.exportPeriodNanos = exportPeriodNanos;
+     this.reservoirSize = reservoirSize;
+     this.exporterTimeoutNanos = exporterTimeoutNanos;
+     this.randomGenerator = randomGenerator;
+     synchronized (reservoirLock) {
+       this.reservoir = createReservoir(reservoirSize, randomGenerator);
+     }
+     this.signal = new ArrayBlockingQueue<>(1);
+   }
+
+   /** Wraps the span with its sampling values and adds it to the current reservoir. */
+   private void addSpan(ReadableSpan span) {
+     ReadableSpanWithPriority readableSpanWithPriority =
+         ReadableSpanWithPriority.create(span, randomGenerator);
+     synchronized (reservoirLock) {
+       reservoir.add(readableSpanWithPriority);
+     }
+   }
+
+   /**
+    * Export loop. Exports the reservoir whenever the export period has elapsed or a flush request
+    * was received, then waits for the next flush request or deadline. The loop ends when the
+    * worker was shut down or the thread is interrupted.
+    */
+   @Override
+   public void run() {
+     updateNextExportTime();
+     CompletableResultCode flushRequest = null;
+     while (continueWork) {
+
+       // nanoTime values may overflow, therefore their difference is compared against zero
+       if (flushRequest != null || System.nanoTime() - nextExportTime >= 0) {
+         Reservoir oldReservoir;
+         Reservoir newReservoir = createReservoir(reservoirSize, randomGenerator);
+         synchronized (reservoirLock) {
+           oldReservoir = reservoir;
+           reservoir = newReservoir;
+         }
+         exportCurrentBatch(oldReservoir.getResult());
+         updateNextExportTime();
+         if (flushRequest != null) {
+           // completing a shutdown-triggered flush clears continueWork in its callback
+           flushRequest.succeed();
+           if (!continueWork) {
+             // do not block for another export period after shutdown
+             return;
+           }
+         }
+       }
+
+       try {
+         long pollWaitTime = nextExportTime - System.nanoTime();
+         // Always reassign the request: a stale one must not be reused, and a pending request
+         // must be picked up even if the next deadline has already passed.
+         flushRequest =
+             pollWaitTime > 0 ? signal.poll(pollWaitTime, TimeUnit.NANOSECONDS) : signal.poll();
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         return;
+       }
+     }
+   }
+
+   private void updateNextExportTime() {
+     nextExportTime = System.nanoTime() + exportPeriodNanos;
+   }
+
+   /**
+    * Requests a final flush, then stops the export loop and shuts down the exporter.
+    *
+    * @return a result that succeeds only if both the flush and the exporter shutdown succeeded
+    */
+   private CompletableResultCode shutdown() {
+     CompletableResultCode result = new CompletableResultCode();
+
+     CompletableResultCode flushResult = forceFlush();
+     flushResult.whenComplete(
+         () -> {
+           continueWork = false;
+           CompletableResultCode shutdownResult = spanExporter.shutdown();
+           shutdownResult.whenComplete(
+               () -> {
+                 if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) {
+                   result.fail();
+                 } else {
+                   result.succeed();
+                 }
+               });
+         });
+
+     return result;
+   }
+
+   /**
+    * Asks the export loop to export the current reservoir as soon as possible.
+    *
+    * <p>Only one request can be pending. If a request is already waiting to be picked up, that
+    * request is returned instead of a new one: it has not been processed yet, so the export it
+    * triggers also covers all spans added so far.
+    *
+    * @return a result that is completed once the requested export has been performed
+    */
+   private CompletableResultCode forceFlush() {
+     CompletableResultCode flushResult = new CompletableResultCode();
+     while (!signal.offer(flushResult)) {
+       CompletableResultCode pending = signal.peek();
+       if (pending != null) {
+         return pending;
+       }
+       // the pending request was taken in the meantime; retry to enqueue our own
+     }
+     return flushResult;
+   }
+
+   /**
+    * Passes the batch to the exporter and waits up to the exporter timeout for completion.
+    * Exporter failures and runtime exceptions are logged, not propagated. The batch is cleared
+    * afterwards.
+    */
+   private void exportCurrentBatch(List<SpanData> batch) {
+     if (batch.isEmpty()) {
+       return;
+     }
+
+     try {
+       CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch));
+       result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
+       if (!result.isSuccess()) {
+         logger.log(Level.FINE, "Exporter failed");
+       }
+     } catch (RuntimeException e) {
+       logger.log(Level.WARNING, "Exporter threw an Exception", e);
+     } finally {
+       batch.clear();
+     }
+   }
+
+   private boolean isReservoirEmpty() {
+     synchronized (reservoirLock) {
+       return reservoir.isEmpty();
+     }
+   }
+ }
+}
diff --git a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java
index 4363eadeb..f44822aff 100644
--- a/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java
+++ b/consistent-sampling/src/main/java/io/opentelemetry/contrib/samplers/RandomGenerator.java
@@ -7,6 +7,7 @@
import static java.util.Objects.requireNonNull;
+import java.util.BitSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongSupplier;
@@ -131,4 +132,94 @@ public boolean nextBoolean(double probability) {
public int numberOfLeadingZerosOfRandomLong() {
return threadLocalData.get().numberOfLeadingZerosOfRandomLong(threadSafeRandomLongSupplier);
}
+
+ /**
+ * Returns a pseudorandomly chosen {@code long} value, obtained from this generator's
+ * {@code threadSafeRandomLongSupplier}.
+ *
+ * @return a pseudorandomly chosen {@code long} value
+ */
+ public long nextLong() {
+ return threadSafeRandomLongSupplier.getAsLong();
+ }
+
+ /**
+  * Stochastically rounds the given floating-point value: the result is the floor of {@code x},
+  * incremented by one with a probability equal to the fractional part of {@code x}.
+  *
+  * <p>See https://en.wikipedia.org/wiki/Rounding#Stochastic_rounding
+  *
+  * @param x the value to be rounded
+  * @return the rounded value
+  */
+ public long roundStochastically(double x) {
+   long lower = (long) Math.floor(x);
+   double fraction = x - lower;
+   return nextBoolean(fraction) ? lower + 1 : lower;
+ }
+
+ /**
+ * Returns a pseudorandomly chosen {@code int} value between zero (inclusive) and the specified
+ * bound (exclusive).
+ *
+ * <p>The implementation is based on Daniel Lemire's algorithm as described in "Fast random
+ * integer generation in an interval." ACM Transactions on Modeling and Computer Simulation
+ * (TOMACS) 29.1 (2019): 3.
+ *
+ * @param bound the upper bound (exclusive) for the returned value. Must be positive.
+ * @return a pseudorandomly chosen {@code int} value between zero (inclusive) and the bound
+ * (exclusive)
+ * @throws IllegalArgumentException if {@code bound} is not positive
+ */
+ private int nextInt(int bound) {
+ if (bound <= 0) {
+ throw new IllegalArgumentException();
+ }
+ long x = nextLong() >>> 33; // use only 31 random bits
+ // the bits of m above bit 30 form the candidate result, the lower 31 bits are kept in l
+ long m = x * bound;
+ int l = (int) m & 0x7FFFFFFF;
+ // t below is always smaller than bound, so a rejection is only possible if l < bound
+ if (l < bound) {
+ // t equals 2^31 modulo bound; candidates with l < t are rejected and redrawn
+ int t = (-bound & 0x7FFFFFFF) % bound;
+ while (l < t) {
+ x = nextLong() >>> 33; // use only 31 random bits
+ m = x * bound;
+ l = (int) m & 0x7FFFFFFF;
+ }
+ }
+ return (int) (m >>> 31);
+ }
+
+ /**
+ * Generates a random bit set where a given number of 1-bits are randomly set.
+ *
+ * <p>Only the smaller of the two groups (1-bits or 0-bits) is selected at random; if the 1-bits
+ * are the larger group, the 0-bit positions are selected and the whole range is flipped
+ * afterwards.
+ *
+ * @param numBits the total number of bits
+ * @param numOneBits the number of 1-bits
+ * @return a random bit set
+ * @throws IllegalArgumentException if {@code 0 <= numOneBits <= numBits} is violated
+ */
+ public BitSet generateRandomBitSet(int numBits, int numOneBits) {
+
+ if (numOneBits < 0 || numOneBits > numBits) {
+ throw new IllegalArgumentException();
+ }
+
+ BitSet result = new BitSet(numBits);
+ int numZeroBits = numBits - numOneBits;
+
+ // based on Fisher-Yates shuffling
+ // the loop runs min(numZeroBits, numOneBits) times and sets exactly one new bit per iteration
+ for (int i = Math.max(numZeroBits, numOneBits); i < numBits; ++i) {
+ int j = nextInt(i + 1);
+ // j is drawn from [0, i]; if position j is already taken, position i (not yet set) is used
+ if (result.get(j)) {
+ result.set(i);
+ } else {
+ result.set(j);
+ }
+ }
+ // so far the minority group was marked; invert if the marked positions are the 0-bits
+ if (numZeroBits < numOneBits) {
+ result.flip(0, numBits);
+ }
+
+ return result;
+ }
}
diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java
new file mode 100644
index 000000000..8c20a644b
--- /dev/null
+++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/ConsistentReservoirSamplingSpanProcessorTest.java
@@ -0,0 +1,689 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.samplers;
+
+import static io.opentelemetry.contrib.samplers.ConsistentReservoirSamplingSpanProcessor.DEFAULT_EXPORT_TIMEOUT_NANOS;
+import static io.opentelemetry.contrib.util.TestUtil.verifyObservedPvaluesUsingGtest;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.trace.ReadableSpan;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.SpanProcessor;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SpanExporter;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.opentelemetry.sdk.trace.samplers.SamplingResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
+import org.hipparchus.distribution.discrete.BinomialDistribution;
+import org.hipparchus.stat.inference.GTest;
+import org.hipparchus.stat.inference.TTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+
+class ConsistentReservoirSamplingSpanProcessorTest {
+
+ private static final String SPAN_NAME_1 = "MySpanName/1";
+ private static final String SPAN_NAME_2 = "MySpanName/2";
+ private static final String SPAN_NAME_3 = "MySpanName/3";
+ private static final int RESERVOIR_SIZE = 4096;
+ private static final long EXPORT_PERIOD_10_MILLIS_AS_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
+ private static final long EXPORT_PERIOD_100_MILLIS_AS_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+ private static final long VERY_LONG_EXPORT_PERIOD_NANOS = TimeUnit.SECONDS.toNanos(10000);
+
+ /** Shuts down the tracer provider and waits up to 1000 seconds for the shutdown to complete. */
+ private static void shutdown(SdkTracerProvider sdkTracerProvider) {
+ sdkTracerProvider.shutdown().join(1000, TimeUnit.SECONDS);
+ }
+
+ /**
+  * Test exporter that records every exported span and lets the test block until a given number
+  * of spans has been exported.
+  */
+ private static class WaitingSpanExporter implements SpanExporter {
+
+   private final List<SpanData> spanDataList = new ArrayList<>();
+   private final int numberOfSpansToWaitFor;
+   private volatile CountDownLatch countDownLatch;
+   private final AtomicBoolean shutDownCalled = new AtomicBoolean(false);
+
+   WaitingSpanExporter(int numberOfSpansToWaitFor) {
+     countDownLatch = new CountDownLatch(numberOfSpansToWaitFor);
+     this.numberOfSpansToWaitFor = numberOfSpansToWaitFor;
+   }
+
+   /** Returns a copy of all spans recorded so far and clears the internal record. */
+   List<SpanData> getExported() {
+     List<SpanData> result = new ArrayList<>(spanDataList);
+     spanDataList.clear();
+     return result;
+   }
+
+   /**
+    * Waits until {@code numberOfSpansToWaitFor} spans have been exported. Returns the list of
+    * exported {@link SpanData} objects, otherwise {@code null} if the current thread is
+    * interrupted.
+    *
+    * @return the list of exported {@link SpanData} objects, otherwise {@code null} if the current
+    *     thread is interrupted.
+    */
+   @Nullable
+   List<SpanData> waitForExport() {
+     try {
+       countDownLatch.await();
+     } catch (InterruptedException e) {
+       // Preserve the interruption status as per guidance.
+       Thread.currentThread().interrupt();
+       return null;
+     }
+     return getExported();
+   }
+
+   /** Records the spans and counts the latch down once per exported span. */
+   @Override
+   public CompletableResultCode export(Collection<SpanData> spans) {
+     this.spanDataList.addAll(spans);
+     for (int i = 0; i < spans.size(); i++) {
+       countDownLatch.countDown();
+     }
+     return CompletableResultCode.ofSuccess();
+   }
+
+   @Override
+   public CompletableResultCode flush() {
+     return CompletableResultCode.ofSuccess();
+   }
+
+   @Override
+   public CompletableResultCode shutdown() {
+     shutDownCalled.set(true);
+     return CompletableResultCode.ofSuccess();
+   }
+
+   /** Re-arms the latch so that {@code waitForExport()} waits for a full batch again. */
+   public void reset() {
+     this.countDownLatch = new CountDownLatch(numberOfSpansToWaitFor);
+   }
+ }
+
+ /**
+  * Starts a span with the given name on a tracer of the given provider and ends it right away.
+  *
+  * @return the ended span, or {@code null} if the created span is not a {@link ReadableSpan}
+  */
+ @Nullable
+ private ReadableSpan createEndedSpan(String spanName, SdkTracerProvider sdkTracerProvider) {
+   Span endedSpan =
+       sdkTracerProvider.get(getClass().getName()).spanBuilder(spanName).startSpan();
+   endedSpan.end();
+   return endedSpan instanceof ReadableSpan ? (ReadableSpan) endedSpan : null;
+ }
+
+ /** Each invalid argument must be rejected with the documented exception type and message. */
+ @Test
+ void invalidConfig() {
+   SpanExporter mockExporter = mock(SpanExporter.class);
+   when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+   // missing exporter
+   assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(null, 1, 1))
+       .isInstanceOf(NullPointerException.class)
+       .hasMessage("spanExporter");
+
+   // non-positive numeric settings
+   assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(mockExporter, -1, 1))
+       .isInstanceOf(IllegalArgumentException.class)
+       .hasMessage("reservoir size must be positive");
+   assertThatThrownBy(() -> ConsistentReservoirSamplingSpanProcessor.create(mockExporter, 1, -1))
+       .isInstanceOf(IllegalArgumentException.class)
+       .hasMessage("export period must be positive");
+   assertThatThrownBy(
+           () -> ConsistentReservoirSamplingSpanProcessor.create(mockExporter, 1, 1, -1))
+       .isInstanceOf(IllegalArgumentException.class)
+       .hasMessage("exporter timeout must be positive");
+
+   // missing random generator
+   assertThatThrownBy(
+           () -> ConsistentReservoirSamplingSpanProcessor.create(mockExporter, 1, 1, 1, null))
+       .isInstanceOf(NullPointerException.class)
+       .hasMessage("randomGenerator");
+ }
+
+ /** The processor must ask for end notifications only. */
+ @Test
+ void startEndRequirements() {
+   SpanExporter exporter = new WaitingSpanExporter(0);
+   SpanProcessor spanProcessor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+   assertThat(spanProcessor.isStartRequired()).isFalse();
+   assertThat(spanProcessor.isEndRequired()).isTrue();
+ }
+
+ /** All spans ended within the test must be exported, none may get lost or altered. */
+ @Test
+ @Timeout(10)
+ void exportDifferentSampledSpans() {
+   // Wait for all three spans: with a smaller latch count the assertion below could run after
+   // a periodic export that contained only the first two spans.
+   WaitingSpanExporter exporter = new WaitingSpanExporter(3);
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .addSpanProcessor(
+               ConsistentReservoirSamplingSpanProcessor.create(
+                   exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS))
+           .build();
+
+   ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+   ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+   ReadableSpan span3 = createEndedSpan(SPAN_NAME_3, sdkTracerProvider);
+   List<SpanData> exported = exporter.waitForExport();
+   assertThat(exported)
+       .containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData(), span3.toSpanData());
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(10)
+ void forceExport() {
+   int numberOfSpans = 100;
+   int reservoirSize = 50;
+   WaitingSpanExporter exporter = new WaitingSpanExporter(numberOfSpans);
+   // The export period is very long, so only the explicit forceFlush() below triggers an export.
+   SpanProcessor processor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, reservoirSize, VERY_LONG_EXPORT_PERIOD_NANOS);
+
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder().addSpanProcessor(processor).build();
+   for (int i = 0; i < numberOfSpans; i++) {
+     createEndedSpan("MySpanName/" + i, sdkTracerProvider);
+   }
+
+   processor.forceFlush().join(10, TimeUnit.SECONDS);
+
+   // More spans were ended than fit into the reservoir, so exactly reservoirSize are exported.
+   List<SpanData> exported = exporter.getExported();
+   assertThat(exported).isNotNull();
+   assertThat(exported).hasSize(reservoirSize);
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(10)
+ void exportSpansToMultipleServices() {
+   WaitingSpanExporter waitingSpanExporter1 = new WaitingSpanExporter(2);
+   WaitingSpanExporter waitingSpanExporter2 = new WaitingSpanExporter(2);
+   // A composite exporter fans every export out to both waiting exporters.
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .addSpanProcessor(
+               ConsistentReservoirSamplingSpanProcessor.create(
+                   SpanExporter.composite(
+                       Arrays.asList(waitingSpanExporter1, waitingSpanExporter2)),
+                   RESERVOIR_SIZE,
+                   EXPORT_PERIOD_100_MILLIS_AS_NANOS))
+           .build();
+
+   ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+   ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+
+   // Typed lists (instead of raw List) keep the AssertJ assertions type-checked.
+   List<SpanData> exported1 = waitingSpanExporter1.waitForExport();
+   List<SpanData> exported2 = waitingSpanExporter2.waitForExport();
+   assertThat(exported1).containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData());
+   assertThat(exported2).containsExactlyInAnyOrder(span1.toSpanData(), span2.toSpanData());
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ void ignoresNullSpans() {
+   SpanExporter mockExporter = mock(SpanExporter.class);
+   SpanProcessor underTest =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           mockExporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+
+   // Neither callback may throw when handed null arguments.
+   assertThatCode(
+           () -> {
+             underTest.onStart(null, null);
+             underTest.onEnd(null);
+           })
+       .doesNotThrowAnyException();
+
+   underTest.shutdown();
+ }
+
+ @Test
+ @Timeout(10)
+ void exporterThrowsException() {
+   // First delegate of the composite exporter: throws on every export call.
+   SpanExporter failingExporter = mock(SpanExporter.class);
+   when(failingExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+   doThrow(new IllegalArgumentException("No export for you."))
+       .when(failingExporter)
+       .export(ArgumentMatchers.anyList());
+
+   // Second delegate: must still receive the spans despite the failure of the first one.
+   WaitingSpanExporter workingExporter = new WaitingSpanExporter(1);
+
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .addSpanProcessor(
+               ConsistentReservoirSamplingSpanProcessor.create(
+                   SpanExporter.composite(Arrays.asList(failingExporter, workingExporter)),
+                   RESERVOIR_SIZE,
+                   EXPORT_PERIOD_100_MILLIS_AS_NANOS))
+           .build();
+
+   ReadableSpan span1 = createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+   List<SpanData> firstExport = workingExporter.waitForExport();
+   assertThat(firstExport).containsExactly(span1.toSpanData());
+
+   workingExporter.reset();
+
+   // A later span is still exported, i.e. the processor keeps running after the exception.
+   ReadableSpan span2 = createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+   List<SpanData> secondExport = workingExporter.waitForExport();
+   assertThat(secondExport).containsExactly(span2.toSpanData());
+
+   shutdown(sdkTracerProvider);
+ }
+
+ /**
+  * Builds a Mockito matcher for an exported span list that asserts at least one span carries the
+  * given name and then runs the supplied callback.
+  *
+  * @param spanName name that at least one span in the list must have
+  * @param runOnMatch callback run after the name assertion has passed
+  * @return a matcher that returns {@code true} once the assertion has passed and the callback ran
+  */
+ private static ArgumentMatcher<List<SpanData>> containsSpanName(
+     String spanName, Runnable runOnMatch) {
+   return spans -> {
+     // The AssertJ assertion throws if no span has the expected name, so the callback below only
+     // runs for a matching list.
+     assertThat(spans).anySatisfy(span -> assertThat(span.getName()).isEqualTo(spanName));
+     runOnMatch.run();
+     return true;
+   };
+ }
+
+ /** Polls until the given reservoir sampling processor reports an empty reservoir. */
+ private static void awaitReservoirEmpty(SpanProcessor processor) {
+   await()
+       .untilAsserted(
+           () -> {
+             ConsistentReservoirSamplingSpanProcessor reservoirProcessor =
+                 (ConsistentReservoirSamplingSpanProcessor) processor;
+             boolean reservoirEmpty = reservoirProcessor.isReservoirEmpty();
+             assertThat(reservoirEmpty).isTrue();
+           });
+ }
+
+ @Test
+ @Timeout(10)
+ public void continuesIfExporterTimesOut() throws InterruptedException {
+   SpanExporter mockSpanExporter = mock(SpanExporter.class);
+   when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+
+   // Export every 10 ms with an exporter timeout of 1 ms.
+   SpanProcessor processor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           mockSpanExporter,
+           RESERVOIR_SIZE,
+           EXPORT_PERIOD_10_MILLIS_AS_NANOS,
+           TimeUnit.MILLISECONDS.toNanos(1));
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder().addSpanProcessor(processor).build();
+
+   CountDownLatch exported = new CountDownLatch(1);
+
+   // We return a result we never complete, meaning it will timeout.
+   when(mockSpanExporter.export(argThat(containsSpanName(SPAN_NAME_1, exported::countDown))))
+       .thenReturn(new CompletableResultCode());
+
+   createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+   exported.await();
+
+   // Timed out so the span was dropped.
+   awaitReservoirEmpty(processor);
+
+   // Still processing new spans.
+   CountDownLatch exportedAgain = new CountDownLatch(1);
+   reset(mockSpanExporter);
+   when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+   when(mockSpanExporter.export(argThat(containsSpanName(SPAN_NAME_2, exportedAgain::countDown))))
+       .thenReturn(CompletableResultCode.ofSuccess());
+   createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+   // Wait on the latch of the second export; the first latch is already at zero and would
+   // return immediately without proving that the second span reached the exporter.
+   exportedAgain.await();
+   awaitReservoirEmpty(processor);
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(10)
+ void exportNotSampledNotRecordedSpans() {
+   Sampler mockSampler = mock(Sampler.class);
+   WaitingSpanExporter exporter = new WaitingSpanExporter(1);
+   SpanProcessor reservoirProcessor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .addSpanProcessor(reservoirProcessor)
+           .setSampler(mockSampler)
+           .build();
+
+   // Phase 1: the sampler drops everything; start and end two spans that must never be exported.
+   when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList()))
+       .thenReturn(SamplingResult.drop());
+   for (String droppedSpanName : Arrays.asList(SPAN_NAME_1, SPAN_NAME_2)) {
+     sdkTracerProvider.get("test").spanBuilder(droppedSpanName).startSpan().end();
+   }
+
+   // Phase 2: the sampler records and samples; this span is the only one expected downstream.
+   when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList()))
+       .thenReturn(SamplingResult.recordAndSample());
+   ReadableSpan sampledSpan = createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+
+   // The dropped spans were ended first, so finding only the sampled span in the export shows
+   // that the dropped ones were not exported.
+   assertThat(exporter.waitForExport()).containsExactly(sampledSpan.toSpanData());
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(10)
+ void exportNotSampledButRecordedSpans() {
+   WaitingSpanExporter exporter = new WaitingSpanExporter(1);
+
+   // The sampler starts out in record-only mode.
+   Sampler mockSampler = mock(Sampler.class);
+   when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList()))
+       .thenReturn(SamplingResult.recordOnly());
+   SpanProcessor reservoirProcessor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .addSpanProcessor(reservoirProcessor)
+           .setSampler(mockSampler)
+           .build();
+
+   // Recorded but not sampled: must not show up in the export.
+   createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+
+   // Switch the sampler to record-and-sample for the span that is expected to be exported.
+   when(mockSampler.shouldSample(any(), any(), any(), any(), any(), anyList()))
+       .thenReturn(SamplingResult.recordAndSample());
+   ReadableSpan sampledSpan = createEndedSpan(SPAN_NAME_2, sdkTracerProvider);
+
+   // The record-only span was ended first, so finding only the sampled span in the export shows
+   // that the non-sampled one was not exported.
+   assertThat(exporter.waitForExport()).containsExactly(sampledSpan.toSpanData());
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(10)
+ void shutdownFlushes() {
+   WaitingSpanExporter exporter = new WaitingSpanExporter(1);
+
+   // With a very long export period no periodic export interferes; only shutdown can flush.
+   SpanProcessor reservoirProcessor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, RESERVOIR_SIZE, VERY_LONG_EXPORT_PERIOD_NANOS);
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder().addSpanProcessor(reservoirProcessor).build();
+
+   ReadableSpan pendingSpan = createEndedSpan(SPAN_NAME_1, sdkTracerProvider);
+
+   // Shutting down must push the pending span to the exporter and shut the exporter down too.
+   shutdown(sdkTracerProvider);
+
+   assertThat(exporter.getExported()).containsExactly(pendingSpan.toSpanData());
+   assertThat(exporter.shutDownCalled.get()).isTrue();
+ }
+
+ @Test
+ void shutdownPropagatesSuccess() {
+   // Exporter whose shutdown succeeds.
+   SpanExporter exporterMock = mock(SpanExporter.class);
+   CompletableResultCode exporterShutdown = CompletableResultCode.ofSuccess();
+   when(exporterMock.shutdown()).thenReturn(exporterShutdown);
+
+   SpanProcessor underTest =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporterMock, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+
+   // The processor's own shutdown result must report success as well.
+   CompletableResultCode processorShutdown = underTest.shutdown();
+   processorShutdown.join(1, TimeUnit.SECONDS);
+   assertThat(processorShutdown.isSuccess()).isTrue();
+ }
+
+ @Test
+ void shutdownPropagatesFailure() {
+   // Exporter whose shutdown fails.
+   SpanExporter exporterMock = mock(SpanExporter.class);
+   CompletableResultCode exporterShutdown = CompletableResultCode.ofFailure();
+   when(exporterMock.shutdown()).thenReturn(exporterShutdown);
+
+   SpanProcessor underTest =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporterMock, RESERVOIR_SIZE, EXPORT_PERIOD_100_MILLIS_AS_NANOS);
+
+   // The processor's own shutdown result must not report success.
+   CompletableResultCode processorShutdown = underTest.shutdown();
+   processorShutdown.join(1, TimeUnit.SECONDS);
+   assertThat(processorShutdown.isSuccess()).isFalse();
+ }
+
+ @Test
+ @Timeout(10)
+ void fullReservoir() {
+   int reservoirSize = 10;
+   int numberOfSpans = 100;
+
+   WaitingSpanExporter exporter = new WaitingSpanExporter(reservoirSize);
+
+   // Very long export period: nothing is exported before the explicit flush below.
+   SpanProcessor processor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           exporter, reservoirSize, VERY_LONG_EXPORT_PERIOD_NANOS);
+
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .setSampler(ConsistentSampler.alwaysOn())
+           .addSpanProcessor(processor)
+           .build();
+
+   // End ten times more spans than the reservoir can hold.
+   IntStream.range(0, numberOfSpans)
+       .mapToObj(index -> "MySpanName/" + index)
+       .forEach(spanName -> createEndedSpan(spanName, sdkTracerProvider));
+
+   processor.forceFlush().join(10, TimeUnit.SECONDS);
+
+   // Only as many spans as the reservoir holds are exported.
+   assertThat(exporter.waitForExport()).hasSize(reservoirSize);
+
+   shutdown(sdkTracerProvider);
+ }
+
+ /** Statistical checks that testConsistentSampling can run on the exported spans. */
+ private enum Tests {
+ // t-test: the total adjusted count per cycle matches the number of spans created per cycle.
+ VERIFY_MEAN,
+ // G-test of the observed p-values against the effective sampling probability.
+ VERIFY_PVALUE_DISTRIBUTION,
+ // G-test: every span name is exported equally often across all cycles.
+ VERIFY_ORDER_INDEPENDENCE
+ }
+
+ /**
+  * Wraps the given random source in a supplier that takes the generator's monitor around every
+  * {@code nextLong()} call.
+  */
+ private static LongSupplier asThreadSafeLongSupplier(SplittableRandom rng) {
+   return new LongSupplier() {
+     @Override
+     public long getAsLong() {
+       synchronized (rng) {
+         return rng.nextLong();
+       }
+     }
+   };
+ }
+
+ /**
+  * Tests a multi-stage consistent sampling setup consisting of a consistent probability-based
+  * sampler with predefined sampling probability followed by a reservoir sampling span processor
+  * with fixed reservoir size.
+  *
+  * <p>Each cycle creates {@code numberOfSpans} spans, force-flushes the processor and inspects the
+  * exported spans; the statistical checks named in {@code tests} run once all cycles are done.
+  *
+  * @param seed seed of the random source that is split between the processor and the sampler
+  * @param numCycles number of create-and-flush cycles
+  * @param numberOfSpans number of spans started and ended per cycle
+  * @param reservoirSize reservoir size the processor is created with
+  * @param samplingProbability probability passed to the consistent probability-based sampler
+  * @param tests statistical checks to run on the collected results
+  */
+ private void testConsistentSampling(
+     long seed,
+     int numCycles,
+     int numberOfSpans,
+     int reservoirSize,
+     double samplingProbability,
+     EnumSet<Tests> tests) {
+
+   // Independent random streams for the processor (rng1) and the sampler (rng2).
+   SplittableRandom rng1 = new SplittableRandom(seed);
+   SplittableRandom rng2 = rng1.split();
+
+   WaitingSpanExporter spanExporter = new WaitingSpanExporter(0);
+
+   SpanProcessor processor =
+       ConsistentReservoirSamplingSpanProcessor.create(
+           spanExporter,
+           reservoirSize,
+           VERY_LONG_EXPORT_PERIOD_NANOS,
+           DEFAULT_EXPORT_TIMEOUT_NANOS,
+           RandomGenerator.create(asThreadSafeLongSupplier(rng1)));
+
+   SdkTracerProvider sdkTracerProvider =
+       SdkTracerProvider.builder()
+           .setSampler(
+               ConsistentSampler.probabilityBased(
+                   samplingProbability, RandomGenerator.create(asThreadSafeLongSupplier(rng2))))
+           .addSpanProcessor(processor)
+           .build();
+
+   // p-value -> number of exported spans carrying it, accumulated over all cycles.
+   Map<Integer, Long> observedPvalues = new HashMap<>();
+   // span name -> number of times a span with that name was exported, over all cycles.
+   Map<String, Long> spanNameCounts = new HashMap<>();
+
+   double[] totalAdjustedCounts = new double[numCycles];
+
+   for (int k = 0; k < numCycles; ++k) {
+     List<ReadableSpan> spans = new ArrayList<>(numberOfSpans);
+     for (long i = 0; i < numberOfSpans; ++i) {
+       ReadableSpan span = createEndedSpan(Long.toString(i), sdkTracerProvider);
+       if (span != null) {
+         spans.add(span);
+       }
+     }
+
+     if (samplingProbability >= 1.) {
+       assertThat(spans).hasSize(numberOfSpans);
+     }
+
+     processor.forceFlush().join(1000, TimeUnit.SECONDS);
+
+     List<SpanData> exported = spanExporter.getExported();
+     assertThat(exported).hasSize(Math.min(reservoirSize, spans.size()));
+
+     long totalAdjustedCount = 0;
+     for (SpanData spanData : exported) {
+       String traceStateString =
+           spanData.getSpanContext().getTraceState().get(OtelTraceState.TRACE_STATE_KEY);
+       OtelTraceState traceState = OtelTraceState.parse(traceStateString);
+       assertTrue(traceState.hasValidR());
+       assertTrue(traceState.hasValidP());
+       observedPvalues.merge(traceState.getP(), 1L, Long::sum);
+       // A span with p-value p contributes an adjusted count of 2^p.
+       totalAdjustedCount += 1L << traceState.getP();
+       spanNameCounts.merge(spanData.getName(), 1L, Long::sum);
+     }
+     totalAdjustedCounts[k] = totalAdjustedCount;
+   }
+
+   long totalNumberOfSpans = numberOfSpans * (long) numCycles;
+   if (numCycles == 1) {
+     assertThat(observedPvalues).hasSizeLessThanOrEqualTo(2);
+   }
+   if (tests.contains(Tests.VERIFY_MEAN)) {
+     assertThat(reservoirSize)
+         .isGreaterThanOrEqualTo(
+             100); // require a lower limit on the reservoir size, to justify the assumption of the
+     // t-test that values are normally distributed
+
+     assertThat(new TTest().tTest(totalNumberOfSpans / (double) numCycles, totalAdjustedCounts))
+         .isGreaterThan(0.01);
+   }
+   if (tests.contains(Tests.VERIFY_PVALUE_DISTRIBUTION)) {
+     assertThat(observedPvalues)
+         .hasSizeLessThanOrEqualTo(2); // test does not work for more than 2 different p-values
+
+     // The expected number of sampled spans is binomially distributed with the given sampling
+     // probability. However, due to the reservoir sampling buffer the maximum number of sampled
+     // spans is given by the reservoir size. The effective sampling rate is therefore given by
+     // sum_{i=0}^n p^i*(1-p)^{n-i}*min(i,k) (n choose i)
+     // where p denotes the sampling rate, n is the total number of original spans, and k denotes
+     // the reservoir size
+     double p1 =
+         new BinomialDistribution(numberOfSpans - 1, samplingProbability)
+             .cumulativeProbability(reservoirSize - 1);
+     double p2 =
+         new BinomialDistribution(numberOfSpans, samplingProbability)
+             .cumulativeProbability(reservoirSize);
+     assertThat(p1).isLessThanOrEqualTo(p2);
+
+     double effectiveSamplingProbability =
+         samplingProbability * p1 + (reservoirSize / (double) numberOfSpans) * (1. - p2);
+     verifyObservedPvaluesUsingGtest(
+         totalNumberOfSpans, observedPvalues, effectiveSamplingProbability);
+   }
+   if (tests.contains(Tests.VERIFY_ORDER_INDEPENDENCE)) {
+     // Every span name must have been exported at least once, and the per-name export counts
+     // must be compatible with a uniform distribution.
+     assertThat(spanNameCounts.size()).isEqualTo(numberOfSpans);
+     long[] observed = spanNameCounts.values().stream().mapToLong(x -> x).toArray();
+     double[] expected = new double[numberOfSpans];
+     Arrays.fill(expected, 1.);
+     assertThat(new GTest().gTest(expected, observed)).isGreaterThan(0.01);
+   }
+
+   shutdown(sdkTracerProvider);
+ }
+
+ @Test
+ @Timeout(1000)
+ void testConsistentSampling() {
+   // Check combinations used below; the helper only queries them with contains().
+   EnumSet<Tests> meanOnly = EnumSet.of(Tests.VERIFY_MEAN);
+   EnumSet<Tests> orderOnly = EnumSet.of(Tests.VERIFY_ORDER_INDEPENDENCE);
+   EnumSet<Tests> meanAndOrder = EnumSet.of(Tests.VERIFY_MEAN, Tests.VERIFY_ORDER_INDEPENDENCE);
+   EnumSet<Tests> allChecks =
+       EnumSet.of(
+           Tests.VERIFY_MEAN, Tests.VERIFY_PVALUE_DISTRIBUTION, Tests.VERIFY_ORDER_INDEPENDENCE);
+
+   // Arguments: seed, numCycles, numberOfSpans, reservoirSize, samplingProbability, checks.
+   testConsistentSampling(0x34e7052af91d5355L, 1000, 1000, 100, 1., meanAndOrder);
+   testConsistentSampling(0x44ec62de12a422b4L, 1000, 1000, 100, 0.8, meanAndOrder);
+   testConsistentSampling(0x2c3d086534e14407L, 1000, 1000, 100, 0.1, allChecks);
+   testConsistentSampling(0xd3f8a40433cf0522L, 1000, 1000, 200, 0.9, meanAndOrder);
+   testConsistentSampling(0xf25638ca67eceadcL, 10000, 100, 100, 1.0, meanOnly);
+   testConsistentSampling(0x14c5f8f815618ce2L, 1000, 200, 100, 1.0, meanAndOrder);
+   testConsistentSampling(0xb6c27f1169e128ddL, 1000, 1000, 200, 0.2, allChecks);
+   testConsistentSampling(0xab558ff7c5c73c18L, 1000, 10000, 200, 1., allChecks);
+   testConsistentSampling(0xe53010c4b009a6c0L, 1000, 1000, 2000, 0.2, allChecks);
+   testConsistentSampling(0xc41d327fd1a6866aL, 1000000, 5, 4, 1.0, orderOnly);
+ }
+}
diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java
new file mode 100644
index 000000000..2a8669f6a
--- /dev/null
+++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/samplers/RandomGeneratorTest.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.samplers;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.util.BitSet;
+import java.util.SplittableRandom;
+import java.util.stream.DoubleStream;
+import org.hipparchus.stat.inference.GTest;
+import org.junit.jupiter.api.Test;
+
+ public class RandomGeneratorTest {
+
+   /** Draws many random bit sets and checks that the per-bit hit counts look uniform. */
+   private static void testGenerateRandomBitSet(long seed, int numBits, int numOneBits) {
+     final int iterations = 100000;
+     SplittableRandom seededRandom = new SplittableRandom(seed);
+     RandomGenerator generator = RandomGenerator.create(seededRandom::nextLong);
+     long[] hitsPerBit = new long[numBits];
+     double[] uniformProfile = DoubleStream.generate(() -> 1.).limit(numBits).toArray();
+     for (int cycle = 0; cycle < iterations; ++cycle) {
+       BitSet drawn = generator.generateRandomBitSet(numBits, numOneBits);
+       for (int bit = drawn.nextSetBit(0); bit >= 0; bit = drawn.nextSetBit(bit + 1)) {
+         hitsPerBit[bit] += 1;
+       }
+       assertThat(drawn.cardinality()).isEqualTo(numOneBits);
+     }
+     if (numBits <= 0) {
+       fail("numBits was non-positive!");
+     } else if (numBits == 1) {
+       // With a single bit there is nothing to compare against; its hit count is determined.
+       assertThat(hitsPerBit[0]).isEqualTo(numOneBits * (long) iterations);
+     } else {
+       assertThat(new GTest().gTest(uniformProfile, hitsPerBit)).isGreaterThan(0.01);
+     }
+   }
+
+   @Test
+   void testGenerateRandomBitSet() {
+     testGenerateRandomBitSet(0x4a5580b958d52182L, 1, 0);
+     testGenerateRandomBitSet(0x529dff14b0ce7414L, 1, 1);
+     testGenerateRandomBitSet(0x2d3f673a9e1da536L, 2, 0);
+     testGenerateRandomBitSet(0xb9a6735e64361bacL, 2, 1);
+     testGenerateRandomBitSet(0xb5aafedc7031506fL, 2, 2);
+     testGenerateRandomBitSet(0xaecabe7698971ee1L, 3, 0);
+     testGenerateRandomBitSet(0x119ccf35dc52b34dL, 3, 1);
+     testGenerateRandomBitSet(0xcaf2b7a98f194ce2L, 3, 2);
+     testGenerateRandomBitSet(0xe28e8cc3d3de0c2aL, 3, 3);
+     testGenerateRandomBitSet(0xb69989dce9cc8b34L, 4, 0);
+     testGenerateRandomBitSet(0x6575d4c848c95dc8L, 4, 1);
+     testGenerateRandomBitSet(0xed0ad0525ad632e9L, 4, 2);
+     testGenerateRandomBitSet(0x34db9303b405a706L, 4, 3);
+     testGenerateRandomBitSet(0x8e97972893044140L, 4, 4);
+     testGenerateRandomBitSet(0x47f966b8f28dac77L, 5, 0);
+     testGenerateRandomBitSet(0x7996db4a5f1e4680L, 5, 1);
+     testGenerateRandomBitSet(0x577fcf18bbc0ba30L, 5, 2);
+     testGenerateRandomBitSet(0x36b1ed999d2986b0L, 5, 3);
+     testGenerateRandomBitSet(0xa8e099ed958d03bbL, 5, 4);
+     testGenerateRandomBitSet(0xc2b50bbf3263b414L, 5, 5);
+     testGenerateRandomBitSet(0x2994550582b091e9L, 6, 0);
+     testGenerateRandomBitSet(0xd2797c539136f6faL, 6, 1);
+     testGenerateRandomBitSet(0xf3ffae1d93983fd9L, 6, 2);
+     testGenerateRandomBitSet(0x281e0f9873455ea6L, 6, 3);
+     testGenerateRandomBitSet(0x5344c2887e30d621L, 6, 4);
+     testGenerateRandomBitSet(0xa8f4ed6e3e1cf385L, 6, 5);
+     testGenerateRandomBitSet(0x6bd0f9f11520ae57L, 6, 6);
+     // Wide bit sets, sparsely and densely populated.
+     testGenerateRandomBitSet(0x514f52732c193e62L, 1000, 1);
+     testGenerateRandomBitSet(0xe214063ae29d9802L, 1000, 10);
+     testGenerateRandomBitSet(0x602fdb45063e7b0fL, 1000, 990);
+     testGenerateRandomBitSet(0xe0ef0cb214de3ec0L, 1000, 999);
+   }
+ }
diff --git a/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java b/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java
new file mode 100644
index 000000000..af196c1e9
--- /dev/null
+++ b/consistent-sampling/src/test/java/io/opentelemetry/contrib/util/TestUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.contrib.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.hipparchus.stat.inference.GTest;
+
+public final class TestUtil {
+
+ private TestUtil() {}
+
+ public static void verifyObservedPvaluesUsingGtest(
+ long originalNumberOfSpans, Map observedPvalues, double samplingProbability) {
+
+ Object notSampled =
+ new Object() {
+ @Override
+ public String toString() {
+ return "NOT SAMPLED";
+ }
+ };
+
+ Map