Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow exporters to influence Aggregation #3762

Merged
merged 23 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3bb5625
Major refactoring to Aggregator
jsuereth Oct 16, 2021
eecddb7
Remove extraneous bits of information from AggregatorFactory interface
jsuereth Oct 16, 2021
9a195c3
Spotless + remove extraneous pieces of data from Aggregation interface.
jsuereth Oct 16, 2021
6a07707
Added supported + preferred temprality to export interfaces.
jsuereth Oct 16, 2021
9f12d78
Finish wiring configured temporality through Views.
jsuereth Oct 16, 2021
9d120b2
Fix remaining gauge tests
jsuereth Oct 16, 2021
e227722
Update exporters to specify desired temporality
jsuereth Oct 17, 2021
e2a8865
Other build fixes
jsuereth Oct 17, 2021
bf8e339
Fix javadoc
jsuereth Oct 17, 2021
123f729
add tests for cumulative+delta in temporal storage
jsuereth Oct 17, 2021
fbc1c12
Move diffInPlace and megeInPlace into utility class
jsuereth Oct 17, 2021
a1c598a
Add explicit test for delta vs. cumulative exporters.
jsuereth Oct 17, 2021
619ec2c
Spotless fixes
jsuereth Oct 17, 2021
09137e1
Remove AggregatorFactory
jsuereth Oct 17, 2021
8a7100e
Fixes from review
jsuereth Oct 20, 2021
c94719c
Add javadoc for new parameters in temporal metric storage.
jsuereth Oct 20, 2021
44ecfb8
Merge latest.
jsuereth Oct 23, 2021
cbb9750
Fixes from review.
jsuereth Oct 23, 2021
d99b832
Add some unit tests for the aggregation config
jsuereth Oct 24, 2021
0d0e0db
Add "diff" unit test for histogram + lastvalue
jsuereth Oct 24, 2021
58ba395
Apply suggestions from code review
jsuereth Oct 25, 2021
547abf7
Merge main.
jsuereth Oct 26, 2021
b4152b8
Fixes from code review.
jsuereth Oct 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
Expand Down Expand Up @@ -53,6 +54,13 @@ public final class OtlpHttpMetricExporter implements MetricExporter {
this.compressionEnabled = compressionEnabled;
}

@Nullable
@Override
public final AggregationTemporality getPreferedTemporality() {
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Lookup based on specification, or constructor
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

/**
* Submits all the given metrics in a single batch to the OpenTelemetry collector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.otlp.internal.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

/** Exports metrics using OTLP via gRPC, using OpenTelemetry's protobuf model. */
Expand Down Expand Up @@ -43,6 +45,13 @@ public static OtlpGrpcMetricExporterBuilder builder() {
this.delegate = delegate;
}

@Nullable
@Override
public final AggregationTemporality getPreferedTemporality() {
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Lookup based on specification, or constructor
return null;
}

/**
* Submits all the given metrics in a single batch to the OpenTelemetry collector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.exporter.prometheus;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.metrics.export.MetricReader;
Expand All @@ -15,6 +16,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;

/**
Expand Down Expand Up @@ -47,6 +49,16 @@ public static MetricReaderFactory create() {
return new Factory();
}

@Override
public EnumSet<AggregationTemporality> getSupportedTemporality() {
return EnumSet.of(AggregationTemporality.CUMULATIVE);
}

@Override
public AggregationTemporality getPreferedTemporality() {
return AggregationTemporality.CUMULATIVE;
}

// Prometheus cannot flush.
@Override
public CompletableResultCode flush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.sun.net.httpserver.HttpServer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
Expand All @@ -35,6 +36,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -104,6 +106,16 @@ private void start() {
}
}

@Override
public EnumSet<AggregationTemporality> getSupportedTemporality() {
return EnumSet.of(AggregationTemporality.CUMULATIVE);
}

@Override
public AggregationTemporality getPreferedTemporality() {
return AggregationTemporality.CUMULATIVE;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public SumAssertT isNotMonotonic() {
failWithActualExpectedAndMessage(
actual,
"montonic: fail",
"Exepcted Sum to be non-monotonic",
false,
"Exepcted Sum to be non-monotonic, found: %s",
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
actual.isMonotonic());
}
return myself;
Expand All @@ -49,7 +48,6 @@ public SumAssertT isCumulative() {
actual,
"aggregationTemporality: CUMULATIVE",
"Exepcted Sum to have cumulative aggregation but found <%s>",
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
AggregationTemporality.CUMULATIVE,
actual.getAggregationTemporality());
}
return myself;
Expand All @@ -62,8 +60,7 @@ public SumAssertT isDelta() {
failWithActualExpectedAndMessage(
actual,
"aggregationTemporality: DELTA",
"Exepcted Sum to have cumulative aggregation but found <%s>",
AggregationTemporality.DELTA,
"Exepected Sum to have delta aggregation but found <%s>",
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
actual.getAggregationTemporality());
}
return myself;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.view.Aggregation;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -31,14 +28,11 @@
@State(Scope.Benchmark)
public class DoubleHistogramBenchmark {
private static final Aggregator<HistogramAccumulation> aggregator =
AggregatorFactory.histogram(Arrays.asList(10.0, 100.0, 1_000.0), AggregationTemporality.DELTA)
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
Aggregation.explicitBucketHistogram(Arrays.asList(10.0, 100.0, 1_000.0))
.createAggregator(
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
ExemplarFilter.neverSample());
private AggregatorHandle<HistogramAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -29,14 +23,7 @@
@State(Scope.Benchmark)
public class DoubleMinMaxSumCountBenchmark {
private static final Aggregator<MinMaxSumCountAccumulation> aggregator =
AggregatorFactory.minMaxSumCount()
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.DOUBLE),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
new DoubleMinMaxSumCountAggregator(ExemplarReservoir::noSamples);
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.common.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.common.InstrumentType;
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.resources.Resource;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -29,14 +23,7 @@
@State(Scope.Benchmark)
public class LongMinMaxSumCountBenchmark {
private static final Aggregator<MinMaxSumCountAccumulation> aggregator =
AggregatorFactory.minMaxSumCount()
.create(
Resource.getDefault(),
InstrumentationLibraryInfo.empty(),
InstrumentDescriptor.create(
"name", "description", "1", InstrumentType.HISTOGRAM, InstrumentValueType.LONG),
MetricDescriptor.create("name", "description", "1"),
ExemplarReservoir::noSamples);
new LongMinMaxSumCountAggregator(ExemplarReservoir::noSamples);
private AggregatorHandle<MinMaxSumCountAccumulation> aggregatorHandle;

@Setup(Level.Trial)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.MetricReaderFactory;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -45,7 +48,7 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
private final ComponentRegistry<SdkMeter> registry;
private final MeterProviderSharedState sharedState;
private final Set<CollectionHandle> collectors;
private final List<MetricReader> readers;
private final Map<CollectionHandle, CollectionInfo> collectionInfoMap;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;

Expand All @@ -72,14 +75,14 @@ final class DefaultSdkMeterProvider implements SdkMeterProvider {
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
// These are *only* mutated in our constructor, and safe to use concurrently after construction.
collectors = CollectionHandle.mutableSet();
readers = new ArrayList<>();
collectionInfoMap = new HashMap<>();
Supplier<CollectionHandle> handleSupplier = CollectionHandle.createSupplier();
for (MetricReaderFactory readerFactory : readerFactories) {
CollectionHandle handle = handleSupplier.get();
// TODO: handle failure in creation or just crash?
MetricReader reader = readerFactory.apply(new LeasedMetricProducer(handle));
collectionInfoMap.put(handle, CollectionInfo.create(handle, collectors, reader));
collectors.add(handle);
readers.add(reader);
}
}

Expand All @@ -95,8 +98,8 @@ public MeterBuilder meterBuilder(@Nullable String instrumentationName) {
@Override
public CompletableResultCode forceFlush() {
List<CompletableResultCode> results = new ArrayList<>();
for (MetricReader reader : readers) {
results.add(reader.flush());
for (CollectionInfo reader : collectionInfoMap.values()) {
results.add(reader.getReader().shutdown());
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
}
return CompletableResultCode.ofAll(results);
}
Expand All @@ -108,8 +111,8 @@ public CompletableResultCode close() {
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> results = new ArrayList<>();
for (MetricReader reader : readers) {
results.add(reader.shutdown());
for (CollectionInfo info : collectionInfoMap.values()) {
results.add(info.getReader().shutdown());
}
return CompletableResultCode.ofAll(results);
}
Expand Down Expand Up @@ -147,7 +150,9 @@ public Collection<MetricData> collectAllMetrics() {
for (SdkMeter meter : meters) {
result.addAll(
meter.collectAll(
handle, collectors, sharedState.getClock().now(), disableSynchronousCollection));
collectionInfoMap.get(handle),
sharedState.getClock().now(),
disableSynchronousCollection));
}
return Collections.unmodifiableCollection(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.Collection;
import java.util.Set;

/** {@link SdkMeter} is SDK implementation of {@link Meter}. */
final class SdkMeter implements Meter {
Expand All @@ -37,16 +36,9 @@ InstrumentationLibraryInfo getInstrumentationLibraryInfo() {

/** Collects all the metric recordings that changed since the previous call. */
Collection<MetricData> collectAll(
CollectionHandle collector,
Set<CollectionHandle> allCollectors,
long epochNanos,
boolean suppressSynchronousCollection) {
CollectionInfo collectionInfo, long epochNanos, boolean suppressSynchronousCollection) {
return meterSharedState.collectAll(
collector,
allCollectors,
meterProviderSharedState,
epochNanos,
suppressSynchronousCollection);
collectionInfo, meterProviderSharedState, epochNanos, suppressSynchronousCollection);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package io.opentelemetry.sdk.metrics.export;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;
import java.util.EnumSet;
import javax.annotation.Nullable;

/**
* {@code MetricExporter} is the interface that all "push based" metric libraries should use to
Expand All @@ -17,6 +20,17 @@
*/
public interface MetricExporter {

/** Returns the set of all supported temporalities for this exporter. */
default EnumSet<AggregationTemporality> getSupportedTemporality() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the purpose of this to fail if someone configures a view for an exporter that doesn't support the specified temporality?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically.

However, the Metrics SiG Is removing AggregationTemporality from the View API, so the purpose here is that an exporter can "handle everything" by default.

return EnumSet.allOf(AggregationTemporality.class);
}

/** Returns the preferred temporality for metrics. */
@Nullable
default AggregationTemporality getPreferedTemporality() {
jsuereth marked this conversation as resolved.
Show resolved Hide resolved
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregatedTemporality.CUMULATIVE? Just kidding (mostly) :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ha!

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have thought that this method would take an instrument type as a parameter, since a given exporter might want this to vary by instrument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Do you have an example of this (A backend/exporter that needs this level of sophistication)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Prometheus wanted cumulative for everything but histograms, but I might very well be making that up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, they need cumulative for everything. (Writing that spec now)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may be thinking of Summary? However in that case it's not DELTA, it's actually "last 10 minutes", which could be longer than the DELTA export interval.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, yeah, Summary is probably what I'm thinking of. But, I could imagine an exporter might want to impose a full view or set of views, in order to limit cardinality, etc. What would you think of handing the exporter some sort of Builder thingee that it could use to fully customize the output that is needed for a particular backend?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence.

I think for the first version, I wish we had decided to go with ALL delta or ALL cumulative and let OTLP pick one or the other via config. Now we have an expectation of flexibility that I'm not sure where it truly belongs.

My thinking is that I expect the VIEW SDK to eventually have a "consistent across SDK" configuration file, something like:

views:
   select:
        instrumentType: counter
        instrumentationLibrary: grpc
        name: request.bytes
  aggregation:
     type: Histogram
  attributes:
     drop: [route]

I'm not so certain we'll be exposing that kind of configuration on a per-exporter level. While I think the choice of aggregation temporality really is an exporter decision, I find myself torn in that I think a consistent View configuration file to be more likely. (And what I suggest above is a really ugly version of what I think we'll eventually have, something halfway between prometheus-rewrite rules and otel-collector processor config).


/**
* Exports the collection of given {@link MetricData}. Note that export operations can be
* performed simultaneously depending on the type of metric reader being used. However, the caller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package io.opentelemetry.sdk.metrics.export;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import java.util.EnumSet;

/**
* A registered reader of metrics.
Expand All @@ -15,6 +17,12 @@
*/
public interface MetricReader {

/** Return The set of all supported temporalities for this exporter. */
EnumSet<AggregationTemporality> getSupportedTemporality();

/** Return The preferred temporality for metrics. */
AggregationTemporality getPreferedTemporality();
jsuereth marked this conversation as resolved.
Show resolved Hide resolved

/**
* Flushes metrics read by this reader.
*
Expand Down
Loading