More write behind #303

Merged · 5 commits · Apr 30, 2024
253 changes: 126 additions & 127 deletions limitador/src/storage/redis/counters_cache.rs
@@ -14,7 +14,6 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::select;
use tokio::sync::Notify;
use tokio::time::interval;

pub struct CachedCounterValue {
value: AtomicExpiringValue,
@@ -23,107 +22,6 @@ pub struct CachedCounterValue {
from_authority: AtomicBool,
}

pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl Batcher {
fn new(period: Duration) -> Self {
Self {
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub async fn consume<F, Fut, O>(&self, min: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut interval = interval(self.interval);
let mut ready = self.updates.len() >= min;
loop {
if ready {
let mut batch = Vec::with_capacity(min);
for entry in &self.updates {
if entry.value().requires_fast_flush(&self.interval) {
batch.push(entry.key().clone());
if batch.len() == min {
break;
}
}
}
if let Some(remaining) = min.checked_sub(batch.len()) {
let take = self.updates.iter().take(remaining);
batch.append(&mut take.map(|e| e.key().clone()).collect());
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
for counter in &batch {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
}
return result;
} else {
ready = select! {
_ = self.notifier.notified() => {
self.updates.len() >= min ||
self.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
},
_ = interval.tick() => true,
}
}
}
}

pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
let priority = value.requires_fast_flush(&self.interval);
match self.updates.entry(counter.clone()) {
Entry::Occupied(needs_merge) => {
let arc = needs_merge.get();
if !Arc::ptr_eq(arc, &value) {
arc.delta(&counter, value.pending_writes().unwrap());
}
}
Entry::Vacant(miss) => {
miss.insert_entry(value);
}
};
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CachedCounterValue {
pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self {
let now = SystemTime::now();
@@ -229,46 +127,107 @@ impl CachedCounterValue {
}
}

pub struct CountersCacheBuilder {
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
pub struct Batcher {
updates: DashMap<Counter, Arc<CachedCounterValue>>,
notifier: Notify,
interval: Duration,
priority_flush: AtomicBool,
}

impl CountersCacheBuilder {
pub fn new() -> Self {
impl Batcher {
fn new(period: Duration) -> Self {
Self {
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
updates: Default::default(),
notifier: Default::default(),
interval: period,
priority_flush: AtomicBool::new(false),
}
}

pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
self.max_cached_counters = max_cached_counters;
self
pub fn add(&self, counter: Counter, value: Arc<CachedCounterValue>) {
let priority = value.requires_fast_flush(&self.interval);
match self.updates.entry(counter.clone()) {
Entry::Occupied(needs_merge) => {
let arc = needs_merge.get();
if !Arc::ptr_eq(arc, &value) {
arc.delta(&counter, value.pending_writes().unwrap());
}
}
Entry::Vacant(miss) => {
miss.insert_entry(value);
}
};
if priority {
self.priority_flush.store(true, Ordering::Release);
}
self.notifier.notify_one();
}

pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
self.max_ttl_cached_counters = max_ttl_cached_counter;
self
pub async fn consume<F, Fut, O>(&self, max: usize, consumer: F) -> O
where
F: FnOnce(HashMap<Counter, Arc<CachedCounterValue>>) -> Fut,
Fut: Future<Output = O>,
{
let mut ready = self.batch_ready(max);
loop {
if ready {
let mut batch = Vec::with_capacity(max);
batch.extend(
self.updates
.iter()
.filter(|entry| entry.value().requires_fast_flush(&self.interval))
.take(max)
.map(|e| e.key().clone()),
);
if let Some(remaining) = max.checked_sub(batch.len()) {
batch.extend(self.updates.iter().take(remaining).map(|e| e.key().clone()));
}
let mut result = HashMap::new();
for counter in &batch {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
let result = consumer(result).await;
batch.iter().for_each(|counter| {
self.updates
.remove_if(counter, |_, v| v.no_pending_writes());
});
return result;
} else {
ready = select! {
_ = self.notifier.notified() => self.batch_ready(max),
_ = tokio::time::sleep(self.interval) => true,
}
}
}
}

pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
self
pub fn is_empty(&self) -> bool {
self.updates.is_empty()
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
fn batch_ready(&self, size: usize) -> bool {
self.updates.len() >= size
|| self
.priority_flush
.compare_exchange(true, false, Ordering::Release, Ordering::Acquire)
.is_ok()
}
}

impl Default for Batcher {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}
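
The reworked Batcher has a simple contract: producers call add, which merges deltas into an existing entry and may request a priority flush, while a single background task repeatedly calls consume, hands the snapshot batch to its consumer, and afterwards only entries left with no pending writes are removed. A rough driver sketch of that contract follows; flush_task, persist and the Arc<Batcher> wrapping are illustrative assumptions, not Limitador APIs (the real driver lives in redis_cached.rs):

// Illustrative sketch only: `persist` stands in for the real Redis writer
// (`update_counters`); error handling and shutdown are elided.
async fn flush_task(batcher: Arc<Batcher>) {
    loop {
        batcher
            .consume(100, |batch: HashMap<Counter, Arc<CachedCounterValue>>| async move {
                // `batch` holds at most 100 counters; entries that need a
                // fast flush are picked first.
                persist(batch).await
            })
            .await;
    }
}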

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
}

impl CountersCache {
pub fn get(&self, counter: &Counter) -> Option<Arc<CachedCounterValue>> {
let option = self.cache.get(counter);
@@ -383,6 +342,46 @@ impl CountersCache {
}
}

pub struct CountersCacheBuilder {
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
}

impl CountersCacheBuilder {
pub fn new() -> Self {
Self {
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
}
}

pub fn max_cached_counters(mut self, max_cached_counters: usize) -> Self {
self.max_cached_counters = max_cached_counters;
self
}

pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
self.max_ttl_cached_counters = max_ttl_cached_counter;
self
}

pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
self
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
}
}
}
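
For reference, a minimal sketch of driving the builder shown above; the numeric values and the flush period passed to build are illustrative, not project defaults:

// Illustrative values; `build` takes the write-behind flush period used by the Batcher.
let cache: CountersCache = CountersCacheBuilder::new()
    .max_cached_counters(10_000)
    .max_ttl_cached_counter(Duration::from_secs(5))
    .ttl_ratio_cached_counter(10)
    .build(Duration::from_millis(100));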

#[cfg(test)]
mod tests {
use super::*;
8 changes: 6 additions & 2 deletions limitador/src/storage/redis/redis_cached.rs
@@ -293,7 +293,11 @@ async fn update_counters<C: ConnectionLike>(
let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let mut res: Vec<(Counter, i64, i64)> = Vec::new();
let mut res: Vec<(Counter, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());
if counters_and_deltas.is_empty() {
return Ok(res);
}

for (counter, delta) in counters_and_deltas {
let delta = delta.pending_writes().expect("State machine is wrong!");
if delta > 0 {
@@ -339,7 +343,7 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
} else {
let updated_counters = cached_counters
.batcher()
.consume(1, |counters| update_counters(&mut redis_conn, counters))
.consume(100, |counters| update_counters(&mut redis_conn, counters))
Review comment (Member): I should've seen this in my previous review... mea culpa

.await
.or_else(|err| {
if err.is_transient() {
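
Two behavioural notes on this file: update_counters now pre-sizes its result and returns early on an empty batch, so a flush with nothing pending never performs a Redis round trip, and each flush now drains up to 100 counters from the batcher instead of a single one. A hedged illustration of the empty-batch guard (not actual test code; redis_conn is assumed to be an already established connection):

// Sketch: an empty batch short-circuits before the batch-update script is sent.
let empty: HashMap<Counter, Arc<CachedCounterValue>> = HashMap::new();
let updated = update_counters(&mut redis_conn, empty).await?;
assert!(updated.is_empty()); // nothing was written to Redis
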
2 changes: 1 addition & 1 deletion limitador/tests/integration_tests.rs
@@ -537,7 +537,7 @@ mod test {
}

// We wait for the flushing period to pass so the counters are flushed in the cached storage
tokio::time::sleep(Duration::from_millis(3)).await;
tokio::time::sleep(Duration::from_millis(4)).await;
Review comment (Member): I didn't fix this in my previous PR since I couldn't find an easy way to do so, we could tackle this later to avoid sleeping in tests


assert!(rate_limiter
.is_rate_limited(namespace, &get_values, 1)
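
On the reviewer's note about sleeping in tests: one possible follow-up (a sketch only, not part of this PR; it assumes tokio's test-util feature, and the test name is illustrative) is to run the test on tokio's paused clock and advance virtual time instead of really waiting:

// Sketch only, not part of this PR. With start_paused, timers fire as virtual
// time is advanced, so the 4 ms flush wait does not block the test in real time.
#[tokio::test(start_paused = true)]
async fn counters_are_flushed_after_the_period() {
    // ... set up the rate limiter with cached Redis storage ...
    tokio::time::advance(std::time::Duration::from_millis(4)).await;
    // ... assert the counters were flushed ...
}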