Skip to content

Commit

Permalink
Experimental metric reader and view cardinality limits (#5494)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Jun 8, 2023
1 parent 4d034b0 commit 331c6af
Show file tree
Hide file tree
Showing 19 changed files with 499 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ void stringRepresentation() {
+ "clock=SystemClock{}, "
+ "resource=Resource{schemaUrl=null, attributes={service.name=\"otel-test\"}}, "
+ "metricReaders=[PeriodicMetricReader{exporter=MockMetricExporter{}, intervalNanos=60000000000}], "
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}}}]"
+ "views=[RegisteredView{instrumentSelector=InstrumentSelector{instrumentName=instrument}, view=View{name=new-instrument, aggregation=DefaultAggregation, attributesProcessor=NoopAttributesProcessor{}, cardinalityLimit=2000}}]"
+ "}, "
+ "loggerProvider=SdkLoggerProvider{"
+ "clock=SystemClock{}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.export.MetricProducer;
import io.opentelemetry.sdk.metrics.internal.export.RegisteredReader;
import io.opentelemetry.sdk.metrics.internal.state.MeterProviderSharedState;
Expand All @@ -26,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -54,17 +56,19 @@ public static SdkMeterProviderBuilder builder() {

SdkMeterProvider(
List<RegisteredView> registeredViews,
List<MetricReader> metricReaders,
IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders,
Clock clock,
Resource resource,
ExemplarFilter exemplarFilter) {
long startEpochNanos = clock.now();
this.registeredViews = registeredViews;
this.registeredReaders =
metricReaders.stream()
metricReaders.entrySet().stream()
.map(
reader ->
RegisteredReader.create(reader, ViewRegistry.create(reader, registeredViews)))
entry ->
RegisteredReader.create(
entry.getKey(),
ViewRegistry.create(entry.getKey(), entry.getValue(), registeredViews)))
.collect(toList());
this.sharedState =
MeterProviderSharedState.create(clock, resource, exemplarFilter, startEpochNanos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.debug.SourceInfo;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.view.RegisteredView;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Objects;

Expand All @@ -32,7 +34,8 @@ public final class SdkMeterProviderBuilder {

private Clock clock = Clock.getDefault();
private Resource resource = Resource.getDefault();
private final List<MetricReader> metricReaders = new ArrayList<>();
private final IdentityHashMap<MetricReader, CardinalityLimitSelector> metricReaders =
new IdentityHashMap<>();
private final List<RegisteredView> registeredViews = new ArrayList<>();
private ExemplarFilter exemplarFilter = DEFAULT_EXEMPLAR_FILTER;

Expand Down Expand Up @@ -96,7 +99,11 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi
Objects.requireNonNull(view, "view");
registeredViews.add(
RegisteredView.create(
selector, view, view.getAttributesProcessor(), SourceInfo.fromCurrentStack()));
selector,
view,
view.getAttributesProcessor(),
view.getCardinalityLimit(),
SourceInfo.fromCurrentStack()));
return this;
}

Expand All @@ -106,7 +113,20 @@ public SdkMeterProviderBuilder registerView(InstrumentSelector selector, View vi
* <p>Note: custom implementations of {@link MetricReader} are not currently supported.
*/
public SdkMeterProviderBuilder registerMetricReader(MetricReader reader) {
metricReaders.add(reader);
metricReaders.put(reader, CardinalityLimitSelector.defaultCardinalityLimitSelector());
return this;
}

/**
* Registers a {@link MetricReader} with a {@link CardinalityLimitSelector}.
*
* <p>Note: not currently stable but available for experimental use via {@link
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
* MetricReader, CardinalityLimitSelector)}.
*/
SdkMeterProviderBuilder registerMetricReader(
MetricReader reader, CardinalityLimitSelector cardinalityLimitSelector) {
metricReaders.put(reader, cardinalityLimitSelector);
return this;
}

Expand Down
10 changes: 8 additions & 2 deletions sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/View.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ static View create(
@Nullable String name,
@Nullable String description,
Aggregation aggregation,
AttributesProcessor attributesProcessor) {
return new AutoValue_View(name, description, aggregation, attributesProcessor);
AttributesProcessor attributesProcessor,
int cardinalityLimit) {
return new AutoValue_View(
name, description, aggregation, attributesProcessor, cardinalityLimit);
}

View() {}
Expand All @@ -58,6 +60,9 @@ static View create(
/** Returns the attribute processor used for this view. */
abstract AttributesProcessor getAttributesProcessor();

/** Returns the cardinality limit for this view. */
abstract int getCardinalityLimit();

@Override
public final String toString() {
StringJoiner joiner = new StringJoiner(", ", "View{", "}");
Expand All @@ -69,6 +74,7 @@ public final String toString() {
}
joiner.add("aggregation=" + getAggregation());
joiner.add("attributesProcessor=" + getAttributesProcessor());
joiner.add("cardinalityLimit=" + getCardinalityLimit());
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import java.util.Objects;
import java.util.function.Predicate;
Expand All @@ -23,6 +24,7 @@ public final class ViewBuilder {
@Nullable private String description;
private Aggregation aggregation = Aggregation.defaultAggregation();
private AttributesProcessor processor = AttributesProcessor.noop();
private int cardinalityLimit = MetricStorage.DEFAULT_MAX_CARDINALITY;

ViewBuilder() {}

Expand Down Expand Up @@ -85,8 +87,24 @@ ViewBuilder addAttributesProcessor(AttributesProcessor attributesProcessor) {
return this;
}

/**
* Set the cardinality limit.
*
* <p>Note: not currently stable but cardinality limit can be configured via
* SdkMeterProviderUtil#setCardinalityLimit(ViewBuilder, int)}.
*
* @param cardinalityLimit the maximum number of series for a metric
*/
ViewBuilder setCardinalityLimit(int cardinalityLimit) {
if (cardinalityLimit <= 0) {
throw new IllegalArgumentException("cardinalityLimit must be > 0");
}
this.cardinalityLimit = cardinalityLimit;
return this;
}

/** Returns a {@link View} with the configuration of this builder. */
public View build() {
return View.create(name, description, aggregation, processor);
return View.create(name, description, aggregation, processor, cardinalityLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.export.CardinalityLimitSelector;
import io.opentelemetry.sdk.metrics.internal.view.AttributesProcessor;
import io.opentelemetry.sdk.metrics.internal.view.StringPredicates;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -26,7 +28,7 @@ private SdkMeterProviderUtil() {}
/**
* Reflectively assign the {@link ExemplarFilter} to the {@link SdkMeterProviderBuilder}.
*
* @param sdkMeterProviderBuilder the
* @param sdkMeterProviderBuilder the builder
*/
public static void setExemplarFilter(
SdkMeterProviderBuilder sdkMeterProviderBuilder, ExemplarFilter exemplarFilter) {
Expand All @@ -42,6 +44,28 @@ public static void setExemplarFilter(
}
}

/**
* Reflectively add a {@link MetricReader} with the {@link CardinalityLimitSelector} to the {@link
* SdkMeterProviderBuilder}.
*
* @param sdkMeterProviderBuilder the builder
*/
public static void registerMetricReaderWithCardinalitySelector(
SdkMeterProviderBuilder sdkMeterProviderBuilder,
MetricReader metricReader,
CardinalityLimitSelector cardinalityLimitSelector) {
try {
Method method =
SdkMeterProviderBuilder.class.getDeclaredMethod(
"registerMetricReader", MetricReader.class, CardinalityLimitSelector.class);
method.setAccessible(true);
method.invoke(sdkMeterProviderBuilder, metricReader, cardinalityLimitSelector);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(
"Error calling addMetricReader on SdkMeterProviderBuilder", e);
}
}

/**
* Reflectively add an {@link AttributesProcessor} to the {@link ViewBuilder} which appends
* key-values from baggage to all measurements.
Expand Down Expand Up @@ -81,6 +105,21 @@ private static void addAttributesProcessor(
}
}

/**
* Reflectively set the {@code cardinalityLimit} on the {@link ViewBuilder}.
*
* @param viewBuilder the builder
*/
public static void setCardinalityLimit(ViewBuilder viewBuilder, int cardinalityLimit) {
try {
Method method = ViewBuilder.class.getDeclaredMethod("setCardinalityLimit", int.class);
method.setAccessible(true);
method.invoke(viewBuilder, cardinalityLimit);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new IllegalStateException("Error setting cardinalityLimit on ViewBuilder", e);
}
}

/** Reflectively reset the {@link SdkMeterProvider}, clearing all registered instruments. */
public static void resetForTest(SdkMeterProvider sdkMeterProvider) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.export;

import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.state.MetricStorage;

/**
* Customize the {@link io.opentelemetry.sdk.metrics.export.MetricReader} cardinality limit as a
* function of {@link InstrumentType}. Register via {@link
* SdkMeterProviderUtil#registerMetricReaderWithCardinalitySelector(SdkMeterProviderBuilder,
* MetricReader, CardinalityLimitSelector)}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
@FunctionalInterface
public interface CardinalityLimitSelector {

/**
* The default {@link CardinalityLimitSelector}, allowing each metric to have {@code 2000} points.
*/
static CardinalityLimitSelector defaultCardinalityLimitSelector() {
return unused -> MetricStorage.DEFAULT_MAX_CARDINALITY;
}

/**
* Return the default cardinality limit for metrics from instruments of type {@code
* instrumentType}. The cardinality limit dictates the maximum number of distinct points (or time
* series) for the metric.
*/
int getCardinalityLimit(InstrumentType instrumentType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ final class AsynchronousMetricStorage<T extends PointData, U extends ExemplarDat
private final AggregationTemporality aggregationTemporality;
private final Aggregator<T, U> aggregator;
private final AttributesProcessor attributesProcessor;
private final int maxCardinality;
private Map<Attributes, T> points = new HashMap<>();
private Map<Attributes, T> lastPoints =
new HashMap<>(); // Only populated if aggregationTemporality == DELTA
Expand All @@ -54,7 +55,8 @@ private AsynchronousMetricStorage(
RegisteredReader registeredReader,
MetricDescriptor metricDescriptor,
Aggregator<T, U> aggregator,
AttributesProcessor attributesProcessor) {
AttributesProcessor attributesProcessor,
int maxCardinality) {
this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
this.aggregationTemporality =
Expand All @@ -63,6 +65,7 @@ private AsynchronousMetricStorage(
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality;
}

/**
Expand All @@ -83,7 +86,8 @@ static <T extends PointData, U extends ExemplarData> AsynchronousMetricStorage<T
registeredReader,
metricDescriptor,
aggregator,
registeredView.getViewAttributesProcessor());
registeredView.getViewAttributesProcessor(),
registeredView.getCardinalityLimit());
}

/**
Expand All @@ -109,13 +113,13 @@ void record(Measurement measurement) {
private void recordPoint(T point) {
Attributes attributes = point.getAttributes();

if (points.size() >= MetricStorage.MAX_CARDINALITY) {
if (points.size() >= maxCardinality) {
throttlingLogger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed cardinality ("
+ MetricStorage.MAX_CARDINALITY
+ maxCardinality
+ ").");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
private final ConcurrentHashMap<Attributes, AggregatorHandle<T, U>> aggregatorHandles =
new ConcurrentHashMap<>();
private final AttributesProcessor attributesProcessor;
private final int maxCardinality;
private final ConcurrentLinkedQueue<AggregatorHandle<T, U>> aggregatorHandlePool =
new ConcurrentLinkedQueue<>();

DefaultSynchronousMetricStorage(
RegisteredReader registeredReader,
MetricDescriptor metricDescriptor,
Aggregator<T, U> aggregator,
AttributesProcessor attributesProcessor) {
AttributesProcessor attributesProcessor,
int maxCardinality) {
this.registeredReader = registeredReader;
this.metricDescriptor = metricDescriptor;
this.aggregationTemporality =
Expand All @@ -66,6 +68,7 @@ public final class DefaultSynchronousMetricStorage<T extends PointData, U extend
.getAggregationTemporality(metricDescriptor.getSourceInstrument().getType());
this.aggregator = aggregator;
this.attributesProcessor = attributesProcessor;
this.maxCardinality = maxCardinality;
}

// Visible for testing
Expand Down Expand Up @@ -97,13 +100,13 @@ private AggregatorHandle<T, U> getAggregatorHandle(Attributes attributes, Contex
if (handle != null) {
return handle;
}
if (aggregatorHandles.size() >= MAX_CARDINALITY) {
if (aggregatorHandles.size() >= maxCardinality) {
logger.log(
Level.WARNING,
"Instrument "
+ metricDescriptor.getSourceInstrument().getName()
+ " has exceeded the maximum allowed cardinality ("
+ MAX_CARDINALITY
+ maxCardinality
+ ").");
return null;
}
Expand Down Expand Up @@ -143,9 +146,9 @@ public MetricData collect(
}
});

// Trim pool down if needed. pool.size() will only exceed MAX_CARDINALITY if new handles are
// Trim pool down if needed. pool.size() will only exceed maxCardinality if new handles are
// created during collection.
int toDelete = aggregatorHandlePool.size() - MAX_CARDINALITY;
int toDelete = aggregatorHandlePool.size() - maxCardinality;
for (int i = 0; i < toDelete; i++) {
aggregatorHandlePool.poll();
}
Expand Down
Loading

0 comments on commit 331c6af

Please sign in to comment.