From 6a3be455b0603398ab46d8c2623b60bc1328f714 Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Thu, 22 Feb 2024 11:36:17 +0100 Subject: [PATCH] Add support for force-flushing metrics when weight is too high --- sentry/api/sentry.api | 13 +-- .../java/io/sentry/MetricsAggregator.java | 94 ++++++++++++------- .../java/io/sentry/metrics/MetricsHelper.java | 1 + .../java/io/sentry/MetricsAggregatorTest.kt | 37 +++++++- 4 files changed, 104 insertions(+), 41 deletions(-) diff --git a/sentry/api/sentry.api b/sentry/api/sentry.api index 99b32e8c7f..98777c5d30 100644 --- a/sentry/api/sentry.api +++ b/sentry/api/sentry.api @@ -1023,7 +1023,7 @@ public final class io/sentry/MemoryCollectionData { public final class io/sentry/MetricsAggregator : io/sentry/IMetricsAggregator, java/io/Closeable, java/lang/Runnable { public fun (Lio/sentry/SentryOptions;Lio/sentry/metrics/IMetricsClient;)V - public fun (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;Lio/sentry/ISentryExecutorService;)V + public fun (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;ILio/sentry/ISentryExecutorService;)V public fun close ()V public fun distribution (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;JI)V public fun flush (Z)V @@ -3395,15 +3395,15 @@ public final class io/sentry/metrics/CounterMetric : io/sentry/metrics/Metric { public fun (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V public fun add (D)V public fun getValue ()D - public fun getValues ()Ljava/lang/Iterable; public fun getWeight ()I + public fun serialize ()Ljava/lang/Iterable; } public final class io/sentry/metrics/DistributionMetric : io/sentry/metrics/Metric { public fun (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V public fun add (D)V - public fun getValues ()Ljava/lang/Iterable; public fun getWeight ()I + public fun serialize ()Ljava/lang/Iterable; } public final class io/sentry/metrics/EncodedMetrics { @@ -3414,8 +3414,8 @@ public final class io/sentry/metrics/EncodedMetrics { public final class io/sentry/metrics/GaugeMetric : io/sentry/metrics/Metric { public fun (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V public fun add (D)V - public fun getValues ()Ljava/lang/Iterable; public fun getWeight ()I + public fun serialize ()Ljava/lang/Iterable; } public abstract interface class io/sentry/metrics/IMetricsClient { @@ -3430,8 +3430,8 @@ public abstract class io/sentry/metrics/Metric { public fun getTimeStampMs ()Ljava/lang/Long; public fun getType ()Lio/sentry/metrics/MetricType; public fun getUnit ()Lio/sentry/MeasurementUnit; - public abstract fun getValues ()Ljava/lang/Iterable; public abstract fun getWeight ()I + public abstract fun serialize ()Ljava/lang/Iterable; } public final class io/sentry/metrics/MetricResourceIdentifier { @@ -3494,6 +3494,7 @@ public abstract interface class io/sentry/metrics/MetricsApi$IMetricsInterface { public final class io/sentry/metrics/MetricsHelper { public static final field FLUSHER_SLEEP_TIME_MS I + public static final field MAX_TOTAL_WEIGHT I public fun ()V public static fun convertNanosTo (Lio/sentry/MeasurementUnit$Duration;J)D public static fun encodeMetrics (JLjava/util/Collection;Ljava/lang/StringBuilder;)V @@ -3543,8 +3544,8 @@ public final class io/sentry/metrics/SentryMetric$JsonKeys { public final class io/sentry/metrics/SetMetric : io/sentry/metrics/Metric { public fun (Ljava/lang/String;Lio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V public fun add (D)V - public fun getValues ()Ljava/lang/Iterable; public fun getWeight ()I + public fun serialize ()Ljava/lang/Iterable; } public final class io/sentry/profilemeasurements/ProfileMeasurement : io/sentry/JsonSerializable, io/sentry/JsonUnknown { diff --git a/sentry/src/main/java/io/sentry/MetricsAggregator.java b/sentry/src/main/java/io/sentry/MetricsAggregator.java index 3f8370ce80..de1224b1ba 100644 --- a/sentry/src/main/java/io/sentry/MetricsAggregator.java +++ b/sentry/src/main/java/io/sentry/MetricsAggregator.java @@ -18,6 +18,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; @@ -44,6 +45,9 @@ public final class MetricsAggregator implements IMetricsAggregator, Runnable, Cl // the metrics, // each of which has a key that uniquely identifies it within the time period private final NavigableMap> buckets = new ConcurrentSkipListMap<>(); + private final AtomicInteger totalBucketsWeight = new AtomicInteger(); + + private final int maxWeight; public MetricsAggregator( final @NotNull SentryOptions options, final @NotNull IMetricsClient client) { @@ -51,6 +55,7 @@ public MetricsAggregator( client, options.getLogger(), options.getDateProvider(), + MetricsHelper.MAX_TOTAL_WEIGHT, NoOpSentryExecutorService.getInstance()); } @@ -59,10 +64,12 @@ public MetricsAggregator( final @NotNull IMetricsClient client, final @NotNull ILogger logger, final @NotNull SentryDateProvider dateProvider, + final int maxWeight, final @NotNull ISentryExecutorService executorService) { this.client = client; this.logger = logger; this.dateProvider = dateProvider; + this.maxWeight = maxWeight; this.executorService = executorService; } @@ -153,49 +160,49 @@ private void add( final double value, @Nullable MeasurementUnit unit, final @Nullable Map tags, - @Nullable Long timestampMs, + @NotNull Long timestampMs, final int stackLevel) { if (isClosed) { return; } - if (timestampMs == null) { - timestampMs = nowMillis(); - } - - final @NotNull Metric metric; - switch (type) { - case Counter: - metric = new CounterMetric(key, value, unit, tags, timestampMs); - break; - case Gauge: - metric = new GaugeMetric(key, value, unit, tags, timestampMs); - break; - case Distribution: - metric = new DistributionMetric(key, value, unit, tags, timestampMs); - break; - case Set: - metric = new SetMetric(key, unit, tags, timestampMs); - //noinspection unchecked - metric.add((int) value); - break; - default: - throw new IllegalArgumentException("Unknown MetricType: " + type.name()); - } - final long timeBucketKey = MetricsHelper.getTimeBucketKey(timestampMs); final @NotNull Map timeBucket = getOrAddTimeBucket(timeBucketKey); final @NotNull String metricKey = MetricsHelper.getMetricBucketKey(type, key, unit, tags); - // TODO check if we can synchronize only the metric itself + // TODO ideally we can synchronize only the metric itself synchronized (timeBucket) { @Nullable Metric existingMetric = timeBucket.get(metricKey); if (existingMetric != null) { + final int oldWeight = existingMetric.getWeight(); existingMetric.add(value); + final int newWeight = existingMetric.getWeight(); + totalBucketsWeight.addAndGet(newWeight - oldWeight); } else { + final @NotNull Metric metric; + switch (type) { + case Counter: + metric = new CounterMetric(key, value, unit, tags, timestampMs); + break; + case Gauge: + metric = new GaugeMetric(key, value, unit, tags, timestampMs); + break; + case Distribution: + metric = new DistributionMetric(key, value, unit, tags, timestampMs); + break; + case Set: + metric = new SetMetric(key, unit, tags, timestampMs); + // sets API is either ints or strings cr32 encoded into ints + // noinspection unchecked + metric.add((int) value); + break; + default: + throw new IllegalArgumentException("Unknown MetricType: " + type.name()); + } timeBucket.put(metricKey, metric); + totalBucketsWeight.addAndGet(metric.getWeight()); } } @@ -217,7 +224,13 @@ private void add( } @Override - public void flush(final boolean force) { + public void flush(boolean force) { + final int totalWeight = buckets.size() + totalBucketsWeight.get(); + if (totalWeight >= maxWeight) { + logger.log(SentryLevel.INFO, "Metrics: total weight exceeded, flushing all buckets"); + force = true; + } + final @NotNull Set flushableBuckets = getFlushableBuckets(force); if (flushableBuckets.isEmpty()) { logger.log(SentryLevel.DEBUG, "Metrics: nothing to flush"); @@ -226,16 +239,21 @@ public void flush(final boolean force) { logger.log(SentryLevel.DEBUG, "Metrics: flushing " + flushableBuckets.size() + " buckets"); final Map> snapshot = new HashMap<>(); - int totalSize = 0; + int numMetrics = 0; for (long bucketKey : flushableBuckets) { - final @Nullable Map metrics = buckets.remove(bucketKey); - if (metrics != null) { - totalSize += metrics.size(); - snapshot.put(bucketKey, metrics); + final @Nullable Map bucket = buckets.remove(bucketKey); + if (bucket != null) { + synchronized (bucket) { + final int weight = getBucketWeight(bucket); + totalBucketsWeight.addAndGet(-weight); + + numMetrics += bucket.size(); + snapshot.put(bucketKey, bucket); + } } } - if (totalSize == 0) { + if (numMetrics == 0) { logger.log(SentryLevel.DEBUG, "Metrics: only empty buckets found"); return; } @@ -244,6 +262,14 @@ public void flush(final boolean force) { client.captureMetrics(new EncodedMetrics(snapshot)); } + private static int getBucketWeight(final @NotNull Map bucket) { + int weight = 0; + for (final @NotNull Metric value : bucket.values()) { + weight += value.getWeight(); + } + return weight; + } + @NotNull private Set getFlushableBuckets(final boolean force) { if (force) { @@ -262,7 +288,7 @@ private Map getOrAddTimeBucket(final long bucketKey) { @Nullable Map bucket = buckets.get(bucketKey); if (bucket == null) { // although buckets is thread safe, we still need to synchronize here to avoid creating - // the same bucket at the same time + // the same bucket at the same time, overwriting each other synchronized (buckets) { bucket = buckets.get(bucketKey); if (bucket == null) { diff --git a/sentry/src/main/java/io/sentry/metrics/MetricsHelper.java b/sentry/src/main/java/io/sentry/metrics/MetricsHelper.java index aeaadc4428..62f9c10f23 100644 --- a/sentry/src/main/java/io/sentry/metrics/MetricsHelper.java +++ b/sentry/src/main/java/io/sentry/metrics/MetricsHelper.java @@ -17,6 +17,7 @@ @ApiStatus.Internal public final class MetricsHelper { public static final int FLUSHER_SLEEP_TIME_MS = 5000; + public static final int MAX_TOTAL_WEIGHT = 100000; private static final int ROLLUP_IN_SECONDS = 10; private static final Pattern INVALID_KEY_CHARACTERS_PATTERN = diff --git a/sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt b/sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt index 641ae14f8f..b1180044a4 100644 --- a/sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt +++ b/sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt @@ -27,11 +27,12 @@ class MetricsAggregatorTest { var currentTimeMillis: Long = 0 var executorService = DeferredExecutorService() - fun getSut(): MetricsAggregator { + fun getSut(maxWeight: Int = MetricsHelper.MAX_TOTAL_WEIGHT): MetricsAggregator { return MetricsAggregator( client, logger, dateProvider, + maxWeight, executorService ) } @@ -308,4 +309,38 @@ class MetricsAggregatorTest { // and flushing is scheduled again assertTrue(fixture.executorService.hasScheduledRunnables()) } + + @Test + fun `weight is considered for force flushing`() { + // weight is determined by number of buckets + weight of metrics + val aggregator = fixture.getSut(5) + + // when 3 values are emitted + for (i in 0 until 3) { + aggregator.distribution( + "name", + i.toDouble(), + null, + null, + fixture.currentTimeMillis, + 1 + ) + } + // no metrics are captured by the client + aggregator.flush(false) + verify(fixture.client, never()).captureMetrics(any()) + + // once we have 4 values and one bucket = weight of 5 + aggregator.distribution( + "name", + 10.0, + null, + null, + fixture.currentTimeMillis, + 1 + ) + // then flush without force still captures all metrics + aggregator.flush(false) + verify(fixture.client).captureMetrics(any()) + } }