Skip to content

Commit

Permalink
[wip] Updating cached counters
Browse files Browse the repository at this point in the history
  • Loading branch information
didierofrivia committed Apr 15, 2024
1 parent 518c92c commit 7581f14
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 49 deletions.
21 changes: 3 additions & 18 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl AsyncRedisStorage {
pub(crate) async fn update_counters(
&self,
counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
) -> Result<HashMap<Counter, AtomicExpiringValue>, StorageErr> {
) -> Result<Vec<(String, i64)>, StorageErr> {
let mut con = self.conn_manager.clone();
let span = trace_span!("datastore");

Expand All @@ -256,27 +256,12 @@ impl AsyncRedisStorage {
}
}

let script_res: Vec<Option<(String, i64)>> = script_invocation
let script_res: Vec<Vec<(String, i64)>> = script_invocation
.invoke_async::<_, _>(&mut con)
.instrument(span)
.await?;

let counter_value_map: HashMap<Counter, AtomicExpiringValue> = script_res
.iter()
.filter_map(|counter_value| match counter_value {
Some((raw_counter_key, val)) => {
let counter = partial_counter_from_counter_key(raw_counter_key);
let seconds = counter.seconds();
Some((
counter,
AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)),
))
}
None => None,
})
.collect();

Ok(counter_value_map)
Ok(script_res.into_iter().flatten().collect())
}
}

Expand Down
82 changes: 51 additions & 31 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tracing::{error, warn};
// multiple times when it is not cached.

pub struct CachedRedisStorage {
cached_counters: CountersCache,
cached_counters: Arc<CountersCache>,
batcher_counter_updates: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
Expand Down Expand Up @@ -226,6 +226,15 @@ impl CachedRedisStorage {
)
.await?;

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();

let cacher = Arc::new(cached_counters);
let cacher_clone = cacher.clone();

let partitioned = Arc::new(AtomicBool::new(false));
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
Expand All @@ -238,42 +247,16 @@ impl CachedRedisStorage {
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
if p.load(Ordering::Acquire) {
if storage.is_alive().await {
warn!("Partition to Redis resolved!");
p.store(false, Ordering::Release);
tokio::select! {
_ = interval.tick() => {
flush_batcher_and_update_counters(batcher_flusher.clone(), storage.clone(), cacher_clone.clone(), p.clone()).await;
}
} else {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};

let _updated_counters = storage
.update_counters(counters)
.await
.or_else(|err| {
if err.is_transient() {
p.store(true, Ordering::Release);
Ok(HashMap::default())
} else {
Err(err)
}
})
.expect("Unrecoverable Redis error!");
}
interval.tick().await;
}
});

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();

Ok(Self {
cached_counters,
cached_counters: cacher,
batcher_counter_updates: batcher,
redis_conn_manager,
async_redis_storage,
Expand Down Expand Up @@ -397,6 +380,43 @@ impl CachedRedisStorageBuilder {
}
}

/// Drains the batched counter updates and writes them to Redis, then folds the
/// values Redis reports back into the shared in-memory counter cache.
///
/// Runs as a periodic background job owned by `CachedRedisStorage`. It also
/// implements partition handling: while `partitioned` is set, it only probes
/// Redis liveness and skips flushing; the batch keeps accumulating until the
/// partition is resolved.
///
/// Parameters:
/// - `batcher`: shared pending-update batch, drained wholesale each cycle.
/// - `storage`: the async Redis backend used to apply the batch.
/// - `cached_counters`: local cache updated from the values Redis returns.
/// - `partitioned`: flag signalling that Redis is currently unreachable.
async fn flush_batcher_and_update_counters(
    batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>>,
    storage: AsyncRedisStorage,
    cached_counters: Arc<CountersCache>,
    partitioned: Arc<AtomicBool>,
) {
    if partitioned.load(Ordering::Acquire) {
        // We are (believed to be) partitioned from Redis: don't flush,
        // just probe liveness and clear the flag once it responds.
        if storage.is_alive().await {
            warn!("Partition to Redis resolved!");
            partitioned.store(false, Ordering::Release);
        }
    } else {
        // Swap the whole batch out, leaving an empty map for producers.
        // The std Mutex guard is confined to this block so it is dropped
        // before any `.await` below (holding it across an await would be
        // a bug with a synchronous mutex).
        let counters = {
            let mut batch = batcher.lock().unwrap();
            std::mem::take(&mut *batch)
        };

        let updated_counters = storage
            .update_counters(counters)
            .await
            .or_else(|err| {
                if err.is_transient() {
                    // Transient failure (e.g. connection drop): mark the
                    // partition and report no updates for this cycle.
                    // NOTE(review): the drained `counters` were moved into
                    // `update_counters` and are lost here — confirm that
                    // dropping a cycle's batch during a partition is an
                    // acceptable trade-off.
                    partitioned.store(true, Ordering::Release);
                    Ok(Vec::new())
                } else {
                    Err(err)
                }
            })
            // Any non-transient Redis error aborts this background task.
            .expect("Unrecoverable Redis error!");

        // Fold each (raw key, value) pair Redis returned into the cache.
        // NOTE(review): `increase_by` applies `value` as an increment —
        // confirm the Lua script returns per-counter deltas rather than
        // absolute counter values, otherwise this double-counts locally.
        for (counter_key, value) in updated_counters {
            let counter = partial_counter_from_counter_key(&counter_key);
            cached_counters.increase_by(&counter, value);
        }
    }
}

#[cfg(test)]
mod tests {
use crate::storage::redis::CachedRedisStorage;
Expand Down

0 comments on commit 7581f14

Please sign in to comment.