-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(metrics): Track memory footprint of metrics buckets [INGEST-1132] #1284
Changes from 7 commits
b993fcd
6b9f1af
3e9bee3
cd0ab62
f2592be
024fb12
e213cb4
cb0cdaf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -520,6 +520,21 @@ impl BucketValue { | |
Self::Distribution(m) => m.internal_size(), | ||
} | ||
} | ||
|
||
/// Estimates the number of bytes needed to encode the bucket. | ||
/// Note that this does not necessarily match the exact memory footprint of the bucket, | ||
/// because datastructures might have a memory overhead. | ||
/// | ||
/// This is very similar to [`BucketValue::relative_size`], which can possibly be removed. | ||
pub fn cost(&self) -> usize { | ||
match self { | ||
Self::Counter(_) => 8, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of hard-coding, use Also, note that this is wrong. The size of a bucket value is always const DIST_SIZE: usize = mem::size_of::<f64>() + mem::size_of::<Count>();
let allocations = match self {
Self::Counter(_) => 0,
Self::Set(s) => s.len() * mem::size_of::<u32>(), // better to typedef this to `SetValue` now
Self::Gauge(_) => 0,
Self::Distribution(m) => m.internal_size() * DIST_SIZE,
};
mem::size_of::<Self>() + allocations There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had an implementation that used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still hared-coded and explicit, it just doesn't use magic numbers to be more self-explanatory. |
||
Self::Set(s) => 4 * s.len(), | ||
Self::Gauge(_) => 5 * 8, | ||
// Distribution values are stored as maps of (f64, u32) pairs | ||
Self::Distribution(m) => 12 * m.internal_size(), | ||
} | ||
} | ||
} | ||
|
||
impl From<MetricValue> for BucketValue { | ||
|
@@ -537,7 +552,7 @@ impl From<MetricValue> for BucketValue { | |
/// | ||
/// Currently either a [`MetricValue`] or another `BucketValue`. | ||
trait MergeValue: Into<BucketValue> { | ||
/// Merges `self` into the given `bucket_value`. | ||
/// Merges `self` into the given `bucket_value` and returns the additional cost for storing this value. | ||
/// | ||
/// Aggregation is performed according to the rules documented in [`BucketValue`]. | ||
fn merge_into(self, bucket_value: &mut BucketValue) -> Result<(), AggregateMetricsError>; | ||
|
@@ -1018,6 +1033,48 @@ enum AggregatorState { | |
ShuttingDown, | ||
} | ||
|
||
#[derive(Debug, Default)] | ||
struct CostTracker { | ||
total_cost: usize, | ||
// Choosing a BTreeMap instead of a HashMap here, under the assumption that a BTreeMap | ||
// is still more efficient for the number of project keys we store. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This reasoning is up for discussion. |
||
cost_per_project_key: BTreeMap<ProjectKey, usize>, | ||
} | ||
|
||
impl CostTracker { | ||
fn add_cost(&mut self, project_key: ProjectKey, cost: usize) { | ||
self.total_cost += cost; | ||
let project_cost = self.cost_per_project_key.entry(project_key).or_insert(0); | ||
*project_cost += cost; | ||
} | ||
|
||
fn subtract_cost(&mut self, project_key: ProjectKey, cost: usize) { | ||
match self.cost_per_project_key.entry(project_key) { | ||
btree_map::Entry::Vacant(_) => { | ||
relay_log::error!( | ||
"Trying to subtract cost for a project key that has not been tracked" | ||
); | ||
} | ||
btree_map::Entry::Occupied(mut entry) => { | ||
// Handle per-project cost: | ||
let project_cost = entry.get_mut(); | ||
if cost > *project_cost { | ||
relay_log::error!("Subtracting a project cost higher than what we tracked"); | ||
self.total_cost = self.total_cost.saturating_sub(*project_cost); | ||
*project_cost = 0; | ||
} else { | ||
*project_cost -= cost; | ||
self.total_cost = self.total_cost.saturating_sub(cost); | ||
} | ||
if *project_cost == 0 { | ||
// Remove this project_key from the map | ||
entry.remove(); | ||
} | ||
} | ||
}; | ||
} | ||
} | ||
|
||
/// A collector of [`Metric`] submissions. | ||
/// | ||
/// # Aggregation | ||
|
@@ -1074,6 +1131,7 @@ pub struct Aggregator { | |
buckets: HashMap<BucketKey, QueuedBucket>, | ||
receiver: Recipient<FlushBuckets>, | ||
state: AggregatorState, | ||
cost_tracker: CostTracker, | ||
} | ||
|
||
impl Aggregator { | ||
|
@@ -1087,6 +1145,7 @@ impl Aggregator { | |
buckets: HashMap::new(), | ||
receiver, | ||
state: AggregatorState::Running, | ||
cost_tracker: CostTracker::default(), | ||
} | ||
} | ||
|
||
|
@@ -1200,14 +1259,19 @@ impl Aggregator { | |
|
||
let key = Self::validate_bucket_key(key, &self.config)?; | ||
|
||
let added_cost; | ||
match self.buckets.entry(key) { | ||
Entry::Occupied(mut entry) => { | ||
relay_statsd::metric!( | ||
counter(MetricCounters::MergeHit) += 1, | ||
metric_type = entry.key().metric_type.as_str(), | ||
metric_name = &entry.key().metric_name | ||
); | ||
value.merge_into(&mut entry.get_mut().value)?; | ||
let bucket_value = &mut entry.get_mut().value; | ||
let cost_before = bucket_value.cost(); | ||
value.merge_into(bucket_value)?; | ||
let cost_after = bucket_value.cost(); | ||
added_cost = cost_after.saturating_sub(cost_before); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could probably optimize this by making There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it be simpler to just add the cost of the single value here and have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah that could work. |
||
} | ||
Entry::Vacant(entry) => { | ||
relay_statsd::metric!( | ||
|
@@ -1222,10 +1286,14 @@ impl Aggregator { | |
); | ||
|
||
let flush_at = self.config.get_flush_time(timestamp, project_key); | ||
entry.insert(QueuedBucket::new(flush_at, value.into())); | ||
let bucket = value.into(); | ||
added_cost = bucket.cost(); | ||
entry.insert(QueuedBucket::new(flush_at, bucket)); | ||
} | ||
} | ||
|
||
self.cost_tracker.add_cost(project_key, added_cost); | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -1305,10 +1373,12 @@ impl Aggregator { | |
|
||
relay_statsd::metric!(timer(MetricTimers::BucketsScanDuration), { | ||
let bucket_interval = self.config.bucket_interval; | ||
let cost_tracker = &mut self.cost_tracker; | ||
self.buckets.retain(|key, entry| { | ||
if force || entry.elapsed() { | ||
// Take the value and leave a placeholder behind. It'll be removed right after. | ||
let value = std::mem::replace(&mut entry.value, BucketValue::Counter(0.0)); | ||
cost_tracker.subtract_cost(key.project_key, value.cost()); | ||
let bucket = Bucket::from_parts(key.clone(), bucket_interval, value); | ||
buckets.entry(key.project_key).or_default().push(bucket); | ||
false | ||
|
@@ -1318,6 +1388,12 @@ impl Aggregator { | |
}); | ||
}); | ||
|
||
// We only emit a statsd metric for the total cost on flush (and not when merging the buckets), | ||
// assuming that this gives us more than enough data points. | ||
relay_statsd::metric!( | ||
gauge(MetricGauges::BucketsCost) = self.cost_tracker.total_cost as u64 | ||
); | ||
|
||
buckets | ||
} | ||
|
||
|
@@ -1883,6 +1959,24 @@ mod tests { | |
); | ||
} | ||
|
||
#[test] | ||
fn test_bucket_value_cost() { | ||
let counter = BucketValue::Counter(123.0); | ||
assert_eq!(counter.cost(), 8); | ||
let set = BucketValue::Set(vec![1, 2, 3, 4, 5].into_iter().collect()); | ||
assert_eq!(set.cost(), 20); | ||
let distribution = BucketValue::Distribution(dist![1., 2., 3.]); | ||
assert_eq!(distribution.cost(), 36); | ||
let gauge = BucketValue::Gauge(GaugeValue { | ||
max: 43., | ||
min: 42., | ||
sum: 85., | ||
last: 43., | ||
count: 2, | ||
}); | ||
assert_eq!(gauge.cost(), 40); | ||
} | ||
|
||
#[test] | ||
fn test_aggregator_merge_counters() { | ||
relay_test::setup(); | ||
|
@@ -2059,6 +2153,108 @@ mod tests { | |
assert_eq!(aggregator.buckets.len(), 2); | ||
} | ||
|
||
#[test] | ||
fn test_cost_tracker() { | ||
let project_key1 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); | ||
let project_key2 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); | ||
let project_key3 = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fef").unwrap(); | ||
let mut cost_tracker = CostTracker::default(); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 0, | ||
cost_per_project_key: {}, | ||
} | ||
"###); | ||
cost_tracker.add_cost(project_key1, 100); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 100, | ||
cost_per_project_key: { | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100, | ||
}, | ||
} | ||
"###); | ||
cost_tracker.add_cost(project_key2, 200); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 300, | ||
cost_per_project_key: { | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100, | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200, | ||
}, | ||
} | ||
"###); | ||
// Unknown project: Will log error, but not crash | ||
cost_tracker.subtract_cost(project_key3, 666); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 300, | ||
cost_per_project_key: { | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fed"): 100, | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200, | ||
}, | ||
} | ||
"###); | ||
// Subtract too much: Will log error, but not crash | ||
cost_tracker.subtract_cost(project_key1, 666); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 200, | ||
cost_per_project_key: { | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 200, | ||
}, | ||
} | ||
"###); | ||
cost_tracker.subtract_cost(project_key2, 20); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 180, | ||
cost_per_project_key: { | ||
ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"): 180, | ||
}, | ||
} | ||
"###); | ||
cost_tracker.subtract_cost(project_key2, 180); | ||
insta::assert_debug_snapshot!(cost_tracker, @r###" | ||
CostTracker { | ||
total_cost: 0, | ||
cost_per_project_key: {}, | ||
} | ||
"###); | ||
} | ||
|
||
#[test] | ||
fn test_aggregator_cost_tracking() { | ||
// Make sure that the right cost is added / subtracted | ||
let receiver = TestReceiver::start_default().recipient(); | ||
let mut aggregator = Aggregator::new(test_config(), receiver); | ||
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap(); | ||
|
||
let mut metric = Metric { | ||
name: "c:foo".to_owned(), | ||
unit: MetricUnit::None, | ||
value: MetricValue::Counter(42.), | ||
timestamp: UnixTimestamp::from_secs(999994711), | ||
tags: BTreeMap::new(), | ||
}; | ||
for (metric_value, expected_total_cost) in [ | ||
(MetricValue::Counter(42.), 8), | ||
(MetricValue::Counter(42.), 8), // counters have constant size | ||
(MetricValue::Set(123), 12), // 8 + 1*4 | ||
(MetricValue::Set(123), 12), // Same element in set, no change | ||
(MetricValue::Set(456), 16), // Different element in set -> +4 | ||
(MetricValue::Distribution(1.0), 28), // 1 unique element -> +12 | ||
(MetricValue::Distribution(1.0), 28), // no new element | ||
(MetricValue::Distribution(2.0), 40), // 1 new element -> +12 | ||
(MetricValue::Gauge(0.3), 80), | ||
(MetricValue::Gauge(0.2), 80), // gauge has constant size | ||
] { | ||
metric.value = metric_value; | ||
aggregator.insert(project_key, metric.clone()).unwrap(); | ||
assert_eq!(aggregator.cost_tracker.total_cost, expected_total_cost); | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_flush_bucket() { | ||
relay_test::setup(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if
cost
is the best name for what we're modeling here. Open for suggestions.