diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 232ba1a5..dcc44569 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -137,17 +137,8 @@ impl Limiter { ) -> CachedRedisStorage { // TODO: Not all the options are configurable via ENV. Add them as needed. - let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url); - - if cache_cfg.flushing_period < 0 { - cached_redis_storage = cached_redis_storage.flushing_period(None) - } else { - cached_redis_storage = cached_redis_storage.flushing_period(Some( - Duration::from_millis(cache_cfg.flushing_period as u64), - )) - } - - cached_redis_storage = cached_redis_storage + let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url) + .flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64)) .max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl)) .ttl_ratio_cached_counters(cache_cfg.ttl_ratio) .max_cached_counters(cache_cfg.max_counters) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index d8513ad6..2f28ab0f 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -43,7 +43,6 @@ pub struct CachedRedisStorage { batcher_counter_updates: Arc>>, async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, - batching_is_enabled: bool, partitioned: Arc, } @@ -139,9 +138,7 @@ impl AsyncCounterStorage for CachedRedisStorage { counter_ttls_msecs[i], ttl_margin, ); - let remaining = counter.max_value() - - counter_vals[i].unwrap_or(0) - - delta; + let remaining = counter.max_value() - counter_vals[i].unwrap_or(0) - delta; if first_limited.is_none() && remaining < 0 { first_limited = Some(Authorization::Limited( counter.limit().name().map(|n| n.to_owned()), @@ -174,24 +171,9 @@ impl AsyncCounterStorage for CachedRedisStorage { } // Batch or update depending on configuration - if self.is_partitioned() || self.batching_is_enabled { - let mut batcher = self.batcher_counter_updates.lock().unwrap(); - for counter in counters.iter() { - Self::batch_counter(delta, &mut batcher, counter); - } - } else { - for counter in counters.iter() { - self.update_counter(counter, delta).await.or_else(|err| { - if err.is_transient() { - self.partitioned(true); - let mut batcher = self.batcher_counter_updates.lock().unwrap(); - Self::batch_counter(delta, &mut batcher, counter); - Ok(()) - } else { - Err(err) - } - })? - } + let mut batcher = self.batcher_counter_updates.lock().unwrap(); + for counter in counters.iter() { + Self::batch_counter(delta, &mut batcher, counter); } Ok(Authorization::Ok) @@ -217,7 +199,7 @@ impl CachedRedisStorage { pub async fn new(redis_url: &str) -> Result { Self::new_with_options( redis_url, - Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)), + Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), DEFAULT_MAX_CACHED_COUNTERS, Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), DEFAULT_TTL_RATIO_CACHED_COUNTERS, @@ -228,7 +210,7 @@ impl CachedRedisStorage { async fn new_with_options( redis_url: &str, - flushing_period: Option, + flushing_period: Duration, max_cached_counters: usize, ttl_cached_counters: Duration, ttl_ratio_cached_counters: u64, @@ -253,40 +235,38 @@ impl CachedRedisStorage { let storage = async_redis_storage.clone(); let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); let p = Arc::clone(&partitioned); - if let Some(flushing_period) = flushing_period { - let batcher_flusher = batcher.clone(); - let mut interval = tokio::time::interval(flushing_period); - tokio::spawn(async move { - loop { - if p.load(Ordering::Acquire) { - if storage.is_alive().await { - warn!("Partition to Redis resolved!"); - p.store(false, Ordering::Release); - } - } else { - let counters = { - let mut batch = batcher_flusher.lock().unwrap(); - std::mem::take(&mut *batch) - }; - for (counter, delta) in counters { - storage - .update_counter(&counter, delta) - .await - .or_else(|err| { - if err.is_transient() { - p.store(true, Ordering::Release); - Ok(()) - } else { - Err(err) - } - }) - .expect("Unrecoverable Redis error!"); - } + let batcher_flusher = batcher.clone(); + let mut interval = tokio::time::interval(flushing_period); + tokio::spawn(async move { + loop { + if p.load(Ordering::Acquire) { + if storage.is_alive().await { + warn!("Partition to Redis resolved!"); + p.store(false, Ordering::Release); + } + } else { + let counters = { + let mut batch = batcher_flusher.lock().unwrap(); + std::mem::take(&mut *batch) + }; + for (counter, delta) in counters { + storage + .update_counter(&counter, delta) + .await + .or_else(|err| { + if err.is_transient() { + p.store(true, Ordering::Release); + Ok(()) + } else { + Err(err) + } + }) + .expect("Unrecoverable Redis error!"); } - interval.tick().await; } - }); - } + interval.tick().await; + } + }); let cached_counters = CountersCacheBuilder::new() .max_cached_counters(max_cached_counters) @@ -299,7 +279,6 @@ impl CachedRedisStorage { batcher_counter_updates: batcher, redis_conn_manager, async_redis_storage, - batching_is_enabled: flushing_period.is_some(), partitioned, }) } @@ -385,7 +364,7 @@ impl CachedRedisStorage { pub struct CachedRedisStorageBuilder { redis_url: String, - flushing_period: Option, + flushing_period: Duration, max_cached_counters: usize, max_ttl_cached_counters: Duration, ttl_ratio_cached_counters: u64, @@ -396,7 +375,7 @@ impl CachedRedisStorageBuilder { pub fn new(redis_url: &str) -> Self { Self { redis_url: redis_url.to_string(), - flushing_period: Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)), + flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS, max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS, @@ -404,7 +383,7 @@ impl CachedRedisStorageBuilder { } } - pub fn flushing_period(mut self, flushing_period: Option) -> Self { + pub fn flushing_period(mut self, flushing_period: Duration) -> Self { self.flushing_period = flushing_period; self } diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index a6ccf1a5..5cdb6b88 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -59,7 +59,7 @@ macro_rules! test_with_all_storage_impls { #[serial] async fn [<$function _with_async_redis_and_local_cache>]() { let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379"). - flushing_period(Some(Duration::from_millis(200))). + flushing_period(Duration::from_millis(200)). max_ttl_cached_counters(Duration::from_secs(3600)). ttl_ratio_cached_counters(1). max_cached_counters(10000);