Skip to content

Commit

Permalink
[refactor] Fix using AtomicExpiringValue when batch updating counters
Browse files Browse the repository at this point in the history
  • Loading branch information
didierofrivia committed Apr 15, 2024
1 parent 3ade8cf commit 518c92c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
29 changes: 18 additions & 11 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use self::redis::aio::ConnectionManager;
use self::redis::ConnectionInfo;
use crate::counter::Counter;
use crate::limit::Limit;
use crate::storage::atomic_expiring_value::AtomicExpiringValue;
use crate::storage::keys::*;
use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{
Expand All @@ -14,9 +15,8 @@ use async_trait::async_trait;
use redis::{AsyncCommands, RedisError};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tracing::{debug_span, trace_span, Instrument};
use crate::storage::atomic_expiring_value::AtomicExpiringValue;

// Note: this implementation does not guarantee exact limits. Ensuring that we
// never go over the limits would hurt performance. This implementation
Expand Down Expand Up @@ -235,35 +235,42 @@ impl AsyncRedisStorage {
Ok(())
}

pub async fn update_counters(
pub(crate) async fn update_counters(
&self,
counters_and_deltas: HashMap<Counter, AtomicExpiringValue>,
) -> Result<HashMap<Counter, i64>, StorageErr> {
) -> Result<HashMap<Counter, AtomicExpiringValue>, StorageErr> {
let mut con = self.conn_manager.clone();
let span = trace_span!("datastore");

let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

// TODO: Fix calculating delta value greater than zero at _now_ and return the right AtomicExpiringValue
let now = SystemTime::now();
for (counter, delta) in counters_and_deltas {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
let delta = delta.value_at(now);
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
}
}

let script_res: Vec<Option<(String, i64)>> = script_invocation
.invoke_async::<_, _>(&mut con)
.instrument(span)
.await?;

let counter_value_map: HashMap<Counter, i64> = script_res
let counter_value_map: HashMap<Counter, AtomicExpiringValue> = script_res
.iter()
.filter_map(|counter_value| match counter_value {
Some((raw_counter_key, val)) => {
let counter = partial_counter_from_counter_key(raw_counter_key);
Some((counter, *val))
let seconds = counter.seconds();
Some((
counter,
AtomicExpiringValue::new(*val, now + Duration::from_secs(seconds)),
))
}
None => None,
})
Expand Down
1 change: 0 additions & 1 deletion limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ impl CachedRedisStorage {
std::mem::take(&mut *batch)
};

// TODO: After rebase, the code needs to be refactored to use delta.value_at(SystemTime::now()) and compare delta is greater than 0 after adding the key to update in update_counters
let _updated_counters = storage
.update_counters(counters)
.await
Expand Down

0 comments on commit 518c92c

Please sign in to comment.