Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cached delta updates #316

Merged
merged 2 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ impl AtomicExpiringValue {
self.value_at(SystemTime::now())
}

pub fn add_and_set_expiry(&self, delta: i64, expiry: Duration) -> i64 {
self.expiry.update(expiry);
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn update(&self, delta: i64, ttl: u64, when: SystemTime) -> i64 {
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
Expand Down
94 changes: 33 additions & 61 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ impl CachedCounterValue {
self.expiry.expired_at(now)
}

pub fn set_from_authority(&self, counter: &Counter, value: i64, expiry: Duration) {
let time_window = Duration::from_secs(counter.seconds());
self.initial_value.store(value, Ordering::SeqCst);
self.value.set(value, time_window);
pub fn add_from_authority(&self, delta: i64, expiry: Duration) {
self.value.add_and_set_expiry(delta, expiry);
self.initial_value.fetch_add(delta, Ordering::SeqCst);
self.expiry.update(expiry);
self.from_authority.store(true, Ordering::Release);
}
Expand All @@ -73,6 +72,10 @@ impl CachedCounterValue {
}

pub fn pending_writes(&self) -> Result<i64, ()> {
self.pending_writes_and_value().map(|(writes, _)| writes)
}

pub fn pending_writes_and_value(&self) -> Result<(i64, i64), ()> {
let start = self.initial_value.load(Ordering::SeqCst);
let value = self.value.value_at(SystemTime::now());
let offset = if start == 0 {
Expand All @@ -91,11 +94,11 @@ impl CachedCounterValue {
.initial_value
.compare_exchange(start, value, Ordering::SeqCst, Ordering::SeqCst)
{
Ok(_) => Ok(offset),
Ok(_) => Ok((offset, value)),
Err(newer) => {
if newer == 0 {
// We got reset because of expiry, this fresh value can wait the next iteration
Ok(0)
Ok((0, 0))
} else {
// Concurrent call to this method?
// We could support that with a CAS loop in the future if needed
Expand Down Expand Up @@ -250,43 +253,41 @@ impl CountersCache {
&self.batcher
}

pub fn insert(
pub fn apply_remote_delta(
&self,
counter: Counter,
redis_val: Option<i64>,
redis_val: i64,
remote_deltas: i64,
redis_ttl_ms: i64,
ttl_margin: Duration,
now: SystemTime,
) -> Arc<CachedCounterValue> {
let counter_val = redis_val.unwrap_or(0);
let cache_ttl = self.ttl_from_redis_ttl(
redis_ttl_ms,
counter.seconds(),
counter_val,
redis_val,
counter.max_value(),
);
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
let previous = self.cache.get_with(counter.clone(), || {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
entry.value().clone()
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, ttl);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(
&counter,
counter_val,
ttl,
))
Arc::new(CachedCounterValue::from_authority(&counter, redis_val, ttl))
}
});
if previous.expired_at(now) || previous.value.value() < counter_val {
previous.set_from_authority(&counter, counter_val, ttl);
if from_cache {
cached.add_from_authority(remote_deltas, ttl);
}
return previous;
return cached;
}
}
Arc::new(CachedCounterValue::load_from_authority_asap(
&counter,
counter_val,
&counter, redis_val,
))
}

Expand Down Expand Up @@ -428,14 +429,14 @@ mod tests {
}

#[test]
fn setting_from_auth_resets_pending_writes() {
fn adding_from_auth_not_affecting_pending_writes() {
let counter = test_counter(10, None);
let value = CachedCounterValue::from_authority(&counter, 0, Duration::from_secs(1));
value.delta(&counter, 5);
assert!(value.no_pending_writes().not());
value.set_from_authority(&counter, 6, Duration::from_secs(1));
assert!(value.no_pending_writes());
assert_eq!(value.pending_writes(), Ok(0));
value.add_from_authority(6, Duration::from_secs(1));
assert!(value.no_pending_writes().not());
assert_eq!(value.pending_writes(), Ok(5));
}

#[test]
Expand Down Expand Up @@ -621,13 +622,7 @@ mod tests {
let counter = test_counter(10, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(10),
10,
Duration::from_secs(0),
SystemTime::now(),
);
cache.apply_remote_delta(counter.clone(), 10, 0, 10, Duration::from_secs(0));

assert!(cache.get(&counter).is_some());
}
Expand All @@ -648,12 +643,12 @@ mod tests {
let counter = test_counter(max_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
cache.apply_remote_delta(
counter.clone(),
Some(current_value),
current_value,
0,
10,
Duration::from_secs(0),
SystemTime::now(),
);

assert_eq!(
Expand All @@ -662,37 +657,14 @@ mod tests {
);
}

#[test]
fn insert_saves_zero_when_redis_val_is_none() {
let max_val = 10;
let counter = test_counter(max_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
None,
10,
Duration::from_secs(0),
SystemTime::now(),
);

assert_eq!(cache.get(&counter).map(|e| e.hits(&counter)).unwrap(), 0);
}

#[test]
fn increase_by() {
let current_val = 10;
let increase_by = 8;
let counter = test_counter(current_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.insert(
counter.clone(),
Some(current_val),
10,
Duration::from_secs(0),
SystemTime::now(),
);
cache.apply_remote_delta(counter.clone(), current_val, 0, 10, Duration::from_secs(0));
cache.increase_by(&counter, increase_by);

assert_eq!(
Expand Down
118 changes: 61 additions & 57 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,43 +292,48 @@ impl CachedRedisStorageBuilder {
async fn update_counters<C: ConnectionLike>(
redis_conn: &mut C,
counters_and_deltas: HashMap<Counter, Arc<CachedCounterValue>>,
) -> Result<Vec<(Counter, i64, i64)>, StorageErr> {
) -> Result<Vec<(Counter, i64, i64, i64)>, StorageErr> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new value corresponds to the delta in the last position? maybe we could add an doc annotation to this fn

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll burn the whole ttl stuff, use absolute timestamps instead and then refactor this...
I hate tuples... of 4 elements even more so.

let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let mut res: Vec<(Counter, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());
if counters_and_deltas.is_empty() {
return Ok(res);
}

for (counter, value) in counters_and_deltas {
let delta = value.pending_writes().expect("State machine is wrong!");
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, 0, 0));
let res = if counters_and_deltas.is_empty() {
Default::default()
} else {
let mut res: Vec<(Counter, i64, i64, i64)> = Vec::with_capacity(counters_and_deltas.len());

for (counter, value) in counters_and_deltas {
let (delta, last_value_from_redis) = value
.pending_writes_and_value()
.expect("State machine is wrong!");
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, 0, last_value_from_redis, 0));
}
}
}

let span = debug_span!("datastore");
// The redis crate is not working with tables, thus the response will be a Vec of counter values
let script_res: Vec<i64> = script_invocation
.invoke_async(redis_conn)
.instrument(span)
.await?;

// We need to update the values and ttls returned by redis
let counters_range = 0..res.len();
let script_res_range = (0..script_res.len()).step_by(2);

for (i, j) in counters_range.zip(script_res_range) {
let (_, val, ttl) = &mut res[i];
*val = script_res[j];
*ttl = script_res[j + 1];
}
let span = debug_span!("datastore");
// The redis crate is not working with tables, thus the response will be a Vec of counter values
let script_res: Vec<i64> = script_invocation
.invoke_async(redis_conn)
.instrument(span)
.await?;

// We need to update the values and ttls returned by redis
let counters_range = 0..res.len();
let script_res_range = (0..script_res.len()).step_by(2);

for (i, j) in counters_range.zip(script_res_range) {
let (_, val, delta, ttl) = &mut res[i];
*val = script_res[j];
*delta = script_res[j] - *delta;
*ttl = script_res[j + 1];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so last one is ttl

}
res
};

Ok(res)
}
Expand Down Expand Up @@ -360,15 +365,15 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(

let time_start_update_counters = Instant::now();

for (counter, value, ttl) in updated_counters {
cached_counters.insert(
for (counter, new_value, remote_deltas, ttl) in updated_counters {
cached_counters.apply_remote_delta(
counter,
Option::from(value),
new_value,
remote_deltas,
ttl,
Duration::from_millis(
(Instant::now() - time_start_update_counters).as_millis() as u64
),
SystemTime::now(),
);
}
}
Expand All @@ -389,7 +394,7 @@ mod tests {
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;

#[tokio::test]
async fn errs_on_bad_url() {
Expand All @@ -409,6 +414,10 @@ mod tests {

#[tokio::test]
async fn batch_update_counters() {
const NEW_VALUE_FROM_REDIS: i64 = 10;
const INITIAL_VALUE_FROM_REDIS: i64 = 1;
const LOCAL_INCREMENTS: i64 = 2;

let mut counters_and_deltas = HashMap::new();
let counter = Counter::new(
Limit::new(
Expand All @@ -423,13 +432,13 @@ mod tests {

let arc = Arc::new(CachedCounterValue::from_authority(
&counter,
1,
INITIAL_VALUE_FROM_REDIS,
Duration::from_secs(60),
));
arc.delta(&counter, 1);
arc.delta(&counter, LOCAL_INCREMENTS);
counters_and_deltas.insert(counter.clone(), arc);

let mock_response = Value::Bulk(vec![Value::Int(10), Value::Int(60)]);
let mock_response = Value::Bulk(vec![Value::Int(NEW_VALUE_FROM_REDIS), Value::Int(60)]);

let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
redis::cmd("EVALSHA")
Expand All @@ -438,21 +447,22 @@ mod tests {
.arg(key_for_counter(&counter))
.arg(key_for_counters_of_limit(counter.limit()))
.arg(60)
.arg(1),
.arg(LOCAL_INCREMENTS),
Ok(mock_response),
)]);

let result = update_counters(&mut mock_client, counters_and_deltas).await;

assert!(result.is_ok());
let mut result = update_counters(&mut mock_client, counters_and_deltas)
.await
.unwrap();

let (c, v, t) = result.unwrap()[0].clone();
let (c, new_value, remote_increments, new_ttl) = result.remove(0);
assert_eq!(key_for_counter(&counter), key_for_counter(&c));
assert_eq!(NEW_VALUE_FROM_REDIS, new_value);
assert_eq!(
"req.method == \"GET\"",
c.limit().conditions().iter().collect::<Vec<_>>()[0]
NEW_VALUE_FROM_REDIS - INITIAL_VALUE_FROM_REDIS - LOCAL_INCREMENTS,
remote_increments
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explicit, so we can make assertions with the redis deltas 👍🏼

);
assert_eq!(10, v);
assert_eq!(60, t);
assert_eq!(60, new_ttl);
}

#[tokio::test]
Expand Down Expand Up @@ -481,18 +491,12 @@ mod tests {
Ok(mock_response),
)]);

let cache = CountersCacheBuilder::new().build(Duration::from_millis(1));
let cache = CountersCacheBuilder::new().build(Duration::from_millis(10));
cache.batcher().add(
counter.clone(),
Arc::new(CachedCounterValue::load_from_authority_asap(&counter, 2)),
);
cache.insert(
counter.clone(),
Some(1),
10,
Duration::from_secs(0),
SystemTime::now(),
);

let cached_counters: Arc<CountersCache> = Arc::new(cache);
let partitioned = Arc::new(AtomicBool::new(false));

Expand Down