Skip to content

Commit

Permalink
Refactor the way we deal with partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Apr 22, 2024
1 parent 9d5eed7 commit 5596153
Showing 1 changed file with 35 additions and 21 deletions.
56 changes: 35 additions & 21 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,28 +238,42 @@ impl CachedRedisStorage {
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
let batcher: Arc<Mutex<HashMap<Counter, AtomicExpiringValue>>> =
Arc::new(Mutex::new(Default::default()));

{
let storage = async_redis_storage.clone();
let counters_cache_clone = counters_cache.clone();
let conn = redis_conn_manager.clone();
let p = Arc::clone(&partitioned);
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
flush_batcher_and_update_counters(
conn.clone(),
batcher_flusher.clone(),
storage.is_alive().await,
counters_cache_clone.clone(),
p.clone(),
)
.await;
interval.tick().await;
let p = Arc::clone(&partitioned);
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
if p.load(Ordering::Acquire) {
if storage.is_alive().await {
warn!("Partition to Redis resolved!");
p.store(false, Ordering::Release);
}
} else {
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
let now = SystemTime::now();
for (counter, delta) in counters {
let delta = delta.value_at(now);
if delta > 0 {
storage
.update_counter(&counter, delta)
.await
.or_else(|err| if err.is_transient() { Ok(()) } else { Err(err) })
.expect("Unrecoverable Redis error!");
}
}
}
});
}
interval.tick().await;
}
});

let cached_counters = CountersCacheBuilder::new()
.max_cached_counters(max_cached_counters)
.max_ttl_cached_counter(ttl_cached_counters)
.ttl_ratio_cached_counter(ttl_ratio_cached_counters)
.build();

Ok(Self {
cached_counters: counters_cache,
Expand Down

0 comments on commit 5596153

Please sign in to comment.