Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix metric sdk when multiple readers are present #4436

Merged
merged 4 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.state.MeterSharedState;
import java.util.Collection;
import java.util.List;

/** {@link SdkMeter} is SDK implementation of {@link Meter}. */
final class SdkMeter implements Meter {
Expand All @@ -37,10 +38,11 @@ final class SdkMeter implements Meter {

SdkMeter(
MeterProviderSharedState meterProviderSharedState,
InstrumentationScopeInfo instrumentationScopeInfo) {
InstrumentationScopeInfo instrumentationScopeInfo,
List<RegisteredReader> registeredReaders) {
this.instrumentationScopeInfo = instrumentationScopeInfo;
this.meterProviderSharedState = meterProviderSharedState;
this.meterSharedState = MeterSharedState.create(instrumentationScopeInfo);
this.meterSharedState = MeterSharedState.create(instrumentationScopeInfo, registeredReaders);
}

// Visible for testing
Expand All @@ -49,10 +51,8 @@ InstrumentationScopeInfo getInstrumentationScopeInfo() {
}

/** Collects all the metric recordings that changed since the previous call. */
Collection<MetricData> collectAll(
CollectionInfo collectionInfo, long epochNanos, boolean suppressSynchronousCollection) {
return meterSharedState.collectAll(
collectionInfo, meterProviderSharedState, epochNanos, suppressSynchronousCollection);
Collection<MetricData> collectAll(RegisteredReader registeredReader, long epochNanos) {
return meterSharedState.collectAll(registeredReader, meterProviderSharedState, epochNanos);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,18 @@
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CollectionHandle;
import io.opentelemetry.sdk.metrics.internal.export.CollectionInfo;
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.resources.Resource;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** SDK implementation for {@link MeterProvider}. */
Expand All @@ -41,10 +35,8 @@ public final class SdkMeterProvider implements MeterProvider, Closeable {

private final ComponentRegistry<SdkMeter> registry;
private final MeterProviderSharedState sharedState;
private final Map<CollectionHandle, CollectionInfo> collectionInfoMap;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CollectionHandle and CollectionInfo had overlapping responsibilities. I've replaced them with a single class with a name which more clearly conveys its responsibilities: RegisteredReader.

private final List<RegisteredReader> registeredReaders;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicLong lastCollectionTimestamp;
private final long minimumCollectionIntervalNanos;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimumCollectionIntervalNanos we've previously talked about turned out to be central to the bug. It's true purpose appears to be trying to ensure that when multiple readers are present they each receive the same data if they collect within a narrow enough interval of time. However, I believe the mechanism to be flawed as it didn't account for correctness when the readers are on different schedules (a perfectly reasonable scenario).

The refactor negates the need for it and assists in reducing complexity.


/**
* Returns a new {@link SdkMeterProviderBuilder} for {@link SdkMeterProvider}.
Expand All @@ -56,38 +48,26 @@ public static SdkMeterProviderBuilder builder() {
}

SdkMeterProvider(
List<MetricReader> metricReaders,
List<RegisteredReader> registeredReaders,
Clock clock,
Resource resource,
ViewRegistry viewRegistry,
ExemplarFilter exemplarFilter,
long minimumCollectionIntervalNanos) {
ExemplarFilter exemplarFilter) {
this.sharedState =
MeterProviderSharedState.create(clock, resource, viewRegistry, exemplarFilter);
this.registeredReaders = registeredReaders;
for (RegisteredReader registeredReader : registeredReaders) {
registeredReader.getReader().register(new LeasedMetricProducer(registeredReader));
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
}
this.registry =
new ComponentRegistry<>(
instrumentationLibraryInfo -> new SdkMeter(sharedState, instrumentationLibraryInfo));
this.lastCollectionTimestamp =
new AtomicLong(clock.nanoTime() - minimumCollectionIntervalNanos);
this.minimumCollectionIntervalNanos = minimumCollectionIntervalNanos;

// Here we construct our own unique handle ids for this SDK.
// These are guaranteed to be unique per-reader for this SDK, and only this SDK.
// These are *only* mutated in our constructor, and safe to use concurrently after construction.
Set<CollectionHandle> collectors = CollectionHandle.mutableSet();
collectionInfoMap = new HashMap<>();
Supplier<CollectionHandle> handleSupplier = CollectionHandle.createSupplier();
for (MetricReader metricReader : metricReaders) {
CollectionHandle handle = handleSupplier.get();
collectionInfoMap.put(handle, CollectionInfo.create(handle, collectors, metricReader));
metricReader.register(new LeasedMetricProducer(handle));
collectors.add(handle);
}
instrumentationLibraryInfo ->
new SdkMeter(sharedState, instrumentationLibraryInfo, registeredReaders));
}

@Override
public MeterBuilder meterBuilder(String instrumentationScopeName) {
if (collectionInfoMap.isEmpty()) {
if (registeredReaders.isEmpty()) {
return MeterProvider.noop().meterBuilder(instrumentationScopeName);
}
if (instrumentationScopeName == null || instrumentationScopeName.isEmpty()) {
Expand All @@ -102,12 +82,12 @@ public MeterBuilder meterBuilder(String instrumentationScopeName) {
* resulting {@link CompletableResultCode} completes when all complete.
*/
public CompletableResultCode forceFlush() {
if (collectionInfoMap.isEmpty()) {
if (registeredReaders.isEmpty()) {
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> results = new ArrayList<>();
for (CollectionInfo collectionInfo : collectionInfoMap.values()) {
results.add(collectionInfo.getReader().forceFlush());
for (RegisteredReader registeredReader : registeredReaders) {
results.add(registeredReader.getReader().forceFlush());
}
return CompletableResultCode.ofAll(results);
}
Expand All @@ -121,11 +101,11 @@ public CompletableResultCode shutdown() {
LOGGER.info("Multiple close calls");
return CompletableResultCode.ofSuccess();
}
if (collectionInfoMap.isEmpty()) {
if (registeredReaders.isEmpty()) {
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> results = new ArrayList<>();
for (CollectionInfo info : collectionInfoMap.values()) {
for (RegisteredReader info : registeredReaders) {
results.add(info.getReader().shutdown());
}
return CompletableResultCode.ofAll(results);
Expand All @@ -139,37 +119,19 @@ public void close() {

/** Helper class to expose registered metric exports. */
private class LeasedMetricProducer implements MetricProducer {
private final CollectionHandle handle;

LeasedMetricProducer(CollectionHandle handle) {
this.handle = handle;
private final RegisteredReader registeredReader;

LeasedMetricProducer(RegisteredReader registeredReader) {
this.registeredReader = registeredReader;
}

@Override
public Collection<MetricData> collectAllMetrics() {
Collection<SdkMeter> meters = registry.getComponents();
// Suppress too-frequent-collection.
long currentNanoTime = sharedState.getClock().nanoTime();
long pastNanoTime = lastCollectionTimestamp.get();
// It hasn't been long enough since the last collection.
boolean disableSynchronousCollection =
(currentNanoTime - pastNanoTime) < minimumCollectionIntervalNanos;
// If we're not disabling metrics, write the current collection time.
// We don't care if this happens in more than one thread, suppression is optimistic, and the
// interval is small enough some jitter isn't important.
if (!disableSynchronousCollection) {
lastCollectionTimestamp.lazySet(currentNanoTime);
}
CollectionInfo info = collectionInfoMap.get(handle);
if (info == null) {
throw new IllegalStateException(
"No collection info for handle, this is a bug in the OpenTelemetry SDK.");
}

List<MetricData> result = new ArrayList<>();
for (SdkMeter meter : meters) {
result.addAll(
meter.collectAll(info, sharedState.getClock().now(), disableSynchronousCollection));
result.addAll(meter.collectAll(registeredReader, sharedState.getClock().now()));
}
return Collections.unmodifiableCollection(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@

package io.opentelemetry.sdk.metrics;

import static io.opentelemetry.api.internal.Utils.checkArgument;

import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistry;
import io.opentelemetry.sdk.metrics.internal.view.ViewRegistryBuilder;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -29,19 +27,11 @@ public final class SdkMeterProviderBuilder {
*/
private static final ExemplarFilter DEFAULT_EXEMPLAR_FILTER = ExemplarFilter.sampleWithTraces();

/**
* By default, the minimum collection interval is 0ns.
*
* @see #setMinimumCollectionInterval(Duration)
*/
private static final long DEFAULT_MIN_COLLECTION_INTERVAL_NANOS = 0;

private Clock clock = Clock.getDefault();
private Resource resource = Resource.getDefault();
private final ViewRegistryBuilder viewRegistryBuilder = ViewRegistry.builder();
private final List<MetricReader> metricReaders = new ArrayList<>();
private final List<RegisteredReader> registeredReaders = new ArrayList<>();
private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER;
private long minimumCollectionIntervalNanos = DEFAULT_MIN_COLLECTION_INTERVAL_NANOS;

SdkMeterProviderBuilder() {}

Expand Down Expand Up @@ -123,21 +113,7 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi
* @return this
*/
public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) {
metricReaders.add(reader);
return this;
}

/**
* Configure the minimum duration between synchronous collections. If collections occur more
* frequently than this, synchronous collection will be suppressed.
*
* @param duration The duration.
* @return this
*/
SdkMeterProviderBuilder setMinimumCollectionInterval(Duration duration) {
Objects.requireNonNull(duration, "duration");
checkArgument(!duration.isNegative(), "duration must not be negative");
minimumCollectionIntervalNanos = duration.toNanos();
registeredReaders.add(RegisteredReader.create(reader));
return this;
}

Expand All @@ -147,11 +123,6 @@ SdkMeterProviderBuilder setMinimumCollectionInterval(Duration duration) {
*/
public SdkMeterProvider build() {
return new SdkMeterProvider(
metricReaders,
clock,
resource,
viewRegistryBuilder.build(),
exemplarFilter,
minimumCollectionIntervalNanos);
registeredReaders, clock, resource, viewRegistryBuilder.build(), exemplarFilter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.opentelemetry.sdk.metrics.internal.view.StringPredicates;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -42,27 +41,6 @@ public static void setExemplarFilter(
}
}

/**
* Reflectively set the minimum duration between synchronous collections for the {@link
* SdkMeterProviderBuilder}. If collections occur more frequently than this, synchronous
* collection will be suppressed.
*
* @param duration The duration.
*/
public static void setMinimumCollectionInterval(
SdkMeterProviderBuilder sdkMeterProviderBuilder, Duration duration) {
try {
Method method =
SdkMeterProviderBuilder.class.getDeclaredMethod(
"setMinimumCollectionInterval", Duration.class);
method.setAccessible(true);
method.invoke(sdkMeterProviderBuilder, duration);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new IllegalStateException(
"Error calling setMinimumCollectionInterval on SdkMeterProviderBuilder", e);
}
}

/**
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
* key-values from baggage to all measurements.
Expand Down
Loading