diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 5033a9a2..9eb17af7 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -8,7 +8,7 @@ use ttl_cache::TtlCache; pub struct CountersCache { max_ttl_cached_counters: Duration, - ttl_ratio_cached_counters: u64, + pub ttl_ratio_cached_counters: u64, cache: TtlCache, } diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index bb94e933..ee788de8 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -242,6 +242,14 @@ impl AsyncRedisStorage { Self { conn_manager } } + pub async fn is_alive(&self) -> bool { + self.conn_manager + .clone() + .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1) + .await + .is_ok() + } + async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index ef20afe8..ff5c4793 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -15,7 +15,8 @@ use redis::aio::ConnectionManager; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; // This is just a first version. @@ -42,6 +43,7 @@ pub struct CachedRedisStorage { async_redis_storage: AsyncRedisStorage, redis_conn_manager: ConnectionManager, batching_is_enabled: bool, + partitioned: Arc, } #[async_trait] @@ -107,8 +109,18 @@ impl AsyncCounterStorage for CachedRedisStorage { if !not_cached.is_empty() { let time_start_get_ttl = Instant::now(); - // todo needs to be resilient - let (counter_vals, counter_ttls_msecs) = self.values_with_ttls(¬_cached).await?; + let (counter_vals, counter_ttls_msecs) = if self.is_partitioned() { + self.fallback_vals_ttls(¬_cached) + } else { + self.values_with_ttls(¬_cached).await.or_else(|err| { + if err.is_transient() { + self.partitioned(true); + Ok(self.fallback_vals_ttls(¬_cached)) + } else { + Err(err) + } + })? + }; // Some time could have passed from the moment we got the TTL from Redis. // This margin is not exact, because we don't know exactly the @@ -153,22 +165,23 @@ impl AsyncCounterStorage for CachedRedisStorage { } // Batch or update depending on configuration - if self.batching_is_enabled { + if self.is_partitioned() || self.batching_is_enabled { let mut batcher = self.batcher_counter_updates.lock().unwrap(); for counter in counters.iter() { - match batcher.get_mut(counter) { - Some(val) => { - *val += delta; - } - None => { - batcher.insert(counter.clone(), delta); - } - } + Self::batch_counter(delta, &mut batcher, counter); } } else { for counter in counters.iter() { - // todo needs to be resilient - self.update_counter(counter, delta).await? + self.update_counter(counter, delta).await.or_else(|err| { + if err.is_transient() { + self.partitioned(true); + let mut batcher = self.batcher_counter_updates.lock().unwrap(); + Self::batch_counter(delta, &mut batcher, counter); + Ok(()) + } else { + Err(err) + } + })? } } @@ -217,22 +230,41 @@ impl CachedRedisStorage { ) .await?; + let partitioned = Arc::new(AtomicBool::new(false)); let async_redis_storage = AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); let storage = async_redis_storage.clone(); - let batcher = Arc::new(Mutex::new(Default::default())); + let batcher: Arc>> = Arc::new(Mutex::new(Default::default())); + let p = Arc::clone(&partitioned); if let Some(flushing_period) = flushing_period { let batcher_flusher = batcher.clone(); let mut interval = tokio::time::interval(flushing_period); tokio::spawn(async move { loop { - let counters = { - let mut batch = batcher_flusher.lock().unwrap(); - std::mem::take(&mut *batch) - }; - for (counter, delta) in counters { - storage.update_counter(&counter, delta).await.unwrap(); + if p.load(Ordering::Acquire) { + if storage.is_alive().await { + p.store(false, Ordering::Release); + } + } else { + let counters = { + let mut batch = batcher_flusher.lock().unwrap(); + std::mem::take(&mut *batch) + }; + for (counter, delta) in counters { + storage + .update_counter(&counter, delta) + .await + .or_else(|err| { + if err.is_transient() { + p.store(true, Ordering::Release); + Ok(()) + } else { + Err(err) + } + }) + .expect("Unrecoverable Redis error!"); + } } interval.tick().await; } @@ -251,9 +283,40 @@ impl CachedRedisStorage { redis_conn_manager, async_redis_storage, batching_is_enabled: flushing_period.is_some(), + partitioned, }) } + fn is_partitioned(&self) -> bool { + self.partitioned.load(Ordering::Acquire) + } + + fn partitioned(&self, partition: bool) -> bool { + if partition { + println!("We are partitioned!"); + } + self.partitioned + .compare_exchange(!partition, partition, Ordering::Release, Ordering::Acquire) + .is_ok() + } + + fn fallback_vals_ttls(&self, counters: &Vec<&mut Counter>) -> (Vec>, Vec) { + let mut vals = Vec::with_capacity(counters.len()); + let mut ttls = Vec::with_capacity(counters.len()); + for counter in counters { + vals.push(Some(0i64)); + ttls.push( + (counter.limit().seconds() + / self + .cached_counters + .lock() + .unwrap() + .ttl_ratio_cached_counters) as i64, + ); + } + (vals, ttls) + } + async fn values_with_ttls( &self, counters: &[&mut Counter], @@ -286,6 +349,21 @@ impl CachedRedisStorage { Ok((counter_vals, counter_ttls_msecs)) } + + fn batch_counter( + delta: i64, + batcher: &mut MutexGuard>, + counter: &Counter, + ) { + match batcher.get_mut(counter) { + Some(val) => { + *val += delta; + } + None => { + batcher.insert(counter.clone(), delta); + } + } + } } pub struct CachedRedisStorageBuilder {