Skip to content

Commit

Permalink
Fix up cumulatives to be preserved in more scenarios. (#3957)
Browse files Browse the repository at this point in the history
* Fix up cumulatives to be preserved in more scenarios. Expand testing/expectations to match cumulative user expectations.

* Fixes from review.
  • Loading branch information
jsuereth authored Dec 7, 2021
1 parent 8e04fbf commit 34218da
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ private MetricStorageUtils() {}
*/
static <T> void mergeInPlace(
    Map<Attributes, T> result, Map<Attributes, T> toMerge, Aggregator<T> aggregator) {
  // Non-preserving merge: keys in result that are absent from toMerge are removed,
  // so only attribute streams seen in the latest collection survive.
  blend(result, toMerge, /* preserve= */ false, aggregator::merge);
}

/**
 * Folds every accumulation in {@code toMerge} into {@code result} using the aggregator's merge
 * operation. Entries of {@code result} whose keys are absent from {@code toMerge} are left
 * untouched rather than removed.
 *
 * <p>Note: {@code result} is mutated in place.
 */
static <T> void mergeAndPreserveInPlace(
    Map<Attributes, T> result, Map<Attributes, T> toMerge, Aggregator<T> aggregator) {
  // preserve=true makes blend skip its removal pass over stale keys.
  blend(result, toMerge, /* preserve= */ true, aggregator::merge);
}

/**
Expand All @@ -38,13 +49,27 @@ static <T> void mergeInPlace(
*/
static <T> void diffInPlace(
    Map<Attributes, T> result, Map<Attributes, T> toDiff, Aggregator<T> aggregator) {
  // Non-preserving diff: keys in result that are absent from toDiff are removed before
  // the per-key diff is applied.
  blend(result, toDiff, /* preserve= */ false, aggregator::diff);
}

private static <T> void blend(
Map<Attributes, T> result, Map<Attributes, T> toMerge, BiFunction<T, T, T> blendFunction) {
result.entrySet().removeIf(entry -> !toMerge.containsKey(entry.getKey()));
Map<Attributes, T> result,
Map<Attributes, T> toMerge,
boolean preserve,
BiFunction<T, T, T> blendFunction) {
if (!preserve) {
removeUnseen(result, toMerge);
}
toMerge.forEach(
(k, v) -> result.compute(k, (k2, v2) -> (v2 != null) ? blendFunction.apply(v2, v) : v));
}

/**
 * Drops every entry of {@code result} whose key is not present in {@code latest}.
 *
 * <p>Note: {@code result} is mutated in place.
 */
public static <T> void removeUnseen(Map<Attributes, T> result, Map<Attributes, T> latest) {
  // keySet() is a live view of the map, so removing a key here removes its entry.
  result.keySet().removeIf(key -> !latest.containsKey(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ synchronized MetricData buildMetricFor(
} else if (temporality == AggregationTemporality.CUMULATIVE && isSynchronous) {
// We need to make sure the current delta recording gets merged into the previous cumulative
// for the next cumulative measurement.
MetricStorageUtils.mergeInPlace(last.getAccumulation(), currentAccumulation, aggregator);
MetricStorageUtils.mergeAndPreserveInPlace(
last.getAccumulation(), currentAccumulation, aggregator);
// Note: We allow going over our hard limit on attribute streams when first merging, but
// preserve after this point.
if (last.getAccumulation().size() > MetricStorageUtils.MAX_ACCUMULATIONS) {
MetricStorageUtils.removeUnseen(last.getAccumulation(), currentAccumulation);
}
result = last.getAccumulation();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ void setup() {
* Records to sync instruments, with distinct attributes each time. Validates that stale metrics
* are dropped for delta and cumulative readers. Stale metrics are those with attributes that did
* not receive recordings in the most recent collection.
*
* <p>Effectively, we make sure we cap-out at attribute size = 2000 (constant in
* MetricStorageUtils).
*/
@Test
void staleMetricsDropped_synchronousInstrument() {
LongCounter syncCounter = meter.counterBuilder("sync-counter").build();
for (int i = 1; i <= 5; i++) {
// Note: This constant comes from MetricStorageUtils, but it's package-private.
for (int i = 1; i <= 2000; i++) {
syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build());

// DELTA reader only has latest
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection " + i)
.hasSize(1)
Expand All @@ -63,6 +68,8 @@ void staleMetricsDropped_synchronousInstrument() {
.points()
.hasSize(1));

// Make sure we preserve previous cumulatives
final int currentSize = i;
assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection " + i)
.hasSize(1)
Expand All @@ -73,8 +80,35 @@ void staleMetricsDropped_synchronousInstrument() {
.hasLongSum()
.isCumulative()
.points()
.hasSize(1));
.hasSize(currentSize));
}
// Now punch the limit and ONLY metrics we just recorded stay, due to simplistic GC.
for (int i = 2001; i <= 2010; i++) {
syncCounter.add(1, Attributes.builder().put("key", "num_" + i).build());
}
assertThat(deltaReader.collectAllMetrics())
.as("Delta collection - post limit @ 10")
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasName("sync-counter")
.hasLongSum()
.isDelta()
.points()
.hasSize(10));

assertThat(cumulativeReader.collectAllMetrics())
.as("Cumulative collection - post limit @ 10")
.hasSize(1)
.satisfiesExactly(
metricData ->
assertThat(metricData)
.hasName("sync-counter")
.hasLongSum()
.isCumulative()
.points()
.hasSize(10));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.sdk.metrics.common.InstrumentValueType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleAccumulation;
Expand Down Expand Up @@ -139,14 +140,16 @@ void synchronousCumulative_joinsWithLastMeasurementForCumulative() {
}

@Test
void synchronousCumulative_dropsStale() {
void synchronousCumulative_dropsStaleAtLimit() {
TemporalMetricStorage<DoubleAccumulation> storage =
new TemporalMetricStorage<>(SUM, /* isSynchronous= */ true);

// Send in new measurement at time 10 for collector 1, with attr1
Map<Attributes, DoubleAccumulation> measurement1 = new HashMap<>();
Attributes attr1 = Attributes.builder().put("key", "value1").build();
measurement1.put(attr1, DoubleAccumulation.create(3));
for (int i = 0; i < MetricStorageUtils.MAX_ACCUMULATIONS; i++) {
Attributes attr1 = Attributes.builder().put("key", "value" + i).build();
measurement1.put(attr1, DoubleAccumulation.create(3));
}
assertThat(
storage.buildMetricFor(
collector1,
Expand All @@ -160,15 +163,18 @@ void synchronousCumulative_dropsStale() {
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.hasSize(MetricStorageUtils.MAX_ACCUMULATIONS)
.isNotEmpty()
.contains(DoublePointData.create(0, 10, attr1, 3));
.allSatisfy(point -> assertThat(point).hasStartEpochNanos(0).hasEpochNanos(10).hasValue(3));

// Send in new measurement at time 20 for collector 1, with attr2
// Result should drop accumulation for attr1, only reporting accumulation for attr2
Map<Attributes, DoubleAccumulation> measurement2 = new HashMap<>();
Attributes attr2 = Attributes.builder().put("key", "value2").build();
measurement2.put(attr2, DoubleAccumulation.create(7));
Attributes attr2 =
Attributes.builder()
.put("key", "value" + (MetricStorageUtils.MAX_ACCUMULATIONS + 1))
.build();
measurement2.put(attr2, DoubleAccumulation.create(3));
assertThat(
storage.buildMetricFor(
collector1,
Expand All @@ -182,9 +188,10 @@ void synchronousCumulative_dropsStale() {
.hasDoubleSum()
.isCumulative()
.points()
.hasSize(1)
.hasSize(1) // Limiting to only recent measurements means we cut everything here.
.isNotEmpty()
.containsExactly(DoublePointData.create(0, 20, attr2, 7));
.extracting(PointData::getAttributes)
.contains(attr2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void test_reset_preserves_cumulatives() {

// Add more data, should join.
generateFakeMetric(1);
assertThat(reader.collectAllMetrics()).hasSize(1);
assertThat(reader.collectAllMetrics()).hasSize(3);
}

@Test
Expand All @@ -60,7 +60,7 @@ void test_flush() {
generateFakeMetric(3);
// TODO: Better assertions for CompletableResultCode.
assertThat(reader.flush()).isNotNull();
assertThat(reader.collectAllMetrics()).hasSize(0);
assertThat(reader.collectAllMetrics()).hasSize(3);
}

@Test
Expand Down

0 comments on commit 34218da

Please sign in to comment.