Skip to content

Commit

Permalink
Add support for force-flushing metrics when weight is too high
Browse files Browse the repository at this point in the history
  • Loading branch information
markushi committed Feb 22, 2024
1 parent 9c67a6c commit 6a3be45
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 41 deletions.
13 changes: 7 additions & 6 deletions sentry/api/sentry.api
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public final class io/sentry/MemoryCollectionData {

public final class io/sentry/MetricsAggregator : io/sentry/IMetricsAggregator, java/io/Closeable, java/lang/Runnable {
public fun <init> (Lio/sentry/SentryOptions;Lio/sentry/metrics/IMetricsClient;)V
public fun <init> (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;Lio/sentry/ISentryExecutorService;)V
public fun <init> (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;ILio/sentry/ISentryExecutorService;)V
public fun close ()V
public fun distribution (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;JI)V
public fun flush (Z)V
Expand Down Expand Up @@ -3395,15 +3395,15 @@ public final class io/sentry/metrics/CounterMetric : io/sentry/metrics/Metric {
public fun <init> (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V
public fun add (D)V
public fun getValue ()D
public fun getValues ()Ljava/lang/Iterable;
public fun getWeight ()I
public fun serialize ()Ljava/lang/Iterable;
}

public final class io/sentry/metrics/DistributionMetric : io/sentry/metrics/Metric {
public fun <init> (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V
public fun add (D)V
public fun getValues ()Ljava/lang/Iterable;
public fun getWeight ()I
public fun serialize ()Ljava/lang/Iterable;
}

public final class io/sentry/metrics/EncodedMetrics {
Expand All @@ -3414,8 +3414,8 @@ public final class io/sentry/metrics/EncodedMetrics {
public final class io/sentry/metrics/GaugeMetric : io/sentry/metrics/Metric {
public fun <init> (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V
public fun add (D)V
public fun getValues ()Ljava/lang/Iterable;
public fun getWeight ()I
public fun serialize ()Ljava/lang/Iterable;
}

public abstract interface class io/sentry/metrics/IMetricsClient {
Expand All @@ -3430,8 +3430,8 @@ public abstract class io/sentry/metrics/Metric {
public fun getTimeStampMs ()Ljava/lang/Long;
public fun getType ()Lio/sentry/metrics/MetricType;
public fun getUnit ()Lio/sentry/MeasurementUnit;
public abstract fun getValues ()Ljava/lang/Iterable;
public abstract fun getWeight ()I
public abstract fun serialize ()Ljava/lang/Iterable;
}

public final class io/sentry/metrics/MetricResourceIdentifier {
Expand Down Expand Up @@ -3494,6 +3494,7 @@ public abstract interface class io/sentry/metrics/MetricsApi$IMetricsInterface {

public final class io/sentry/metrics/MetricsHelper {
public static final field FLUSHER_SLEEP_TIME_MS I
public static final field MAX_TOTAL_WEIGHT I
public fun <init> ()V
public static fun convertNanosTo (Lio/sentry/MeasurementUnit$Duration;J)D
public static fun encodeMetrics (JLjava/util/Collection;Ljava/lang/StringBuilder;)V
Expand Down Expand Up @@ -3543,8 +3544,8 @@ public final class io/sentry/metrics/SentryMetric$JsonKeys {
public final class io/sentry/metrics/SetMetric : io/sentry/metrics/Metric {
public fun <init> (Ljava/lang/String;Lio/sentry/MeasurementUnit;Ljava/util/Map;Ljava/lang/Long;)V
public fun add (D)V
public fun getValues ()Ljava/lang/Iterable;
public fun getWeight ()I
public fun serialize ()Ljava/lang/Iterable;
}

public final class io/sentry/profilemeasurements/ProfileMeasurement : io/sentry/JsonSerializable, io/sentry/JsonUnknown {
Expand Down
94 changes: 60 additions & 34 deletions sentry/src/main/java/io/sentry/MetricsAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
Expand All @@ -44,13 +45,17 @@ public final class MetricsAggregator implements IMetricsAggregator, Runnable, Cl
// the metrics,
// each of which has a key that uniquely identifies it within the time period
private final NavigableMap<Long, Map<String, Metric>> buckets = new ConcurrentSkipListMap<>();
private final AtomicInteger totalBucketsWeight = new AtomicInteger();

private final int maxWeight;

public MetricsAggregator(
final @NotNull SentryOptions options, final @NotNull IMetricsClient client) {
this(
client,
options.getLogger(),
options.getDateProvider(),
MetricsHelper.MAX_TOTAL_WEIGHT,
NoOpSentryExecutorService.getInstance());
}

Expand All @@ -59,10 +64,12 @@ public MetricsAggregator(
final @NotNull IMetricsClient client,
final @NotNull ILogger logger,
final @NotNull SentryDateProvider dateProvider,
final int maxWeight,
final @NotNull ISentryExecutorService executorService) {
this.client = client;
this.logger = logger;
this.dateProvider = dateProvider;
this.maxWeight = maxWeight;
this.executorService = executorService;
}

Expand Down Expand Up @@ -153,49 +160,49 @@ private void add(
final double value,
@Nullable MeasurementUnit unit,
final @Nullable Map<String, String> tags,
@Nullable Long timestampMs,
@NotNull Long timestampMs,
final int stackLevel) {

if (isClosed) {
return;
}

if (timestampMs == null) {
timestampMs = nowMillis();
}

final @NotNull Metric metric;
switch (type) {
case Counter:
metric = new CounterMetric(key, value, unit, tags, timestampMs);
break;
case Gauge:
metric = new GaugeMetric(key, value, unit, tags, timestampMs);
break;
case Distribution:
metric = new DistributionMetric(key, value, unit, tags, timestampMs);
break;
case Set:
metric = new SetMetric(key, unit, tags, timestampMs);
//noinspection unchecked
metric.add((int) value);
break;
default:
throw new IllegalArgumentException("Unknown MetricType: " + type.name());
}

final long timeBucketKey = MetricsHelper.getTimeBucketKey(timestampMs);
final @NotNull Map<String, Metric> timeBucket = getOrAddTimeBucket(timeBucketKey);

final @NotNull String metricKey = MetricsHelper.getMetricBucketKey(type, key, unit, tags);

// TODO check if we can synchronize only the metric itself
// TODO ideally we can synchronize only the metric itself
synchronized (timeBucket) {
@Nullable Metric existingMetric = timeBucket.get(metricKey);
if (existingMetric != null) {
final int oldWeight = existingMetric.getWeight();
existingMetric.add(value);
final int newWeight = existingMetric.getWeight();
totalBucketsWeight.addAndGet(newWeight - oldWeight);
} else {
final @NotNull Metric metric;
switch (type) {
case Counter:
metric = new CounterMetric(key, value, unit, tags, timestampMs);
break;
case Gauge:
metric = new GaugeMetric(key, value, unit, tags, timestampMs);
break;
case Distribution:
metric = new DistributionMetric(key, value, unit, tags, timestampMs);
break;
case Set:
metric = new SetMetric(key, unit, tags, timestampMs);
// sets API is either ints or strings cr32 encoded into ints
// noinspection unchecked
metric.add((int) value);
break;
default:
throw new IllegalArgumentException("Unknown MetricType: " + type.name());
}
timeBucket.put(metricKey, metric);
totalBucketsWeight.addAndGet(metric.getWeight());
}
}

Expand All @@ -217,7 +224,13 @@ private void add(
}

@Override
public void flush(final boolean force) {
public void flush(boolean force) {
final int totalWeight = buckets.size() + totalBucketsWeight.get();
if (totalWeight >= maxWeight) {
logger.log(SentryLevel.INFO, "Metrics: total weight exceeded, flushing all buckets");
force = true;
}

final @NotNull Set<Long> flushableBuckets = getFlushableBuckets(force);
if (flushableBuckets.isEmpty()) {
logger.log(SentryLevel.DEBUG, "Metrics: nothing to flush");
Expand All @@ -226,16 +239,21 @@ public void flush(final boolean force) {
logger.log(SentryLevel.DEBUG, "Metrics: flushing " + flushableBuckets.size() + " buckets");

final Map<Long, Map<String, Metric>> snapshot = new HashMap<>();
int totalSize = 0;
int numMetrics = 0;
for (long bucketKey : flushableBuckets) {
final @Nullable Map<String, Metric> metrics = buckets.remove(bucketKey);
if (metrics != null) {
totalSize += metrics.size();
snapshot.put(bucketKey, metrics);
final @Nullable Map<String, Metric> bucket = buckets.remove(bucketKey);
if (bucket != null) {
synchronized (bucket) {
final int weight = getBucketWeight(bucket);
totalBucketsWeight.addAndGet(-weight);

numMetrics += bucket.size();
snapshot.put(bucketKey, bucket);
}
}
}

if (totalSize == 0) {
if (numMetrics == 0) {
logger.log(SentryLevel.DEBUG, "Metrics: only empty buckets found");
return;
}
Expand All @@ -244,6 +262,14 @@ public void flush(final boolean force) {
client.captureMetrics(new EncodedMetrics(snapshot));
}

private static int getBucketWeight(final @NotNull Map<String, Metric> bucket) {
int weight = 0;
for (final @NotNull Metric value : bucket.values()) {
weight += value.getWeight();
}
return weight;
}

@NotNull
private Set<Long> getFlushableBuckets(final boolean force) {
if (force) {
Expand All @@ -262,7 +288,7 @@ private Map<String, Metric> getOrAddTimeBucket(final long bucketKey) {
@Nullable Map<String, Metric> bucket = buckets.get(bucketKey);
if (bucket == null) {
// although buckets is thread safe, we still need to synchronize here to avoid creating
// the same bucket at the same time
// the same bucket at the same time, overwriting each other
synchronized (buckets) {
bucket = buckets.get(bucketKey);
if (bucket == null) {
Expand Down
1 change: 1 addition & 0 deletions sentry/src/main/java/io/sentry/metrics/MetricsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
@ApiStatus.Internal
public final class MetricsHelper {
public static final int FLUSHER_SLEEP_TIME_MS = 5000;
public static final int MAX_TOTAL_WEIGHT = 100000;
private static final int ROLLUP_IN_SECONDS = 10;

private static final Pattern INVALID_KEY_CHARACTERS_PATTERN =
Expand Down
37 changes: 36 additions & 1 deletion sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ class MetricsAggregatorTest {
var currentTimeMillis: Long = 0
var executorService = DeferredExecutorService()

fun getSut(): MetricsAggregator {
fun getSut(maxWeight: Int = MetricsHelper.MAX_TOTAL_WEIGHT): MetricsAggregator {
return MetricsAggregator(
client,
logger,
dateProvider,
maxWeight,
executorService
)
}
Expand Down Expand Up @@ -308,4 +309,38 @@ class MetricsAggregatorTest {
// and flushing is scheduled again
assertTrue(fixture.executorService.hasScheduledRunnables())
}

@Test
fun `weight is considered for force flushing`() {
// weight is determined by number of buckets + weight of metrics
val aggregator = fixture.getSut(5)

// when 3 values are emitted
for (i in 0 until 3) {
aggregator.distribution(
"name",
i.toDouble(),
null,
null,
fixture.currentTimeMillis,
1
)
}
// no metrics are captured by the client
aggregator.flush(false)
verify(fixture.client, never()).captureMetrics(any())

// once we have 4 values and one bucket = weight of 5
aggregator.distribution(
"name",
10.0,
null,
null,
fixture.currentTimeMillis,
1
)
// then flush without force still captures all metrics
aggregator.flush(false)
verify(fixture.client).captureMetrics(any())
}
}

0 comments on commit 6a3be45

Please sign in to comment.