Skip to content

Commit

Permalink
Inline Batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed Mar 25, 2024
1 parent 2af9d7f commit 57d8f81
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 38 deletions.
31 changes: 0 additions & 31 deletions limitador/src/storage/redis/batcher.rs

This file was deleted.

1 change: 0 additions & 1 deletion limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use ::redis::RedisError;
use std::time::Duration;

mod batcher;
mod counters_cache;
mod redis_async;
mod redis_cached;
Expand Down
18 changes: 12 additions & 6 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::keys::*;
use crate::storage::redis::batcher::Batcher;
use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
use crate::storage::redis::redis_async::AsyncRedisStorage;
use crate::storage::redis::scripts::VALUES_AND_TTLS;
Expand All @@ -14,7 +13,7 @@ use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{ConnectionInfo, RedisError};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
Expand All @@ -39,7 +38,7 @@ use std::time::{Duration, Instant};

pub struct CachedRedisStorage {
cached_counters: Mutex<CountersCache>,
batcher_counter_updates: Arc<Mutex<Batcher>>,
batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
batching_is_enabled: bool,
Expand Down Expand Up @@ -159,7 +158,14 @@ impl AsyncCounterStorage for CachedRedisStorage {
if self.batching_is_enabled {
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
batcher.add_counter(counter, delta)
match batcher.get_mut(counter) {
Some(val) => {
*val += delta;
}
None => {
batcher.insert(counter.clone(), delta);
}
}
}
} else {
for counter in counters.iter() {
Expand Down Expand Up @@ -216,15 +222,15 @@ impl CachedRedisStorage {
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

let storage = async_redis_storage.clone();
let batcher = Arc::new(Mutex::new(Batcher::new()));
let batcher = Arc::new(Mutex::new(Default::default()));
if let Some(flushing_period) = flushing_period {
let batcher_flusher = batcher.clone();
tokio::spawn(async move {
loop {
let time_start = Instant::now();
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
batch.flush()
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage.update_counter(&counter, delta).await.unwrap();
Expand Down

0 comments on commit 57d8f81

Please sign in to comment.