Adding two level hashing in metrics hashmap #1564
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@           Coverage Diff           @@
##            main   #1564     +/-  ##
=======================================
+ Coverage   69.3%   69.6%    +0.3%
=======================================
  Files        136     136
  Lines      19637   19946     +309
=======================================
+ Hits       13610   13894     +284
- Misses      6027    6052      +25
Thanks! This is amazing! Let's sit on this for 1-2 days to see if there are any concerns before proceeding to do the same for other instrument types. Also, let's do this after the pending release.
@lalitb love the performance gains! Were you able to test this impl against some of the single-level hashmap alternative implementations?
@cijothomas If it's truly for discussion, a week seems like a more reasonable timeframe.
@hdost Do you have any alternative implementations in mind? I can test them. I tested dashmap:
As tested separately, dashmap was much faster for concurrent read-only operations, but our scenario is concurrent updates. Also, we were using dashmap in the old metrics implementation, but it was removed in the new PR (I didn't find the reason in the history). In general, I'm reluctant to take a dependency on any external crate unless it is widely used :)
From what I can see there's no effect on the public interface, so let's take the win for now and see if we can make further improvements later 👍
} else {
    // TBD - Update total_count ??
    values
        .entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
Do we need to store the overflow attribute in each bucket, or only once?
The overflow attribute is handled separately and is stored only once.
I am not sure how this is handled; it looks to me like we'll store the overflow entry in each of the hashmaps.
Good point, this was indeed overflowing in every hashmap. I have fixed it now, and will also add tests to validate this.
I see you've got some further improvements; how's the performance looking for you?
The additional improvements come from (optionally) using fast hashing with hashbrown/ahash. I will update the numbers later from the same machine I used as the baseline earlier. As of now, I am replicating the same changes for the other aggregations.
Updated the latest throughputs in the PR description. The perf boost from hashbrown/ahash is marginal.
As discussed during the community meeting, I have added tests for overflow and concurrent measurement recording.
    .get_finished_metrics()
    .expect("metrics are expected to be exported.");
// Every collect cycle produces a new ResourceMetrics (even if no data is collected).
// TBD - This needs to be fixed, and then the assert below should validate for one entry
Will create an issue for this.
    .init();

// sleep for a random ~5 millis to avoid recording during the first collect cycle
// (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
Will create an issue for this.
let bucket_guard = bucket_mutex.lock().unwrap();

let is_new_entry = if let Some(bucket) = &*bucket_guard {
    !bucket.contains_key(&attrs)
In the common path where the attributes already exist, we now have to acquire the lock once, do the lookup, release the lock, then re-acquire the lock and do the lookup + update.
Apart from the perf hit, this loses the atomicity of the update. It is possible that, between the time we release the lock and re-acquire it, other entries might have been added and the limit hit, so this attribute should be going into overflow.
We need to ensure atomicity and avoid the two-step lock-release-re-lock.
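For illustration only, here is a minimal sketch of the single-lock lookup-or-update pattern this comment is asking for (hypothetical types, with a plain u64 sum standing in for the real aggregation), where the existence check and the write happen under one lock acquisition:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical attribute-set key type; the real code uses AttributeSet.
type Attrs = Vec<(String, String)>;

fn record(bucket: &Mutex<HashMap<Attrs, u64>>, attrs: Attrs, value: u64) {
    // One lock acquisition covers both the existence check and the write, so no
    // other writer can change this bucket between the lookup and the update.
    let mut guard = bucket.lock().unwrap();
    *guard.entry(attrs).or_insert(0) += value;
}
```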
Good observation; I made the required changes. All operations should be (theoretically) atomic now, and perf should be fast for the common scenario where the attributes already exist.
let (bucket_index, attrs) = if under_limit {
    (bucket_index, attrs) // the index remains the same
} else {
    // TBD - Should we log, as this can flood the logs ?
We shouldn't log; the existing code was doing this incorrectly.
};

// Lock and update the relevant bucket
let mut final_bucket_guard = self.buckets[bucket_index].lock().unwrap();
This won't work, because by the time we reach and acquire this lock, collect() may have been triggered and wiped the hashmap clean, and other updates could have occurred as well, making our assumption that this attribute is not an overflow invalid.
let under_limit = self.try_increment();
It is entirely possible that this thread loses its CPU right after the above statement is executed. When it gets a chance to execute again, under_limit could have changed already. We need to do this whole thing atomically.
There are two changes in the design:
- We increment before the actual insert.
- The increment, i.e. try_increment(), is atomic.
There is a narrow window where a measurement with a new set of attributes goes to the overflow index even when there is an empty slot (this can happen during collect), which should be fine. However, there won't be a scenario where the number of data points in the map exceeds the cardinality limit.
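As an illustration of the second point, here is a minimal sketch of what an atomic, limit-respecting try_increment() could look like (hypothetical names and signature, not necessarily the PR's exact code): a compare-exchange loop only advances the counter while it is below the limit, so concurrent callers can never push it past the cardinality limit.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Returns true if a slot was reserved (the count stays <= cardinality_limit),
// false if the caller should fall back to the overflow entry.
fn try_increment(unique_count: &AtomicUsize, cardinality_limit: usize) -> bool {
    let mut current = unique_count.load(Ordering::Acquire);
    loop {
        if current >= cardinality_limit {
            return false; // limit reached: do not advance the counter
        }
        match unique_count.compare_exchange_weak(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => return true,            // reservation succeeded
            Err(actual) => current = actual, // lost a race: retry with the fresh value
        }
    }
}
```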
And thanks for the thorough review of this PR, thinking of all possible scenarios and edge conditions :)
Take this example:
thread1 is executing update() with attributes (foo=bar); the current total_unique is 100, with the limit at 2000.
Say bucket_index is 4.
Take the lock.
(foo=bar) is not present.
Drop the lock.
try_increment() is called, which increments total_unique to 101. That is within the limit, so this thread will go on to update bucket_index 4.
==== thread1 lost its CPU ====
// in other threads
collect() occurred and it reset total_unique to 0.
Other update()s occurred and took total_unique to 2000 (limit hit). None of these update()s had (foo=bar) as an attribute.
At this stage (foo=bar) should be going into overflow.
==== thread1 got back its CPU ====
It takes the lock to update bucket_index 4 with (foo=bar). But this is not correct; (foo=bar) should be going to overflow only.
> collect() occurred and it reset total_unique to 0
It won't reset to 0; it will only adjust (decrement) according to the number of entries read/drained.
collect() reads and drains the entire hashmaps. Won't it hit 0 then?
In the above scenario, when the number of entries in the hashmap is 100 while unique_count is 101 (because of foo=bar), if collect happens before foo=bar is inserted, the count will get reduced to 1 (not 0), and then the insert happens during/after the collect.
Take the example where 5 threads try to update with (foo=bar), which does not currently exist. The current unique_count is 1999, and the limit is 2000.
All 5 threads see that the entry is not found, so they all attempt to increment unique_count.
The first thread succeeds and attempts to store into the correct bucket, but the remaining 4 threads will attempt to put foo=bar into overflow. I believe this is the part which you said might be okay?
However, consider the same scenario, but with the current unique_count at just 10.
All 5 threads see that the entry is not found, so they all attempt to increment unique_count.
All threads would succeed, so unique_count reaches 15 (though it should actually be 11).
One thread would then insert into the hashmap and the rest would update it.
collect() runs and drains the map, but when it drains (foo=bar), it'll reduce unique_count by 1 (from 15 to 14). So we now have 4 wasted entries. No matter how many collect() runs happen afterwards, we'll never reclaim those wasted slots... If we run the stress test long enough (maybe weeks), we might see all entries going to overflow, even though there is plenty of space.
Please confirm if the above is correct.
Just an update: I am still working on it, particularly on mitigating the challenges around concurrency and atomicity. The main issue is making sure that checking whether we're under the cardinality limit and then acting on it (like inserting a new entry) happen together, without any race conditions or inconsistencies due to concurrent updates. This turned out to be more complex than initially anticipated, especially when trying to do it without compromising performance. Still working on it; hoping to have a solution during this week :)
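For context, one possible shape of such an atomic check-and-insert, sketched with hypothetical names (this is not necessarily what the PR ended up doing): the decision is made while holding the bucket lock, and the shared counter is only advanced when an insert actually happens, so a failed reservation is rolled back rather than leaking a slot.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

// Hypothetical attribute-set key type; the real code uses AttributeSet.
type Attrs = Vec<(String, String)>;

fn record(
    bucket: &Mutex<HashMap<Attrs, u64>>,
    unique_count: &AtomicUsize,
    cardinality_limit: usize,
    attrs: Attrs,
    value: u64,
) {
    let mut guard = bucket.lock().unwrap();

    // Common path: the attributes are already tracked, so the shared counter is untouched.
    if let Some(agg) = guard.get_mut(&attrs) {
        *agg += value;
        return;
    }

    // New attributes: reserve a slot while still holding the bucket lock, so the
    // "not present" observation and the insert cannot be separated by another writer.
    if unique_count.fetch_add(1, Ordering::AcqRel) < cardinality_limit {
        guard.insert(attrs, value); // counter and map stay in sync
    } else {
        // Roll back the failed reservation so the slot is not leaked; this thread
        // would record into the overflow entry instead (omitted in this sketch).
        unique_count.fetch_sub(1, Ordering::AcqRel);
    }
}
```

With this shape, concurrently failing reservations can still transiently push a few measurements into overflow early (the "narrow window" mentioned above), but every failed reservation is rolled back, so slots are never permanently wasted.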
Let us know if this is ready for another review. If not, we can mark as draft again. @hdost I am waiting for confirmation that this is ready for review, as we have open conversations that are not marked resolved.
Sorry for the delay. Will revisit it now. If it takes more time, will move it to draft.
There have been substantial perf improvements in metrics instrumentation with #1833. Will revisit this PR after the logs beta release. Closing this for now to keep the PR list minimal.
Changes
This PR is opened just for discussion of one of the possible changes to improve the performance of concurrent recording/updating of measurements in the hot path. In today's (Feb 20th, 2024) community meeting, we discussed multiple approaches:
- dashmap - a sharded hashmap.
- flurry - uses fine-grained atomic locks in its hashmap.
- sharded-slab - building a hashmap using the concurrent data structures provided by this crate.
- thread-local hashmaps in the hot path to record measurements, merged during collection. The collection would be a performance-heavy process, and the SDK doesn't control the life cycle of threads created by the application, so a thread exiting would also remove all of its aggregated data.
The approach in this PR is to modify the ValueMap to store values with two-level hashing to minimize lock contention. It's basically a simpler form of the sharding provided by dashmap.
Existing: (diagram in the original PR description)
PR: (diagram in the original PR description)
The first level of hashing distributes the values across a fixed set of 256 buckets. Each bucket is guarded by its own mutex and contains a second-level hash map storing the AttributeSet-to-aggregation mapping.
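Roughly, the layout described above looks like the following sketch (illustrative names only, not the actual ValueMap fields; the real code keys on AttributeSet and stores per-attribute aggregations):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

const BUCKET_COUNT: usize = 256;

// First level: a fixed set of 256 independently locked shards.
// Second level: each shard holds a plain HashMap from key to value.
struct TwoLevelMap<K, V> {
    buckets: Vec<Mutex<HashMap<K, V>>>,
}

impl<K: Hash + Eq, V> TwoLevelMap<K, V> {
    fn new() -> Self {
        Self {
            buckets: (0..BUCKET_COUNT)
                .map(|_| Mutex::new(HashMap::new()))
                .collect(),
        }
    }

    // The first-level hash only selects a bucket, so lock contention is limited
    // to threads that happen to land on the same shard.
    fn bucket_index(&self, key: &K) -> usize {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() as usize) % BUCKET_COUNT
    }

    fn record(&self, key: K, value: V) {
        let idx = self.bucket_index(&key);
        // Only this shard is locked; updates to other shards proceed in parallel.
        self.buckets[idx].lock().unwrap().insert(key, value);
    }
}
```

The actual PR may differ in details (e.g. how buckets are initialized or the exact bucket count), but the essential idea is that contention is per bucket rather than on one global lock.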
Please note that we lose locality of reference during the collection cycle, as the OS can allocate these buckets in different segments of memory, so they may not be cached together. All of the above are limitations of any sharded hashmap (including dashmap).
This is the result of our metrics stress test on my machine. However, I don't think the results are really consistent across machines, so it would be good if someone else can test it too:
Based on the above results, with this PR the perf seems to increase significantly with the number of threads. On the main branch, performance increases with threads up to a threshold and then starts degrading.
Benchmark results: this PR won't improve performance there, as those are single-threaded tests. There is a slight regression from the two-level indirection, which is compensated (in fact improved) by hashbrown/ahash:
Hashbrown/ahash seem to be widely adopted crates in terms of the number of downloads, and also secure enough against external DoS attacks - https://github.com/tkaitchuck/aHash/wiki/How-aHash-is-resists-DOS-attacks
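For reference, switching the hasher is a small, type-level change; a minimal sketch assuming the ahash crate is added as a dependency (hashbrown's HashMap exposes an equivalent API):

```rust
// Illustrative only: swapping std's default SipHash hasher for ahash's
// RandomState on the map type. Assumes `ahash` is listed in Cargo.toml.
use ahash::RandomState;
use std::collections::HashMap;

type FastMap<K, V> = HashMap<K, V, RandomState>;

fn main() {
    let mut map: FastMap<&'static str, u64> = FastMap::default();
    map.insert("requests", 1);
    *map.entry("requests").or_insert(0) += 1;
    assert_eq!(map.get("requests"), Some(&2));
}
```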
Merge requirement checklist
- CHANGELOG.md files updated for non-trivial, user-facing changes