From 7c223131cfe396a3f83d7f249bbab26a626e9f6a Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 1 May 2024 08:26:34 -0400 Subject: [PATCH 1/3] Added tests... and fixes... to CachedCounterValue --- limitador/src/storage/redis/counters_cache.rs | 187 ++++++++++++------ 1 file changed, 124 insertions(+), 63 deletions(-) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 3754a46e..643b1f15 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -9,6 +9,7 @@ use dashmap::DashMap; use moka::sync::Cache; use std::collections::HashMap; use std::future::Future; +use std::ops::Not; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -64,6 +65,7 @@ impl CachedCounterValue { .update(delta, counter.seconds(), SystemTime::now()); if value == delta { // new window, invalidate initial value + // which happens _after_ the self.value was reset, see `pending_writes` self.initial_value.store(0, Ordering::SeqCst); } value @@ -76,9 +78,11 @@ impl CachedCounterValue { value } else { let writes = value - start; - if writes > 0 { + if writes >= 0 { writes } else { + // self.value expired, is now less than the writes of the previous window + // which have not yet been reset... it'll be 0, so treat it as such. value } }; @@ -89,7 +93,7 @@ impl CachedCounterValue { Ok(_) => Ok(offset), Err(newer) => { if newer == 0 { - // We got expired in the meantime, this fresh value can wait the next iteration + // We got reset because of expiry, this fresh value can wait the next iteration Ok(0) } else { // Concurrent call to this method? @@ -123,7 +127,7 @@ impl CachedCounterValue { } pub fn requires_fast_flush(&self, within: &Duration) -> bool { - self.from_authority.load(Ordering::Acquire) || &self.value.ttl() <= within + self.from_authority.load(Ordering::Acquire).not() || &self.value.ttl() <= within } } @@ -388,20 +392,103 @@ mod tests { use crate::limit::Limit; use std::collections::HashMap; + mod cached_counter_value { + use crate::storage::redis::counters_cache::tests::test_counter; + use crate::storage::redis::counters_cache::CachedCounterValue; + use std::time::{Duration, SystemTime}; + + #[test] + fn records_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + assert_eq!(value.pending_writes(), Ok(0)); + value.delta(&counter, 5); + assert_eq!(value.pending_writes(), Ok(5)); + } + + #[test] + fn consumes_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert_eq!(value.pending_writes(), Ok(5)); + assert_eq!(value.pending_writes(), Ok(0)); + } + + #[test] + fn no_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert_eq!(value.no_pending_writes(), false); + assert!(value.pending_writes().is_ok()); + assert_eq!(value.no_pending_writes(), true); + } + + #[test] + fn setting_from_auth_resets_pending_writes() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, 5); + assert_eq!(value.no_pending_writes(), false); + value.set_from_authority(&counter, 6, Duration::from_secs(1)); + assert_eq!(value.no_pending_writes(), true); + assert_eq!(value.pending_writes(), Ok(0)); + } + + #[test] + fn from_authority_no_need_to_flush() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(10)); + assert_eq!(value.requires_fast_flush(&Duration::from_secs(30)), false); + } + + #[test] + fn from_authority_needs_to_flush_within_ttl() { + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + assert_eq!(value.requires_fast_flush(&Duration::from_secs(90)), true); + } + + #[test] + fn fake_needs_to_flush_within_ttl() { + let counter = test_counter(10, None); + let value = CachedCounterValue::load_from_authority_asap(&counter, 0); + assert_eq!(value.requires_fast_flush(&Duration::from_secs(30)), true); + } + + #[test] + fn expiry_of_cached_entry() { + let counter = test_counter(10, None); + let then = SystemTime::now(); + let cache_entry_ttl = Duration::from_secs(1); + let value = CachedCounterValue::from_authority(&counter, 0, cache_entry_ttl); + let now = SystemTime::now(); + assert_eq!(value.expired_at(now), false); + assert_eq!(value.expired_at(then + cache_entry_ttl), false); + assert_eq!(value.expired_at(now + cache_entry_ttl), true); + } + + #[test] + fn delegates_to_underlying_value() { + let hits = 4; + + let counter = test_counter(10, None); + let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); + value.delta(&counter, hits); + assert_eq!(value.to_next_window() > Duration::from_millis(59999), true); + assert_eq!(value.hits(&counter), hits); + let remaining = counter.max_value() - hits; + assert_eq!(value.remaining(&counter), remaining); + assert_eq!(value.is_limited(&counter, 1), false); + assert_eq!(value.is_limited(&counter, remaining), false); + assert_eq!(value.is_limited(&counter, remaining + 1), true); + } + } + #[test] fn get_existing_counter() { - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - 10, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(10, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -417,18 +504,7 @@ mod tests { #[test] fn get_non_existing_counter() { - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - 10, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(10, None); let cache = CountersCacheBuilder::new().build(Duration::default()); @@ -439,18 +515,7 @@ mod tests { fn insert_saves_the_given_value_when_is_some() { let max_val = 10; let current_value = max_val / 2; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - max_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(max_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -470,18 +535,7 @@ mod tests { #[test] fn insert_saves_zero_when_redis_val_is_none() { let max_val = 10; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - max_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(max_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -499,18 +553,7 @@ mod tests { fn increase_by() { let current_val = 10; let increase_by = 8; - let mut values = HashMap::new(); - values.insert("app_id".to_string(), "1".to_string()); - let counter = Counter::new( - Limit::new( - "test_namespace", - current_val, - 60, - vec!["req.method == 'POST'"], - vec!["app_id"], - ), - values, - ); + let counter = test_counter(current_val, None); let cache = CountersCacheBuilder::new().build(Duration::default()); cache.insert( @@ -527,4 +570,22 @@ mod tests { (current_val + increase_by) ); } + + fn test_counter(max_val: i64, other_values: Option>) -> Counter { + let mut values = HashMap::new(); + values.insert("app_id".to_string(), "1".to_string()); + if let Some(overrides) = other_values { + values.extend(overrides); + } + Counter::new( + Limit::new( + "test_namespace", + max_val, + 60, + vec!["req.method == 'POST'"], + vec!["app_id"], + ), + values, + ) + } } From 5bd6fae3bad828536e47885f6e03eb733140211b Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 1 May 2024 09:02:18 -0400 Subject: [PATCH 2/3] Need to either do work, or make up work, otherwise nothing to flush --- limitador/src/storage/redis/counters_cache.rs | 5 ++-- limitador/src/storage/redis/redis_cached.rs | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 643b1f15..485e8694 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -16,6 +16,7 @@ use std::time::{Duration, SystemTime}; use tokio::select; use tokio::sync::Notify; +#[derive(Debug)] pub struct CachedCounterValue { value: AtomicExpiringValue, initial_value: AtomicI64, @@ -41,7 +42,7 @@ impl CachedCounterValue { temp_value, now + Duration::from_secs(counter.seconds()), ), - initial_value: AtomicI64::new(temp_value), + initial_value: AtomicI64::new(0), expiry: AtomicExpiryTime::from_now(Duration::from_secs(counter.seconds())), from_authority: AtomicBool::new(false), } @@ -460,12 +461,10 @@ mod tests { #[test] fn expiry_of_cached_entry() { let counter = test_counter(10, None); - let then = SystemTime::now(); let cache_entry_ttl = Duration::from_secs(1); let value = CachedCounterValue::from_authority(&counter, 0, cache_entry_ttl); let now = SystemTime::now(); assert_eq!(value.expired_at(now), false); - assert_eq!(value.expired_at(then + cache_entry_ttl), false); assert_eq!(value.expired_at(now + cache_entry_ttl), true); } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 0f5c36ff..7433613a 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -298,8 +298,8 @@ async fn update_counters( return Ok(res); } - for (counter, delta) in counters_and_deltas { - let delta = delta.pending_writes().expect("State machine is wrong!"); + for (counter, value) in counters_and_deltas { + let delta = value.pending_writes().expect("State machine is wrong!"); if delta > 0 { script_invocation.key(key_for_counter(&counter)); script_invocation.key(key_for_counters_of_limit(counter.limit())); @@ -418,13 +418,15 @@ mod tests { Default::default(), ); + let arc = Arc::new(CachedCounterValue::from_authority( + &counter, + 1, + Duration::from_secs(60), + )); + arc.delta(&counter, 1); counters_and_deltas.insert( counter.clone(), - Arc::new(CachedCounterValue::from_authority( - &counter, - 1, - Duration::from_secs(60), - )), + arc, ); let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]); @@ -437,7 +439,7 @@ mod tests { .arg(key_for_counters_of_limit(counter.limit())) .arg(60) .arg(1), - Ok(mock_response.clone()), + Ok(mock_response), )]); let result = update_counters(&mut mock_client, counters_and_deltas).await; @@ -476,16 +478,15 @@ mod tests { .arg(key_for_counters_of_limit(counter.limit())) .arg(60) .arg(2), - Ok(mock_response.clone()), + Ok(mock_response), )]); let cache = CountersCacheBuilder::new().build(Duration::from_millis(1)); cache.batcher().add( counter.clone(), - Arc::new(CachedCounterValue::from_authority( + Arc::new(CachedCounterValue::load_from_authority_asap( &counter, 2, - Duration::from_secs(60), )), ); cache.insert( From 7441e4d98e0ba538e3ee49a2e93fd698e84802b8 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 1 May 2024 09:05:34 -0400 Subject: [PATCH 3/3] clippy changes, I almost don't agree with --- limitador/src/storage/redis/counters_cache.rs | 27 ++++++++++--------- limitador/src/storage/redis/redis_cached.rs | 10 ++----- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 485e8694..caa9a06d 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -396,6 +396,7 @@ mod tests { mod cached_counter_value { use crate::storage::redis::counters_cache::tests::test_counter; use crate::storage::redis::counters_cache::CachedCounterValue; + use std::ops::Not; use std::time::{Duration, SystemTime}; #[test] @@ -421,9 +422,9 @@ mod tests { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); value.delta(&counter, 5); - assert_eq!(value.no_pending_writes(), false); + assert!(value.no_pending_writes().not()); assert!(value.pending_writes().is_ok()); - assert_eq!(value.no_pending_writes(), true); + assert!(value.no_pending_writes()); } #[test] @@ -431,9 +432,9 @@ mod tests { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); value.delta(&counter, 5); - assert_eq!(value.no_pending_writes(), false); + assert!(value.no_pending_writes().not()); value.set_from_authority(&counter, 6, Duration::from_secs(1)); - assert_eq!(value.no_pending_writes(), true); + assert!(value.no_pending_writes()); assert_eq!(value.pending_writes(), Ok(0)); } @@ -441,21 +442,21 @@ mod tests { fn from_authority_no_need_to_flush() { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(10)); - assert_eq!(value.requires_fast_flush(&Duration::from_secs(30)), false); + assert!(value.requires_fast_flush(&Duration::from_secs(30)).not()); } #[test] fn from_authority_needs_to_flush_within_ttl() { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); - assert_eq!(value.requires_fast_flush(&Duration::from_secs(90)), true); + assert!(value.requires_fast_flush(&Duration::from_secs(90))); } #[test] fn fake_needs_to_flush_within_ttl() { let counter = test_counter(10, None); let value = CachedCounterValue::load_from_authority_asap(&counter, 0); - assert_eq!(value.requires_fast_flush(&Duration::from_secs(30)), true); + assert!(value.requires_fast_flush(&Duration::from_secs(30))); } #[test] @@ -464,8 +465,8 @@ mod tests { let cache_entry_ttl = Duration::from_secs(1); let value = CachedCounterValue::from_authority(&counter, 0, cache_entry_ttl); let now = SystemTime::now(); - assert_eq!(value.expired_at(now), false); - assert_eq!(value.expired_at(now + cache_entry_ttl), true); + assert!(value.expired_at(now).not()); + assert!(value.expired_at(now + cache_entry_ttl)); } #[test] @@ -475,13 +476,13 @@ mod tests { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1)); value.delta(&counter, hits); - assert_eq!(value.to_next_window() > Duration::from_millis(59999), true); + assert!(value.to_next_window() > Duration::from_millis(59999)); assert_eq!(value.hits(&counter), hits); let remaining = counter.max_value() - hits; assert_eq!(value.remaining(&counter), remaining); - assert_eq!(value.is_limited(&counter, 1), false); - assert_eq!(value.is_limited(&counter, remaining), false); - assert_eq!(value.is_limited(&counter, remaining + 1), true); + assert!(value.is_limited(&counter, 1).not()); + assert!(value.is_limited(&counter, remaining).not()); + assert!(value.is_limited(&counter, remaining + 1)); } } diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 7433613a..cce46f51 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -424,10 +424,7 @@ mod tests { Duration::from_secs(60), )); arc.delta(&counter, 1); - counters_and_deltas.insert( - counter.clone(), - arc, - ); + counters_and_deltas.insert(counter.clone(), arc); let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]); @@ -484,10 +481,7 @@ mod tests { let cache = CountersCacheBuilder::new().build(Duration::from_millis(1)); cache.batcher().add( counter.clone(), - Arc::new(CachedCounterValue::load_from_authority_asap( - &counter, - 2, - )), + Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 2)), ); cache.insert( counter.clone(),