From b714f8ebb58757846102dd77cec197a980df586e Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 21 Mar 2024 11:00:26 -0400 Subject: [PATCH 1/4] Removing nested lock --- limitador/src/storage/redis/batcher.rs | 22 +++++++++------------ limitador/src/storage/redis/redis_cached.rs | 2 +- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/limitador/src/storage/redis/batcher.rs b/limitador/src/storage/redis/batcher.rs index e5496334..4b23a0f7 100644 --- a/limitador/src/storage/redis/batcher.rs +++ b/limitador/src/storage/redis/batcher.rs @@ -2,43 +2,39 @@ use crate::counter::Counter; use crate::storage::redis::AsyncRedisStorage; use crate::storage::AsyncCounterStorage; use std::collections::HashMap; -use tokio::sync::Mutex; pub struct Batcher { - accumulated_counter_updates: Mutex>, + accumulated_counter_updates: HashMap, redis_storage: AsyncRedisStorage, } impl Batcher { pub fn new(redis_storage: AsyncRedisStorage) -> Self { Self { - accumulated_counter_updates: Mutex::new(HashMap::new()), + accumulated_counter_updates: HashMap::new(), redis_storage, } } - pub async fn add_counter(&self, counter: &Counter, delta: i64) { - let mut accumulated_counter_updates = self.accumulated_counter_updates.lock().await; - - match accumulated_counter_updates.get_mut(counter) { + pub async fn add_counter(&mut self, counter: &Counter, delta: i64) { + match self.accumulated_counter_updates.get_mut(counter) { Some(val) => { *val += delta; } None => { - accumulated_counter_updates.insert(counter.clone(), delta); + self.accumulated_counter_updates + .insert(counter.clone(), delta); } } } - pub async fn flush(&self) { - let mut accumulated_counter_updates = self.accumulated_counter_updates.lock().await; - - for (counter, delta) in accumulated_counter_updates.iter() { + pub async fn flush(&mut self) { + for (counter, delta) in self.accumulated_counter_updates.iter() { self.redis_storage .update_counter(counter, *delta) .await .unwrap(); } - accumulated_counter_updates.clear(); + self.accumulated_counter_updates.clear(); } } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 2f6e1fc1..95a9f341 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -158,7 +158,7 @@ impl AsyncCounterStorage for CachedRedisStorage { // Batch or update depending on configuration if self.batching_is_enabled { - let batcher = self.batcher_counter_updates.lock().await; + let mut batcher = self.batcher_counter_updates.lock().await; for counter in counters.iter() { batcher.add_counter(counter, delta).await } From 2af9d7fb0ad09358a5552409f05badcdff8126fb Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 21 Mar 2024 11:18:21 -0400 Subject: [PATCH 2/4] Avoid keeping the lock when writing back to the redis storage --- limitador/src/storage/redis/batcher.rs | 19 +++++----------- limitador/src/storage/redis/redis_cached.rs | 24 +++++++++++++-------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/limitador/src/storage/redis/batcher.rs b/limitador/src/storage/redis/batcher.rs index 4b23a0f7..81232ebb 100644 --- a/limitador/src/storage/redis/batcher.rs +++ b/limitador/src/storage/redis/batcher.rs @@ -1,22 +1,19 @@ use crate::counter::Counter; -use crate::storage::redis::AsyncRedisStorage; -use crate::storage::AsyncCounterStorage; use std::collections::HashMap; +use std::mem; pub struct Batcher { accumulated_counter_updates: HashMap, - redis_storage: AsyncRedisStorage, } impl Batcher { - pub fn new(redis_storage: AsyncRedisStorage) -> Self { + pub fn new() -> Self { Self { accumulated_counter_updates: HashMap::new(), - redis_storage, } } - pub async fn add_counter(&mut self, counter: &Counter, delta: i64) { + pub fn add_counter(&mut self, counter: &Counter, delta: i64) { match self.accumulated_counter_updates.get_mut(counter) { Some(val) => { *val += delta; @@ -28,13 +25,7 @@ impl Batcher { } } - pub async fn flush(&mut self) { - for (counter, delta) in self.accumulated_counter_updates.iter() { - self.redis_storage - .update_counter(counter, *delta) - .await - .unwrap(); - } - self.accumulated_counter_updates.clear(); + pub fn flush(&mut self) -> HashMap { + mem::take(&mut self.accumulated_counter_updates) } } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 95a9f341..68702936 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -16,9 +16,8 @@ use redis::aio::ConnectionManager; use redis::{ConnectionInfo, RedisError}; use std::collections::HashSet; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use tokio::sync::Mutex; // This is just a first version. // @@ -81,7 +80,7 @@ impl AsyncCounterStorage for CachedRedisStorage { // Check cached counters { - let cached_counters = self.cached_counters.lock().await; + let cached_counters = self.cached_counters.lock().unwrap(); for counter in counters.iter_mut() { match cached_counters.get(counter) { Some(val) => { @@ -122,7 +121,7 @@ impl AsyncCounterStorage for CachedRedisStorage { Duration::from_millis((Instant::now() - time_start_get_ttl).as_millis() as u64); { - let mut cached_counters = self.cached_counters.lock().await; + let mut cached_counters = self.cached_counters.lock().unwrap(); for (i, counter) in not_cached.iter_mut().enumerate() { cached_counters.insert( counter.clone(), @@ -150,7 +149,7 @@ impl AsyncCounterStorage for CachedRedisStorage { // Update cached values { - let mut cached_counters = self.cached_counters.lock().await; + let mut cached_counters = self.cached_counters.lock().unwrap(); for counter in counters.iter() { cached_counters.decrease_by(counter, delta); } @@ -158,9 +157,9 @@ impl AsyncCounterStorage for CachedRedisStorage { // Batch or update depending on configuration if self.batching_is_enabled { - let mut batcher = self.batcher_counter_updates.lock().await; + let mut batcher = self.batcher_counter_updates.lock().unwrap(); for counter in counters.iter() { - batcher.add_counter(counter, delta).await + batcher.add_counter(counter, delta) } } else { for counter in counters.iter() { @@ -216,13 +215,20 @@ impl CachedRedisStorage { let async_redis_storage = AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); - let batcher = Arc::new(Mutex::new(Batcher::new(async_redis_storage.clone()))); + let storage = async_redis_storage.clone(); + let batcher = Arc::new(Mutex::new(Batcher::new())); if let Some(flushing_period) = flushing_period { let batcher_flusher = batcher.clone(); tokio::spawn(async move { loop { let time_start = Instant::now(); - batcher_flusher.lock().await.flush().await; + let counters = { + let mut batch = batcher_flusher.lock().unwrap(); + batch.flush() + }; + for (counter, delta) in counters { + storage.update_counter(&counter, delta).await.unwrap(); + } let sleep_time = flushing_period .checked_sub(time_start.elapsed()) .unwrap_or_else(|| Duration::from_secs(0)); From 57d8f81e92c333f1e6573a62f19397dd6a61ab60 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 21 Mar 2024 11:28:23 -0400 Subject: [PATCH 3/4] Inline Batcher --- limitador/src/storage/redis/batcher.rs | 31 --------------------- limitador/src/storage/redis/mod.rs | 1 - limitador/src/storage/redis/redis_cached.rs | 18 ++++++++---- 3 files changed, 12 insertions(+), 38 deletions(-) delete mode 100644 limitador/src/storage/redis/batcher.rs diff --git a/limitador/src/storage/redis/batcher.rs b/limitador/src/storage/redis/batcher.rs deleted file mode 100644 index 81232ebb..00000000 --- a/limitador/src/storage/redis/batcher.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::counter::Counter; -use std::collections::HashMap; -use std::mem; - -pub struct Batcher { - accumulated_counter_updates: HashMap, -} - -impl Batcher { - pub fn new() -> Self { - Self { - accumulated_counter_updates: HashMap::new(), - } - } - - pub fn add_counter(&mut self, counter: &Counter, delta: i64) { - match self.accumulated_counter_updates.get_mut(counter) { - Some(val) => { - *val += delta; - } - None => { - self.accumulated_counter_updates - .insert(counter.clone(), delta); - } - } - } - - pub fn flush(&mut self) -> HashMap { - mem::take(&mut self.accumulated_counter_updates) - } -} diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index 1945e3e6..1eec4626 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -1,7 +1,6 @@ use ::redis::RedisError; use std::time::Duration; -mod batcher; mod counters_cache; mod redis_async; mod redis_cached; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 68702936..d3e09191 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -2,7 +2,6 @@ use crate::counter::Counter; use crate::limit::Limit; use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; -use crate::storage::redis::batcher::Batcher; use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder}; use crate::storage::redis::redis_async::AsyncRedisStorage; use crate::storage::redis::scripts::VALUES_AND_TTLS; @@ -14,7 +13,7 @@ use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; use redis::aio::ConnectionManager; use redis::{ConnectionInfo, RedisError}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -39,7 +38,7 @@ use std::time::{Duration, Instant}; pub struct CachedRedisStorage { cached_counters: Mutex, - batcher_counter_updates: Arc>, + batcher_counter_updates: Arc>>, async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, batching_is_enabled: bool, @@ -159,7 +158,14 @@ impl AsyncCounterStorage for CachedRedisStorage { if self.batching_is_enabled { let mut batcher = self.batcher_counter_updates.lock().unwrap(); for counter in counters.iter() { - batcher.add_counter(counter, delta) + match batcher.get_mut(counter) { + Some(val) => { + *val += delta; + } + None => { + batcher.insert(counter.clone(), delta); + } + } } } else { for counter in counters.iter() { @@ -216,7 +222,7 @@ impl CachedRedisStorage { AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); let storage = async_redis_storage.clone(); - let batcher = Arc::new(Mutex::new(Batcher::new())); + let batcher = Arc::new(Mutex::new(Default::default())); if let Some(flushing_period) = flushing_period { let batcher_flusher = batcher.clone(); tokio::spawn(async move { @@ -224,7 +230,7 @@ impl CachedRedisStorage { let time_start = Instant::now(); let counters = { let mut batch = batcher_flusher.lock().unwrap(); - batch.flush() + std::mem::take(&mut *batch) }; for (counter, delta) in counters { storage.update_counter(&counter, delta).await.unwrap(); From eab1166c19f0d41032a5eba4342a12cf3da149cb Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 21 Mar 2024 11:32:54 -0400 Subject: [PATCH 4/4] Use interval instead of manual duration calculation --- limitador/src/storage/redis/redis_cached.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index d3e09191..fd543aee 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -225,9 +225,9 @@ impl CachedRedisStorage { let batcher = Arc::new(Mutex::new(Default::default())); if let Some(flushing_period) = flushing_period { let batcher_flusher = batcher.clone(); + let mut interval = tokio::time::interval(flushing_period); tokio::spawn(async move { loop { - let time_start = Instant::now(); let counters = { let mut batch = batcher_flusher.lock().unwrap(); std::mem::take(&mut *batch) @@ -235,10 +235,7 @@ impl CachedRedisStorage { for (counter, delta) in counters { storage.update_counter(&counter, delta).await.unwrap(); } - let sleep_time = flushing_period - .checked_sub(time_start.elapsed()) - .unwrap_or_else(|| Duration::from_secs(0)); - tokio::time::sleep(sleep_time).await; + interval.tick().await; } }); }