-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support pending writes within CachedCounterValue #299
Changes from all commits
8c48bff
df3b556
f3a7f07
620c021
826b8a9
87f69a7
9d2b2c9
49dd4c4
83ba6a7
2b22301
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,27 +4,147 @@ use crate::storage::redis::{ | |
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, | ||
DEFAULT_TTL_RATIO_CACHED_COUNTERS, | ||
}; | ||
use dashmap::mapref::entry::Entry; | ||
use dashmap::DashMap; | ||
use moka::sync::Cache; | ||
use std::collections::HashMap; | ||
use std::future::Future; | ||
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; | ||
use std::sync::Arc; | ||
use std::time::{Duration, SystemTime}; | ||
use tokio::select; | ||
use tokio::sync::Notify; | ||
use tokio::time::interval; | ||
|
||
pub struct CachedCounterValue { | ||
value: AtomicExpiringValue, | ||
initial_value: AtomicI64, | ||
expiry: AtomicExpiryTime, | ||
from_authority: AtomicBool, | ||
} | ||
|
||
pub struct Batcher { | ||
updates: DashMap<Counter, Arc<CachedCounterValue>>, | ||
notifier: Notify, | ||
interval: Duration, | ||
priority_flush: AtomicBool, | ||
} | ||
|
||
impl Batcher { | ||
fn new(period: Duration) -> Self { | ||
Self { | ||
updates: Default::default(), | ||
notifier: Default::default(), | ||
interval: period, | ||
priority_flush: AtomicBool::new(false), | ||
} | ||
} | ||
|
||
pub fn is_empty(&self) -> bool { | ||
self.updates.is_empty() | ||
} | ||
|
||
pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O | ||
where | ||
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut, | ||
Fut: Future<Output = O>, | ||
{ | ||
let mut interval = interval(self.interval); | ||
let mut ready = self.updates.len() >= min; | ||
loop { | ||
if ready { | ||
let mut batch = Vec::with_capacity(min); | ||
for entry in &self.updates { | ||
if entry.value().requires_fast_flush(&self.interval) { | ||
batch.push(entry.key().clone()); | ||
if batch.len() == min { | ||
break; | ||
} | ||
} | ||
} | ||
if let Some(remaining) = min.checked_sub(batch.len()) { | ||
let take = self.updates.iter().take(remaining); | ||
batch.append(&mut take.map(|e| e.key().clone()).collect()); | ||
} | ||
let mut result = HashMap::new(); | ||
for counter in &batch { | ||
let value = self.updates.get(counter).unwrap().clone(); | ||
result.insert(counter.clone(), value); | ||
} | ||
let result = consumer(result).await; | ||
for counter in &batch { | ||
self.updates | ||
.remove_if(counter, |_, v| v.no_pending_writes()); | ||
} | ||
Comment on lines
+75
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If after having flushed to redis no additional pending writes were added, we can remove these entries from the queue. |
||
return result; | ||
} else { | ||
ready = select! { | ||
_ = self.notifier.notified() => { | ||
self.updates.len() >= min || | ||
self.priority_flush | ||
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire) | ||
.is_ok() | ||
}, | ||
_ = interval.tick() => true, | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) { | ||
let priority = value.requires_fast_flush(&self.interval); | ||
match self.updates.entry(counter.clone()) { | ||
Entry::Occupied(needs_merge) => { | ||
let arc = needs_merge.get(); | ||
if !Arc::ptr_eq(arc, &value) { | ||
arc.delta(&counter, value.pending_writes().unwrap()); | ||
} | ||
} | ||
Entry::Vacant(miss) => { | ||
miss.insert_entry(value); | ||
} | ||
}; | ||
if priority { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we start constraining the batch size sent to Redis, we may want to add the priority flag in the value too so that the first batch we execute contains the priority items first. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I do this "hack" where I check the TTL on entry… to decide the things that are "important", but I think I should maybe just waste the additional bit… |
||
self.priority_flush.store(true, Ordering::Release); | ||
} | ||
self.notifier.notify_one(); | ||
} | ||
} | ||
|
||
impl Default for Batcher { | ||
fn default() -> Self { | ||
Self::new(Duration::from_millis(100)) | ||
} | ||
} | ||
|
||
pub struct CountersCache { | ||
max_ttl_cached_counters: Duration, | ||
pub ttl_ratio_cached_counters: u64, | ||
cache: Cache<Counter, Arc<CachedCounterValue>>, | ||
batcher: Batcher, | ||
} | ||
|
||
impl CachedCounterValue { | ||
pub fn from(counter: &Counter, value: i64, ttl: Duration) -> Self { | ||
pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self { | ||
let now = SystemTime::now(); | ||
Self { | ||
value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())), | ||
initial_value: AtomicI64::new(value), | ||
expiry: AtomicExpiryTime::from_now(ttl), | ||
from_authority: AtomicBool::new(true), | ||
} | ||
} | ||
|
||
pub fn load_from_authority_asap(counter: &Counter, temp_value: i64) -> Self { | ||
let now = SystemTime::now(); | ||
Self { | ||
value: AtomicExpiringValue::new( | ||
temp_value, | ||
now + Duration::from_secs(counter.seconds()), | ||
), | ||
initial_value: AtomicI64::new(temp_value), | ||
expiry: AtomicExpiryTime::from_now(Duration::from_secs(counter.seconds())), | ||
from_authority: AtomicBool::new(false), | ||
} | ||
} | ||
|
||
|
@@ -34,13 +154,58 @@ impl CachedCounterValue { | |
|
||
pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) { | ||
let time_window = Duration::from_secs(counter.seconds()); | ||
self.initial_value.store(value, Ordering::SeqCst); | ||
self.value.set(value, time_window); | ||
self.expiry.update(expiry); | ||
self.from_authority.store(true, Ordering::Release); | ||
} | ||
|
||
pub fn delta(&self, counter: &Counter, delta: i64) -> i64 { | ||
self.value | ||
.update(delta, counter.seconds(), SystemTime::now()) | ||
let value = self | ||
.value | ||
.update(delta, counter.seconds(), SystemTime::now()); | ||
if value == delta { | ||
// new window, invalidate initial value | ||
self.initial_value.store(0, Ordering::SeqCst); | ||
} | ||
value | ||
} | ||
|
||
pub fn pending_writes(&self) -> Result<i64, ()> { | ||
let start = self.initial_value.load(Ordering::SeqCst); | ||
let value = self.value.value_at(SystemTime::now()); | ||
let offset = if start == 0 { | ||
value | ||
} else { | ||
let writes = value - start; | ||
if writes > 0 { | ||
writes | ||
} else { | ||
value | ||
} | ||
}; | ||
match self | ||
.initial_value | ||
.compare_exchange(start, value, Ordering::SeqCst, Ordering::SeqCst) | ||
{ | ||
Ok(_) => Ok(offset), | ||
Err(newer) => { | ||
if newer == 0 { | ||
// We got expired in the meantime, this fresh value can wait the next iteration | ||
Ok(0) | ||
} else { | ||
// Concurrent call to this method? | ||
// We could support that with a CAS loop in the future if needed | ||
Err(()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn no_pending_writes(&self) -> bool { | ||
let start = self.initial_value.load(Ordering::SeqCst); | ||
let value = self.value.value_at(SystemTime::now()); | ||
value - start == 0 | ||
} | ||
|
||
pub fn hits(&self, _: &Counter) -> i64 { | ||
|
@@ -58,6 +223,10 @@ impl CachedCounterValue { | |
pub fn to_next_window(&self) -> Duration { | ||
self.value.ttl() | ||
} | ||
|
||
pub fn requires_fast_flush(&self, within: &Duration) -> bool { | ||
self.from_authority.load(Ordering::Acquire) || &self.value.ttl() <= within | ||
} | ||
} | ||
|
||
pub struct CountersCacheBuilder { | ||
|
@@ -90,18 +259,31 @@ impl CountersCacheBuilder { | |
self | ||
} | ||
|
||
pub fn build(&self) -> CountersCache { | ||
pub fn build(&self, period: Duration) -> CountersCache { | ||
CountersCache { | ||
max_ttl_cached_counters: self.max_ttl_cached_counters, | ||
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters, | ||
cache: Cache::new(self.max_cached_counters as u64), | ||
batcher: Batcher::new(period), | ||
} | ||
} | ||
} | ||
|
||
impl CountersCache { | ||
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> { | ||
self.cache.get(counter) | ||
let option = self.cache.get(counter); | ||
if option.is_none() { | ||
let from_queue = self.batcher.updates.get(counter); | ||
if let Some(entry) = from_queue { | ||
self.cache.insert(counter.clone(), entry.value().clone()); | ||
return Some(entry.value().clone()); | ||
} | ||
} | ||
option | ||
} | ||
|
||
pub fn batcher(&self) -> &Batcher { | ||
&self.batcher | ||
} | ||
|
||
pub fn insert( | ||
|
@@ -122,25 +304,38 @@ impl CountersCache { | |
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) { | ||
if ttl > Duration::ZERO { | ||
let previous = self.cache.get_with(counter.clone(), || { | ||
Arc::new(CachedCounterValue::from(&counter, counter_val, cache_ttl)) | ||
if let Some(entry) = self.batcher.updates.get(&counter) { | ||
entry.value().clone() | ||
} else { | ||
Arc::new(CachedCounterValue::from_authority( | ||
&counter, | ||
counter_val, | ||
ttl, | ||
)) | ||
} | ||
}); | ||
if previous.expired_at(now) || previous.value.value() < counter_val { | ||
previous.set_from_authority(&counter, counter_val, cache_ttl); | ||
previous.set_from_authority(&counter, counter_val, ttl); | ||
} | ||
return previous; | ||
} | ||
} | ||
Arc::new(CachedCounterValue::from( | ||
Arc::new(CachedCounterValue::load_from_authority_asap( | ||
&counter, | ||
counter_val, | ||
Duration::ZERO, | ||
)) | ||
} | ||
|
||
pub fn increase_by(&self, counter: &Counter, delta: i64) { | ||
if let Some(val) = self.cache.get(counter) { | ||
val.delta(counter, delta); | ||
}; | ||
let val = self.cache.get_with_by_ref(counter, || { | ||
if let Some(entry) = self.batcher.updates.get(counter) { | ||
entry.value().clone() | ||
} else { | ||
Arc::new(CachedCounterValue::load_from_authority_asap(counter, 0)) | ||
} | ||
}); | ||
val.delta(counter, delta); | ||
self.batcher.add(counter.clone(), val.clone()); | ||
} | ||
|
||
fn ttl_from_redis_ttl( | ||
|
@@ -209,7 +404,7 @@ mod tests { | |
values, | ||
); | ||
|
||
let cache = CountersCacheBuilder::new().build(); | ||
let cache = CountersCacheBuilder::new().build(Duration::default()); | ||
cache.insert( | ||
counter.clone(), | ||
Some(10), | ||
|
@@ -236,7 +431,7 @@ mod tests { | |
values, | ||
); | ||
|
||
let cache = CountersCacheBuilder::new().build(); | ||
let cache = CountersCacheBuilder::new().build(Duration::default()); | ||
|
||
assert!(cache.get(&counter).is_none()); | ||
} | ||
|
@@ -258,7 +453,7 @@ mod tests { | |
values, | ||
); | ||
|
||
let cache = CountersCacheBuilder::new().build(); | ||
let cache = CountersCacheBuilder::new().build(Duration::default()); | ||
cache.insert( | ||
counter.clone(), | ||
Some(current_value), | ||
|
@@ -289,7 +484,7 @@ mod tests { | |
values, | ||
); | ||
|
||
let cache = CountersCacheBuilder::new().build(); | ||
let cache = CountersCacheBuilder::new().build(Duration::default()); | ||
cache.insert( | ||
counter.clone(), | ||
None, | ||
|
@@ -318,7 +513,7 @@ mod tests { | |
values, | ||
); | ||
|
||
let cache = CountersCacheBuilder::new().build(); | ||
let cache = CountersCacheBuilder::new().build(Duration::default()); | ||
cache.insert( | ||
counter.clone(), | ||
Some(current_val), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DashMap makes it much cleaner and easier to consume <3