Skip to content

Commit

Permalink
Basic partition tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Mar 25, 2024
1 parent b4fb98c commit 08a3be2
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 22 deletions.
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ttl_cache::TtlCache;

pub struct CountersCache {
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
pub ttl_ratio_cached_counters: u64,
cache: TtlCache<Counter, i64>,
}

Expand Down
8 changes: 8 additions & 0 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ impl AsyncRedisStorage {
Self { conn_manager }
}

/// Probes the backing Redis by incrementing the `LIMITADOR_LIVE_CHECK` key by 1.
///
/// Returns `true` when the increment command completes successfully and `false`
/// on any error. Note that every probe is a write: the key's value grows by one
/// per call.
pub async fn is_alive(&self) -> bool {
    // Work on a clone of the connection manager, as the command needs a mutable handle.
    let mut con = self.conn_manager.clone();
    let probe = con
        .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
        .await;
    probe.is_ok()
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
let mut con = self.conn_manager.clone();

Expand Down
120 changes: 99 additions & 21 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use redis::aio::ConnectionManager;
use redis::{ConnectionInfo, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};

// This is just a first version.
Expand All @@ -42,6 +43,7 @@ pub struct CachedRedisStorage {
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
batching_is_enabled: bool,
partitioned: Arc<AtomicBool>,
}

#[async_trait]
Expand Down Expand Up @@ -107,8 +109,18 @@ impl AsyncCounterStorage for CachedRedisStorage {
if !not_cached.is_empty() {
let time_start_get_ttl = Instant::now();

// todo needs to be resilient
let (counter_vals, counter_ttls_msecs) = self.values_with_ttls(&not_cached).await?;
let (counter_vals, counter_ttls_msecs) = if self.is_partitioned() {
self.fallback_vals_ttls(&not_cached)
} else {
self.values_with_ttls(&not_cached).await.or_else(|err| {
if err.is_transient() {
self.partitioned(true);
Ok(self.fallback_vals_ttls(&not_cached))
} else {
Err(err)
}
})?
};

// Some time could have passed from the moment we got the TTL from Redis.
// This margin is not exact, because we don't know exactly the
Expand Down Expand Up @@ -153,22 +165,23 @@ impl AsyncCounterStorage for CachedRedisStorage {
}

// Batch or update depending on configuration
if self.batching_is_enabled {
if self.is_partitioned() || self.batching_is_enabled {
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
match batcher.get_mut(counter) {
Some(val) => {
*val += delta;
}
None => {
batcher.insert(counter.clone(), delta);
}
}
Self::batch_counter(delta, &mut batcher, counter);
}
} else {
for counter in counters.iter() {
// todo needs to be resilient
self.update_counter(counter, delta).await?
self.update_counter(counter, delta).await.or_else(|err| {
if err.is_transient() {
self.partitioned(true);
let mut batcher = self.batcher_counter_updates.lock().unwrap();
Self::batch_counter(delta, &mut batcher, counter);
Ok(())
} else {
Err(err)
}
})?
}
}

Expand Down Expand Up @@ -217,22 +230,41 @@ impl CachedRedisStorage {
)
.await?;

let partitioned = Arc::new(AtomicBool::new(false));
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

let storage = async_redis_storage.clone();
let batcher = Arc::new(Mutex::new(Default::default()));
let batcher: Arc<Mutex<HashMap<Counter, i64>>> = Arc::new(Mutex::new(Default::default()));
let p = Arc::clone(&partitioned);
if let Some(flushing_period) = flushing_period {
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage.update_counter(&counter, delta).await.unwrap();
if p.load(Ordering::Acquire) {
if storage.is_alive().await {
p.store(false, Ordering::Release);
}
} else {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage
.update_counter(&counter, delta)
.await
.or_else(|err| {
if err.is_transient() {
p.store(true, Ordering::Release);
Ok(())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");
}
}
interval.tick().await;
}
Expand All @@ -251,9 +283,40 @@ impl CachedRedisStorage {
redis_conn_manager,
async_redis_storage,
batching_is_enabled: flushing_period.is_some(),
partitioned,
})
}

fn is_partitioned(&self) -> bool {
self.partitioned.load(Ordering::Acquire)
}

/// Attempts to flip the shared `partitioned` flag to `partition`.
///
/// Returns `true` only when this call actually changed the flag (it previously
/// held `!partition`); returns `false` when the flag already held `partition`.
///
/// The "We are partitioned!" notice is printed only by the call that performs
/// the `false -> true` transition, so concurrent or repeated failures do not
/// emit one line each.
fn partitioned(&self, partition: bool) -> bool {
    // `AcqRel` on success keeps the failure ordering (`Acquire`) no stronger than
    // the success ordering, which is the combination accepted by every toolchain.
    let flipped = self
        .partitioned
        .compare_exchange(!partition, partition, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if flipped && partition {
        println!("We are partitioned!");
    }
    flipped
}

/// Builds placeholder values and TTLs for `counters` without contacting Redis.
///
/// Returns a pair of vectors, each with one entry per counter and in the same
/// order as `counters`:
/// * values: always `Some(0)`;
/// * TTLs: the counter's `limit().seconds()` divided by the cache's
///   `ttl_ratio_cached_counters`.
///
/// Panics if the `cached_counters` mutex is poisoned.
///
/// NOTE(review): the caller binds the second vector to `counter_ttls_msecs`,
/// while the value computed here derives from `limit().seconds()` — confirm the
/// intended unit. Also presumably `ttl_ratio_cached_counters` is never 0 (the
/// division would panic); verify against the cache's constructor.
fn fallback_vals_ttls(&self, counters: &[&mut Counter]) -> (Vec<Option<i64>>, Vec<i64>) {
    // Nothing to compute, and no need to touch the cache lock.
    if counters.is_empty() {
        return (Vec::new(), Vec::new());
    }

    // The ratio does not change per counter: read it once instead of locking
    // the cache for every element. The guard is released at the end of this statement.
    let ttl_ratio = self
        .cached_counters
        .lock()
        .unwrap()
        .ttl_ratio_cached_counters;

    let vals = vec![Some(0i64); counters.len()];
    let ttls = counters
        .iter()
        .map(|counter| (counter.limit().seconds() / ttl_ratio) as i64)
        .collect();

    (vals, ttls)
}

async fn values_with_ttls(
&self,
counters: &[&mut Counter],
Expand Down Expand Up @@ -286,6 +349,21 @@ impl CachedRedisStorage {

Ok((counter_vals, counter_ttls_msecs))
}

/// Accumulates `delta` for `counter` in the pending-updates map behind `batcher`.
///
/// If the counter already has a pending entry, `delta` is added to it;
/// otherwise a new entry is created holding `delta`. The counter is cloned
/// only when a new entry has to be inserted.
fn batch_counter(
    delta: i64,
    batcher: &mut MutexGuard<HashMap<Counter, i64>>,
    counter: &Counter,
) {
    if let Some(pending) = batcher.get_mut(counter) {
        *pending += delta;
        return;
    }
    batcher.insert(counter.clone(), delta);
}
}

pub struct CachedRedisStorageBuilder {
Expand Down

0 comments on commit 08a3be2

Please sign in to comment.