diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0c86b095..a76a191d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -38,7 +38,7 @@ jobs: - uses: actions/checkout@v4 - uses: supercharge/redis-github-action@1.1.0 with: - redis-version: 5 + redis-version: 7 - uses: actions-rust-lang/setup-rust-toolchain@v1 - uses: abelfodil/protoc-action@v1 with: diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index c659d8cc..372b1732 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -163,8 +163,6 @@ pub struct RedisStorageConfiguration { #[derive(PartialEq, Eq, Debug)] pub struct RedisStorageCacheConfiguration { pub flushing_period: i64, - pub max_ttl: u64, - pub ttl_ratio: u64, pub max_counters: usize, pub response_timeout: u64, } diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 3f4ad865..226f1aa9 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -25,8 +25,7 @@ use limitador::storage::disk::DiskStorage; use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder}; use limitador::storage::redis::{ AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC, - DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS, - DEFAULT_TTL_RATIO_CACHED_COUNTERS, + DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, }; use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage}; use limitador::{ @@ -134,8 +133,6 @@ impl Limiter { let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url) .flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64)) - .max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl)) - .ttl_ratio_cached_counters(cache_cfg.ttl_ratio) .max_cached_counters(cache_cfg.max_counters) .response_timeout(Duration::from_millis(cache_cfg.response_timeout)); @@ -591,30 +588,6 @@ fn create_config() -> (Configuration, &'static str) { .about("Uses Redis to store counters, with an in-memory cache") .display_order(4) .arg(redis_url_arg) - .arg( - Arg::new("TTL") - .long("ttl") - .action(ArgAction::Set) - .value_parser(clap::value_parser!(u64)) - .default_value( - config::env::REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS - .unwrap_or(leak(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC * 1000)), - ) - .display_order(2) - .help("TTL for cached counters in milliseconds"), - ) - .arg( - Arg::new("ratio") - .long("ratio") - .action(ArgAction::Set) - .value_parser(clap::value_parser!(u64)) - .default_value( - config::env::REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS - .unwrap_or(leak(DEFAULT_TTL_RATIO_CACHED_COUNTERS)), - ) - .display_order(3) - .help("Ratio to apply to the TTL from Redis on cached counters"), - ) .arg( Arg::new("flush") .long("flush-period") @@ -748,8 +721,6 @@ fn create_config() -> (Configuration, &'static str) { url: sub.get_one::("URL").unwrap().to_owned(), cache: Some(RedisStorageCacheConfiguration { flushing_period: *sub.get_one("flush").unwrap(), - max_ttl: *sub.get_one("TTL").unwrap(), - ttl_ratio: *sub.get_one("ratio").unwrap(), max_counters: *sub.get_one("max").unwrap(), response_timeout: *sub.get_one("timeout").unwrap(), }), @@ -832,16 +803,6 @@ fn storage_config_from_env() -> Result { .unwrap_or_else(|_| (DEFAULT_FLUSHING_PERIOD_SEC * 1000).to_string()) .parse() .expect("Expected an i64"), - max_ttl: env::var("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS") - .unwrap_or_else(|_| { - (DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC * 1000).to_string() - }) - .parse() - .expect("Expected an u64"), - ttl_ratio: env::var("REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS") - .unwrap_or_else(|_| DEFAULT_TTL_RATIO_CACHED_COUNTERS.to_string()) - .parse() - .expect("Expected an u64"), max_counters: DEFAULT_MAX_CACHED_COUNTERS, response_timeout: DEFAULT_RESPONSE_TIMEOUT_MS, }) diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index 84ae5b39..465f7daf 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -28,8 +28,8 @@ impl AtomicExpiringValue { } #[allow(dead_code)] - pub fn add_and_set_expiry(&self, delta: i64, expiry: Duration) -> i64 { - self.expiry.update(expiry); + pub fn add_and_set_expiry(&self, delta: i64, expire_at: SystemTime) -> i64 { + self.expiry.update(expire_at); self.value.fetch_add(delta, Ordering::SeqCst) + delta } @@ -44,12 +44,6 @@ impl AtomicExpiringValue { pub fn ttl(&self) -> Duration { self.expiry.duration() } - - #[allow(dead_code)] - pub fn set(&self, value: i64, ttl: Duration) { - self.expiry.update(ttl); - self.value.store(value, Ordering::SeqCst); - } } #[derive(Debug)] @@ -90,9 +84,9 @@ impl AtomicExpiryTime { } #[allow(dead_code)] - pub fn update(&self, ttl: Duration) { + pub fn update(&self, expiry: SystemTime) { self.expiry - .store(Self::since_epoch(SystemTime::now() + ttl), Ordering::SeqCst); + .store(Self::since_epoch(expiry), Ordering::SeqCst); } pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool { diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index c7c5cd85..3949d0c6 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -1,9 +1,6 @@ use crate::counter::Counter; -use crate::storage::atomic_expiring_value::{AtomicExpiringValue, AtomicExpiryTime}; -use crate::storage::redis::{ - DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, - DEFAULT_TTL_RATIO_CACHED_COUNTERS, -}; +use crate::storage::atomic_expiring_value::AtomicExpiringValue; +use crate::storage::redis::DEFAULT_MAX_CACHED_COUNTERS; use dashmap::mapref::entry::Entry; use dashmap::DashMap; use moka::sync::Cache; @@ -20,17 +17,15 @@ use tokio::sync::Notify; pub struct CachedCounterValue { value: AtomicExpiringValue, initial_value: AtomicI64, - expiry: AtomicExpiryTime, from_authority: AtomicBool, } impl CachedCounterValue { - pub fn from_authority(counter: &Counter, value: i64, ttl: Duration) -> Self { + pub fn from_authority(counter: &Counter, value: i64) -> Self { let now = SystemTime::now(); Self { value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())), initial_value: AtomicI64::new(value), - expiry: AtomicExpiryTime::from_now(ttl), from_authority: AtomicBool::new(true), } } @@ -43,19 +38,13 @@ impl CachedCounterValue { now + Duration::from_secs(counter.seconds()), ), initial_value: AtomicI64::new(0), - expiry: AtomicExpiryTime::from_now(Duration::from_secs(counter.seconds())), from_authority: AtomicBool::new(false), } } - pub fn expired_at(&self, now: SystemTime) -> bool { - self.expiry.expired_at(now) - } - - pub fn add_from_authority(&self, delta: i64, expiry: Duration) { - self.value.add_and_set_expiry(delta, expiry); + pub fn add_from_authority(&self, delta: i64, expire_at: SystemTime) { + self.value.add_and_set_expiry(delta, expire_at); self.initial_value.fetch_add(delta, Ordering::SeqCst); - self.expiry.update(expiry); self.from_authority.store(true, Ordering::Release); } @@ -131,9 +120,7 @@ impl CachedCounterValue { } pub fn requires_fast_flush(&self, within: &Duration) -> bool { - self.from_authority.load(Ordering::Acquire).not() - || self.expired_at(SystemTime::now()) - || &self.value.ttl() <= within + self.from_authority.load(Ordering::Acquire).not() || &self.value.ttl() <= within } } @@ -232,8 +219,6 @@ impl Default for Batcher { } pub struct CountersCache { - max_ttl_cached_counters: Duration, - pub ttl_ratio_cached_counters: u64, cache: Cache>, batcher: Batcher, } @@ -260,30 +245,24 @@ impl CountersCache { counter: Counter, redis_val: i64, remote_deltas: i64, - redis_ttl_ms: i64, - ttl_margin: Duration, + redis_expiry: i64, ) -> Arc { - let cache_ttl = self.ttl_from_redis_ttl( - redis_ttl_ms, - counter.seconds(), - redis_val, - counter.max_value(), - ); - if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) { - if ttl > Duration::ZERO { + if redis_expiry > 0 { + let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry as u64); + if expiry_ts > SystemTime::now() { let mut from_cache = true; let cached = self.cache.get_with(counter.clone(), || { from_cache = false; if let Some(entry) = self.batcher.updates.get(&counter) { let cached_value = entry.value(); - cached_value.add_from_authority(remote_deltas, ttl); + cached_value.add_from_authority(remote_deltas, expiry_ts); cached_value.clone() } else { - Arc::new(CachedCounterValue::from_authority(&counter, redis_val, ttl)) + Arc::new(CachedCounterValue::from_authority(&counter, redis_val)) } }); if from_cache { - cached.add_from_authority(remote_deltas, ttl); + cached.add_from_authority(remote_deltas, expiry_ts); } return cached; } @@ -304,64 +283,16 @@ impl CountersCache { val.delta(counter, delta); self.batcher.add(counter.clone(), val.clone()); } - - fn ttl_from_redis_ttl( - &self, - redis_ttl_ms: i64, - counter_seconds: u64, - counter_val: i64, - counter_max: i64, - ) -> Duration { - // Redis returns -2 when the key does not exist. Ref: - // https://redis.io/commands/ttl - // This function returns a ttl of the given counter seconds in this - // case. - - let counter_ttl = if redis_ttl_ms >= 0 { - Duration::from_millis(redis_ttl_ms as u64) - } else { - Duration::from_secs(counter_seconds) - }; - - // If a counter is already at counter_max, we can cache it for as long as its TTL - // is in Redis. This does not depend on the requests received by other - // instances of Limitador. No matter what they do, we know that the - // counter is not going to recover its quota until it expires in Redis. - if counter_val >= counter_max { - return counter_ttl; - } - - // Expire the counter in the cache before it expires in Redis. - // There might be several Limitador instances updating the Redis - // counter. The tradeoff is as follows: the shorter the TTL in the - // cache, the sooner we'll take into account those updates coming from - // other instances. If the TTL in the cache is long, there will be less - // accesses to Redis, so latencies will be better. However, it'll be - // easier to go over the limits defined, because not taking into account - // updates from other Limitador instances. - let mut res = - Duration::from_millis(counter_ttl.as_millis() as u64 / self.ttl_ratio_cached_counters); - - if res > self.max_ttl_cached_counters { - res = self.max_ttl_cached_counters; - } - - res - } } pub struct CountersCacheBuilder { max_cached_counters: usize, - max_ttl_cached_counters: Duration, - ttl_ratio_cached_counters: u64, } impl CountersCacheBuilder { pub fn new() -> Self { Self { max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS, - max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), - ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS, } } @@ -370,20 +301,8 @@ impl CountersCacheBuilder { self } - pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self { - self.max_ttl_cached_counters = max_ttl_cached_counter; - self - } - - pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self { - self.ttl_ratio_cached_counters = ttl_ratio_cached_counter; - self - } - pub fn build(&self, period: Duration) -> CountersCache { CountersCache { - max_ttl_cached_counters: self.max_ttl_cached_counters, - ttl_ratio_cached_counters: self.ttl_ratio_cached_counters, cache: Cache::new(self.max_cached_counters as u64), batcher: Batcher::new(period), } @@ -395,17 +314,19 @@ mod tests { use super::*; use crate::limit::Limit; use std::collections::HashMap; + use std::ops::Add; + use std::time::UNIX_EPOCH; mod cached_counter_value { use crate::storage::redis::counters_cache::tests::test_counter; use crate::storage::redis::counters_cache::CachedCounterValue; - use std::ops::Not; + use std::ops::{Add, Not}; use std::time::{Duration, SystemTime}; #[test] fn records_pending_writes() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); assert_eq!(value.pending_writes(), Ok(0)); value.delta(&counter, 5); assert_eq!(value.pending_writes(), Ok(5)); @@ -414,7 +335,7 @@ mod tests { #[test] fn consumes_pending_writes() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, 5); assert_eq!(value.pending_writes(), Ok(5)); assert_eq!(value.pending_writes(), Ok(0)); @@ -423,7 +344,7 @@ mod tests { #[test] fn no_pending_writes() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, 5); assert!(value.no_pending_writes().not()); assert!(value.pending_writes().is_ok()); @@ -433,10 +354,10 @@ mod tests { #[test] fn adding_from_auth_not_affecting_pending_writes() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, 5); assert!(value.no_pending_writes().not()); - value.add_from_authority(6, Duration::from_secs(1)); + value.add_from_authority(6, SystemTime::now().add(Duration::from_secs(1))); assert!(value.no_pending_writes().not()); assert_eq!(value.pending_writes(), Ok(5)); } @@ -444,14 +365,14 @@ mod tests { #[test] fn from_authority_no_need_to_flush() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(10)); + let value = CachedCounterValue::from_authority(&counter, 0); assert!(value.requires_fast_flush(&Duration::from_secs(30)).not()); } #[test] fn from_authority_needs_to_flush_within_ttl() { let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); assert!(value.requires_fast_flush(&Duration::from_secs(90))); } @@ -462,22 +383,12 @@ mod tests { assert!(value.requires_fast_flush(&Duration::from_secs(30))); } - #[test] - fn expiry_of_cached_entry() { - let counter = test_counter(10, None); - let cache_entry_ttl = Duration::from_secs(1); - let value = CachedCounterValue::from_authority(&counter, 0, cache_entry_ttl); - let now = SystemTime::now(); - assert!(value.expired_at(now).not()); - assert!(value.expired_at(now + cache_entry_ttl)); - } - #[test] fn delegates_to_underlying_value() { let hits = 4; let counter = test_counter(10, None); - let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, hits); assert!(value.to_next_window() > Duration::from_millis(59999)); assert_eq!(value.hits(&counter), hits); @@ -519,11 +430,7 @@ mod tests { tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(40)).await; let counter = test_counter(6, None); - let arc = Arc::new(CachedCounterValue::from_authority( - &counter, - 0, - Duration::from_secs(1), - )); + let arc = Arc::new(CachedCounterValue::from_authority(&counter, 0)); batcher.add(counter, arc); }); } @@ -549,11 +456,7 @@ mod tests { tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(40)).await; let counter = test_counter(6, None); - let arc = Arc::new(CachedCounterValue::from_authority( - &counter, - 0, - Duration::from_secs(1), - )); + let arc = Arc::new(CachedCounterValue::from_authority(&counter, 0)); batcher.add(counter, arc); }); } @@ -575,11 +478,7 @@ mod tests { let start = SystemTime::now(); { let counter = test_counter(6, None); - let arc = Arc::new(CachedCounterValue::from_authority( - &counter, - 0, - Duration::from_secs(1), - )); + let arc = Arc::new(CachedCounterValue::from_authority(&counter, 0)); batcher.add(counter, arc); } batcher @@ -624,7 +523,16 @@ mod tests { let counter = test_counter(10, None); let cache = CountersCacheBuilder::new().build(Duration::default()); - cache.apply_remote_delta(counter.clone(), 10, 0, 10, Duration::from_secs(0)); + cache.apply_remote_delta( + counter.clone(), + 10, + 0, + SystemTime::now() + .add(Duration::from_secs(1)) + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros() as i64, + ); assert!(cache.get(&counter).is_some()); } @@ -649,8 +557,11 @@ mod tests { counter.clone(), current_value, 0, - 10, - Duration::from_secs(0), + SystemTime::now() + .add(Duration::from_secs(1)) + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros() as i64, ); assert_eq!( @@ -666,7 +577,16 @@ mod tests { let counter = test_counter(current_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); - cache.apply_remote_delta(counter.clone(), current_val, 0, 10, Duration::from_secs(0)); + cache.apply_remote_delta( + counter.clone(), + current_val, + 0, + SystemTime::now() + .add(Duration::from_secs(1)) + .duration_since(UNIX_EPOCH) + .unwrap() + .as_micros() as i64, + ); cache.increase_by(&counter, increase_by); assert_eq!( diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index a60ab99a..785c13f8 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -9,8 +9,6 @@ mod scripts; pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1; pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000; -pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5; -pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10; pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350; use crate::counter::Counter; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 78fafe05..f8282859 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -7,8 +7,7 @@ use crate::storage::redis::counters_cache::{ use crate::storage::redis::redis_async::AsyncRedisStorage; use crate::storage::redis::scripts::BATCH_UPDATE_COUNTERS; use crate::storage::redis::{ - DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, - DEFAULT_RESPONSE_TIMEOUT_MS, DEFAULT_TTL_RATIO_CACHED_COUNTERS, + DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, }; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; @@ -19,7 +18,7 @@ use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; +use std::time::Duration; use tracing::{debug_span, error, warn, Instrument}; // This is just a first version. @@ -149,8 +148,6 @@ impl CachedRedisStorage { redis_url, Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), DEFAULT_MAX_CACHED_COUNTERS, - Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), - DEFAULT_TTL_RATIO_CACHED_COUNTERS, Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS), ) .await @@ -160,8 +157,6 @@ impl CachedRedisStorage { redis_url: &str, flushing_period: Duration, max_cached_counters: usize, - ttl_cached_counters: Duration, - ttl_ratio_cached_counters: u64, response_timeout: Duration, ) -> Result { let info = ConnectionInfo::from_str(redis_url)?; @@ -179,8 +174,6 @@ impl CachedRedisStorage { let cached_counters = CountersCacheBuilder::new() .max_cached_counters(max_cached_counters) - .max_ttl_cached_counter(ttl_cached_counters) - .ttl_ratio_cached_counter(ttl_ratio_cached_counters) .build(flushing_period); let counters_cache = Arc::new(cached_counters); @@ -233,8 +226,6 @@ pub struct CachedRedisStorageBuilder { redis_url: String, flushing_period: Duration, max_cached_counters: usize, - max_ttl_cached_counters: Duration, - ttl_ratio_cached_counters: u64, response_timeout: Duration, } @@ -244,8 +235,6 @@ impl CachedRedisStorageBuilder { redis_url: redis_url.to_string(), flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC), max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS, - max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC), - ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS, response_timeout: Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS), } } @@ -260,16 +249,6 @@ impl CachedRedisStorageBuilder { self } - pub fn max_ttl_cached_counters(mut self, max_ttl_cached_counters: Duration) -> Self { - self.max_ttl_cached_counters = max_ttl_cached_counters; - self - } - - pub fn ttl_ratio_cached_counters(mut self, ttl_ratio_cached_counters: u64) -> Self { - self.ttl_ratio_cached_counters = ttl_ratio_cached_counters; - self - } - pub fn response_timeout(mut self, response_timeout: Duration) -> Self { self.response_timeout = response_timeout; self @@ -280,8 +259,6 @@ impl CachedRedisStorageBuilder { &self.redis_url, self.flushing_period, self.max_cached_counters, - self.max_ttl_cached_counters, - self.ttl_ratio_cached_counters, self.response_timeout, ) .await @@ -326,10 +303,10 @@ async fn update_counters( let script_res_range = (0..script_res.len()).step_by(2); for (i, j) in counters_range.zip(script_res_range) { - let (_, val, delta, ttl) = &mut res[i]; + let (_, val, delta, expires_at) = &mut res[i]; *val = script_res[j]; *delta = script_res[j] - *delta; - *ttl = script_res[j + 1]; + *expires_at = script_res[j + 1]; } res }; @@ -362,18 +339,8 @@ async fn flush_batcher_and_update_counters( }) .expect("Unrecoverable Redis error!"); - let time_start_update_counters = Instant::now(); - for (counter, new_value, remote_deltas, ttl) in updated_counters { - cached_counters.apply_remote_delta( - counter, - new_value, - remote_deltas, - ttl, - Duration::from_millis( - (Instant::now() - time_start_update_counters).as_millis() as u64 - ), - ); + cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl); } } } @@ -391,9 +358,10 @@ mod tests { use redis::{ErrorKind, Value}; use redis_test::{MockCmd, MockRedisConnection}; use std::collections::HashMap; + use std::ops::Add; use std::sync::atomic::AtomicBool; use std::sync::Arc; - use std::time::Duration; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[tokio::test] async fn errs_on_bad_url() { @@ -432,16 +400,22 @@ mod tests { let arc = Arc::new(CachedCounterValue::from_authority( &counter, INITIAL_VALUE_FROM_REDIS, - Duration::from_secs(60), )); arc.delta(&counter, LOCAL_INCREMENTS); counters_and_deltas.insert(counter.clone(), arc); - let mock_response = Value::Bulk(vec![Value::Int(NEW_VALUE_FROM_REDIS), Value::Int(60)]); + let one_sec_from_now = SystemTime::now() + .add(Duration::from_secs(1)) + .duration_since(UNIX_EPOCH) + .unwrap(); + let mock_response = Value::Bulk(vec![ + Value::Int(NEW_VALUE_FROM_REDIS), + Value::Int(one_sec_from_now.as_millis() as i64), + ]); let mut mock_client = MockRedisConnection::new(vec![MockCmd::new( redis::cmd("EVALSHA") - .arg("1e87383cf7dba2bd0f9972ed73671274e6cbd5da") + .arg("95a717e821d8fbdd667b5e4c6fede4c9cad16006") .arg("2") .arg(key_for_counter(&counter)) .arg(key_for_counters_of_limit(counter.limit())) @@ -454,14 +428,14 @@ mod tests { .await .unwrap(); - let (c, new_value, remote_increments, new_ttl) = result.remove(0); + let (c, new_value, remote_increments, expire_at) = result.remove(0); assert_eq!(key_for_counter(&counter), key_for_counter(&c)); assert_eq!(NEW_VALUE_FROM_REDIS, new_value); assert_eq!( NEW_VALUE_FROM_REDIS - INITIAL_VALUE_FROM_REDIS - LOCAL_INCREMENTS, remote_increments ); - assert_eq!(60, new_ttl); + assert_eq!(one_sec_from_now.as_millis(), expire_at as u128); } #[tokio::test] @@ -477,11 +451,20 @@ mod tests { Default::default(), ); - let mock_response = Value::Bulk(vec![Value::Int(8), Value::Int(60)]); + let mock_response = Value::Bulk(vec![ + Value::Int(8), + Value::Int( + SystemTime::now() + .add(Duration::from_secs(1)) + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64, + ), + ]); let mock_client = MockRedisConnection::new(vec![MockCmd::new( redis::cmd("EVALSHA") - .arg("1e87383cf7dba2bd0f9972ed73671274e6cbd5da") + .arg("95a717e821d8fbdd667b5e4c6fede4c9cad16006") .arg("2") .arg(key_for_counter(&counter)) .arg(key_for_counters_of_limit(counter.limit())) diff --git a/limitador/src/storage/redis/scripts.rs b/limitador/src/storage/redis/scripts.rs index b241d88d..80ee31de 100644 --- a/limitador/src/storage/redis/scripts.rs +++ b/limitador/src/storage/redis/scripts.rs @@ -38,10 +38,8 @@ pub const BATCH_UPDATE_COUNTERS: &str = " if c == tonumber(delta) then redis.call('expire', counter_key, ttl) redis.call('sadd', limit_key, counter_key) - table.insert(res, ttl*1000) - else - table.insert(res, redis.call('pttl', counter_key)) end + table.insert(res, redis.call('pexpiretime', counter_key)) end return res "; diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index 96392ba7..7bd8cb7d 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -60,8 +60,6 @@ macro_rules! test_with_all_storage_impls { async fn [<$function _with_async_redis_and_local_cache>]() { let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379"). flushing_period(Duration::from_millis(2)). - max_ttl_cached_counters(Duration::from_secs(3600)). - ttl_ratio_cached_counters(1). max_cached_counters(10000); let storage = storage_builder.build().await.expect("We need a Redis running locally"); storage.clear().await.unwrap(); @@ -537,7 +535,7 @@ mod test { } // We wait for the flushing period to pass so the counters are flushed in the cached storage - tokio::time::sleep(Duration::from_millis(4)).await; + tokio::time::sleep(Duration::from_millis(40)).await; assert!(rate_limiter .is_rate_limited(namespace, &get_values, 1)