Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write behind lock #276

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions limitador/src/storage/redis/batcher.rs

This file was deleted.

1 change: 0 additions & 1 deletion limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use ::redis::RedisError;
use std::time::Duration;

mod batcher;
mod counters_cache;
mod redis_async;
mod redis_cached;
Expand Down
43 changes: 26 additions & 17 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::counter::Counter;
use crate::limit::Limit;
use crate::prometheus_metrics::CounterAccess;
use crate::storage::keys::*;
use crate::storage::redis::batcher::Batcher;
use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
use crate::storage::redis::redis_async::AsyncRedisStorage;
use crate::storage::redis::scripts::VALUES_AND_TTLS;
Expand All @@ -14,11 +13,10 @@ use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use redis::aio::ConnectionManager;
use redis::{ConnectionInfo, RedisError};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

// This is just a first version.
//
Expand All @@ -40,7 +38,7 @@ use tokio::sync::Mutex;

pub struct CachedRedisStorage {
cached_counters: Mutex<CountersCache>,
batcher_counter_updates: Arc<Mutex<Batcher>>,
batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
async_redis_storage: AsyncRedisStorage,
redis_conn_manager: ConnectionManager,
batching_is_enabled: bool,
Expand Down Expand Up @@ -81,7 +79,7 @@ impl AsyncCounterStorage for CachedRedisStorage {

// Check cached counters
{
let cached_counters = self.cached_counters.lock().await;
let cached_counters = self.cached_counters.lock().unwrap();
for counter in counters.iter_mut() {
match cached_counters.get(counter) {
Some(val) => {
Expand Down Expand Up @@ -122,7 +120,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
Duration::from_millis((Instant::now() - time_start_get_ttl).as_millis() as u64);

{
let mut cached_counters = self.cached_counters.lock().await;
let mut cached_counters = self.cached_counters.lock().unwrap();
for (i, counter) in not_cached.iter_mut().enumerate() {
cached_counters.insert(
counter.clone(),
Expand Down Expand Up @@ -150,17 +148,24 @@ impl AsyncCounterStorage for CachedRedisStorage {

// Update cached values
{
let mut cached_counters = self.cached_counters.lock().await;
let mut cached_counters = self.cached_counters.lock().unwrap();
for counter in counters.iter() {
cached_counters.decrease_by(counter, delta);
}
}

// Batch or update depending on configuration
if self.batching_is_enabled {
let batcher = self.batcher_counter_updates.lock().await;
let mut batcher = self.batcher_counter_updates.lock().unwrap();
for counter in counters.iter() {
batcher.add_counter(counter, delta).await
match batcher.get_mut(counter) {
Some(val) => {
*val += delta;
}
None => {
batcher.insert(counter.clone(), delta);
}
}
}
} else {
for counter in counters.iter() {
Expand Down Expand Up @@ -216,17 +221,21 @@ impl CachedRedisStorage {
let async_redis_storage =
AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());

let batcher = Arc::new(Mutex::new(Batcher::new(async_redis_storage.clone())));
let storage = async_redis_storage.clone();
let batcher = Arc::new(Mutex::new(Default::default()));
if let Some(flushing_period) = flushing_period {
let batcher_flusher = batcher.clone();
let mut interval = tokio::time::interval(flushing_period);
tokio::spawn(async move {
loop {
let time_start = Instant::now();
batcher_flusher.lock().await.flush().await;
let sleep_time = flushing_period
.checked_sub(time_start.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
tokio::time::sleep(sleep_time).await;
let counters = {
let mut batch = batcher_flusher.lock().unwrap();
std::mem::take(&mut *batch)
};
for (counter, delta) in counters {
storage.update_counter(&counter, delta).await.unwrap();
}
interval.tick().await;
}
});
}
Expand Down
Loading