feat(metrics): Track memory footprint of metrics buckets (#1284)
To be able to limit the memory footprint of metrics buckets in the
aggregator, we need to keep track of the number of elements we store.
Instead of measuring the actual memory consumption, we apply a simple
model, roughly measuring the bytes needed to encode a bucket:

- counter buckets: 8 bytes (f64)
- set buckets: number of unique elements * 4 (u32)
- distribution buckets: number of unique elements * 12 (f64 + u32)
- gauge: 40 bytes (4 * f64 + 1 * u64)

To avoid iterating over all the buckets every
time we want to query the memory footprint, we keep a map of counters
per project key (plus one total count) that is incremented with the
footprint delta on every insert.
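
For example, merging the values 1.0, 2.0, 2.0, and 3.0 into a distribution
bucket leaves three unique values, so the bucket is counted as 3 * 12 = 36
bytes; merging a duplicate only increments the stored u32 count and adds no
cost (see `test_bucket_value_cost` and `test_aggregator_cost_tracking` below).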
jjbayer authored Jun 3, 2022
1 parent 904625c commit acb94fa
Showing 3 changed files with 215 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
- Add support for profile outcomes. ([#1272](https://github.com/getsentry/relay/pull/1272))
- Avoid potential panics when scrubbing minidumps. ([#1282](https://github.com/getsentry/relay/pull/1282))
- Fix typescript profile validation. ([#1283](https://github.com/getsentry/relay/pull/1283))
- Track memory footprint of metrics buckets. ([#1284](https://github.com/getsentry/relay/pull/1284))

## 22.5.0

208 changes: 205 additions & 3 deletions relay-metrics/src/aggregation.rs
@@ -520,6 +520,21 @@ impl BucketValue {
Self::Distribution(m) => m.internal_size(),
}
}

/// Estimates the number of bytes needed to encode the bucket.
/// Note that this does not necessarily match the exact memory footprint of the bucket,
/// because data structures might have a memory overhead.
///
/// This is very similar to [`BucketValue::relative_size`], which can possibly be removed.
pub fn cost(&self) -> usize {
match self {
Self::Counter(_) => 8,
Self::Set(s) => 4 * s.len(),
Self::Gauge(_) => 5 * 8,
// Distribution values are stored as maps of (f64, u32) pairs
Self::Distribution(m) => 12 * m.internal_size(),
}
}
}

impl From<MetricValue> for BucketValue {
@@ -537,7 +552,7 @@ impl From<MetricValue> for BucketValue {
///
/// Currently either a [`MetricValue`] or another `BucketValue`.
trait MergeValue: Into<BucketValue> {
/// Merges `self` into the given `bucket_value`.
/// Merges `self` into the given `bucket_value` and returns the additional cost for storing this value.
///
/// Aggregation is performed according to the rules documented in [`BucketValue`].
fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError>;
@@ -1018,6 +1033,48 @@ enum AggregatorState {
ShuttingDown,
}

#[derive(Debug, Default)]
struct CostTracker {
total_cost: usize,
// Choosing a BTreeMap instead of a HashMap here, under the assumption that a BTreeMap
// is more efficient for the small number of project keys we typically store.
cost_per_project_key: BTreeMap<ProjectKey, usize>,
}

impl CostTracker {
fn add_cost(&mut self, project_key: ProjectKey, cost: usize) {
self.total_cost += cost;
let project_cost = self.cost_per_project_key.entry(project_key).or_insert(0);
*project_cost += cost;
}

fn subtract_cost(&mut self, project_key: ProjectKey, cost: usize) {
match self.cost_per_project_key.entry(project_key) {
btree_map::Entry::Vacant(_) => {
relay_log::error!(
"Trying to subtract cost for a project key that has not been tracked"
);
}
btree_map::Entry::Occupied(mut entry) => {
// Handle per-project cost:
let project_cost = entry.get_mut();
if cost > *project_cost {
relay_log::error!("Subtracting a project cost higher than what we tracked");
self.total_cost = self.total_cost.saturating_sub(*project_cost);
*project_cost = 0;
} else {
*project_cost -= cost;
self.total_cost = self.total_cost.saturating_sub(cost);
}
if *project_cost == 0 {
// Remove this project_key from the map
entry.remove();
}
}
};
}
}
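
// Usage sketch for `CostTracker` (hypothetical; it mirrors `test_cost_tracker`
// further down in this file):
//
//     let mut tracker = CostTracker::default();
//     tracker.add_cost(project_key, 8);      // e.g. a freshly inserted counter bucket
//     tracker.subtract_cost(project_key, 8); // bucket flushed; zeroed entries are removed
//
// Unknown keys and oversized subtractions only log an error and clamp, so a
// bookkeeping bug cannot panic the aggregator or underflow the totals.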

/// A collector of [`Metric`] submissions.
///
/// # Aggregation
@@ -1074,6 +1131,7 @@ pub struct Aggregator {
buckets: HashMap<BucketKey, QueuedBucket>,
receiver: Recipient<FlushBuckets>,
state: AggregatorState,
cost_tracker: CostTracker,
}

impl Aggregator {
@@ -1087,6 +1145,7 @@ impl Aggregator {
buckets: HashMap::new(),
receiver,
state: AggregatorState::Running,
cost_tracker: CostTracker::default(),
}
}

@@ -1200,14 +1259,19 @@ impl Aggregator {

let key = Self::validate_bucket_key(key, &self.config)?;

let added_cost;
match self.buckets.entry(key) {
Entry::Occupied(mut entry) => {
relay_statsd::metric!(
counter(MetricCounters::MergeHit) += 1,
metric_type = entry.key().metric_type.as_str(),
metric_name = &entry.key().metric_name
);
value.merge_into(&mut entry.get_mut().value)?;
let bucket_value = &mut entry.get_mut().value;
let cost_before = bucket_value.cost();
value.merge_into(bucket_value)?;
let cost_after = bucket_value.cost();
added_cost = cost_after.saturating_sub(cost_before);
}
Entry::Vacant(entry) => {
relay_statsd::metric!(
Expand All @@ -1222,10 +1286,14 @@ impl Aggregator {
);

let flush_at = self.config.get_flush_time(timestamp, project_key);
entry.insert(QueuedBucket::new(flush_at, value.into()));
let bucket = value.into();
added_cost = bucket.cost();
entry.insert(QueuedBucket::new(flush_at, bucket));
}
}

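// Record the cost delta computed above in the per-project and total counters.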
self.cost_tracker.add_cost(project_key, added_cost);

Ok(())
}

@@ -1299,18 +1367,32 @@ impl Aggregator {
pub fn pop_flush_buckets(&mut self) -> HashMap<ProjectKey, Vec<Bucket>> {
relay_statsd::metric!(gauge(MetricGauges::Buckets) = self.buckets.len() as u64);

// We only emit statsd metrics for the cost on flush (and not when merging the buckets),
// assuming that this gives us more than enough data points.
relay_statsd::metric!(
gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64
);
for cost in self.cost_tracker.cost_per_project_key.values() {
relay_statsd::metric!(
histogram(MetricHistograms::BucketsCostPerProjectKey) = *cost as f64
);
}

let mut buckets = HashMap::<ProjectKey, Vec<Bucket>>::new();

let force = matches!(&self.state, AggregatorState::ShuttingDown);

relay_statsd::metric!(timer(MetricTimers::BucketsScanDuration), {
let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;
self.buckets.retain(|key, entry| {
if force || entry.elapsed() {
// Take the value and leave a placeholder behind. It'll be removed right after.
let value = std::mem::replace(&mut entry.value, BucketValue::Counter(0.0));
cost_tracker.subtract_cost(key.project_key, value.cost());
let bucket = Bucket::from_parts(key.clone(), bucket_interval, value);
buckets.entry(key.project_key).or_default().push(bucket);

false
} else {
true
@@ -1883,6 +1965,24 @@ mod tests {
);
}

#[test]
fn test_bucket_value_cost() {
let counter = BucketValue::Counter(123.0);
assert_eq!(counter.cost(), 8);
let set = BucketValue::Set(vec![1, 2, 3, 4, 5].into_iter().collect());
assert_eq!(set.cost(), 20);
let distribution = BucketValue::Distribution(dist![1., 2., 3.]);
assert_eq!(distribution.cost(), 36);
let gauge = BucketValue::Gauge(GaugeValue {
max: 43.,
min: 42.,
sum: 85.,
last: 43.,
count: 2,
});
assert_eq!(gauge.cost(), 40);
}

#[test]
fn test_aggregator_merge_counters() {
relay_test::setup();
@@ -2059,6 +2159,108 @@ mod tests {
assert_eq!(aggregator.buckets.len(), 2);
}

#[test]
fn test_cost_tracker() {
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap();
let mut cost_tracker = CostTracker::default();
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"###);
cost_tracker.add_cost(project_key1, 100);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 100,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
},
}
"###);
cost_tracker.add_cost(project_key2, 200);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
// Unknown project: Will log error, but not crash
cost_tracker.subtract_cost(project_key3, 666);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 300,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100,
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
// Subtract too much: Will log error, but not crash
cost_tracker.subtract_cost(project_key1, 666);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 200,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200,
},
}
"###);
cost_tracker.subtract_cost(project_key2, 20);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 180,
cost_per_project_key: {
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 180,
},
}
"###);
cost_tracker.subtract_cost(project_key2, 180);
insta::assert_debug_snapshot!(cost_tracker, @r###"
CostTracker {
total_cost: 0,
cost_per_project_key: {},
}
"###);
}

#[test]
fn test_aggregator_cost_tracking() {
// Make sure that the right cost is added / subtracted
let receiver = TestReceiver::start_default().recipient();
let mut aggregator = Aggregator::new(test_config(), receiver);
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let mut metric = Metric {
name: "c:foo".to_owned(),
unit: MetricUnit::None,
value: MetricValue::Counter(42.),
timestamp: UnixTimestamp::from_secs(999994711),
tags: BTreeMap::new(),
};
for (metric_value, expected_total_cost) in [
(MetricValue::Counter(42.), 8),
(MetricValue::Counter(42.), 8), // counters have constant size
(MetricValue::Set(123), 12), // 8 + 1*4
(MetricValue::Set(123), 12), // Same element in set, no change
(MetricValue::Set(456), 16), // Different element in set -> +4
(MetricValue::Distribution(1.0), 28), // 1 unique element -> +12
(MetricValue::Distribution(1.0), 28), // no new element
(MetricValue::Distribution(2.0), 40), // 1 new element -> +12
(MetricValue::Gauge(0.3), 80),
(MetricValue::Gauge(0.2), 80), // gauge has constant size
] {
metric.value = metric_value;
aggregator.insert(project_key, metric.clone()).unwrap();
assert_eq!(aggregator.cost_tracker.total_cost, expected_total_cost);
}
}

#[test]
fn test_flush_bucket() {
relay_test::setup();
9 changes: 9 additions & 0 deletions relay-metrics/src/statsd.rs
@@ -103,6 +103,11 @@ pub enum MetricHistograms {
/// - `backdated`: A flag indicating whether the metric was reported within the `initial_delay`
/// time period (`false`) or after the initial delay has expired (`true`).
BucketsDelay,

/// The storage cost of metrics buckets stored in Relay's metrics aggregator, for a project key.
///
/// See also [`MetricGauges::BucketsCost`].
BucketsCostPerProjectKey,
}

impl HistogramMetric for MetricHistograms {
@@ -112,6 +117,7 @@ impl HistogramMetric for MetricHistograms {
Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project",
Self::BucketRelativeSize => "metrics.buckets.relative_bucket_size",
Self::BucketsDelay => "metrics.buckets.delay",
Self::BucketsCostPerProjectKey => "metrics.buckets.cost_per_project_key",
}
}
}
@@ -120,12 +126,15 @@ pub enum MetricGauges {
pub enum MetricGauges {
/// The total number of metric buckets in Relay's metrics aggregator.
Buckets,
/// The total storage cost of metric buckets in Relay's metrics aggregator.
BucketsCost,
}

impl GaugeMetric for MetricGauges {
fn name(&self) -> &'static str {
match *self {
Self::Buckets => "metrics.buckets",
Self::BucketsCost => "metrics.buckets.cost",
}
}
}

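Assuming a DogStatsD-style sink, samples for the two new metrics would look
roughly like this (hypothetical values):

    metrics.buckets.cost:10240|g
    metrics.buckets.cost_per_project_key:512|h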