Skip to content

Commit

Permalink
Work with absolute timestamps from redis
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 9, 2024
1 parent 580c14d commit 8767d28
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 157 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- uses: actions/checkout@v4
- uses: supercharge/[email protected]
with:
redis-version: 5
redis-version: 7
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: abelfodil/protoc-action@v1
with:
Expand Down
1 change: 0 additions & 1 deletion limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ pub struct RedisStorageConfiguration {
#[derive(PartialEq, Eq, Debug)]
pub struct RedisStorageCacheConfiguration {
pub flushing_period: i64,
pub max_ttl: u64,
pub ttl_ratio: u64,
pub max_counters: usize,
pub response_timeout: u64,
Expand Down
23 changes: 1 addition & 22 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use limitador::storage::disk::DiskStorage;
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
Expand Down Expand Up @@ -134,7 +133,6 @@ impl Limiter {

let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
.flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl))
.ttl_ratio_cached_counters(cache_cfg.ttl_ratio)
.max_cached_counters(cache_cfg.max_counters)
.response_timeout(Duration::from_millis(cache_cfg.response_timeout));
Expand Down Expand Up @@ -591,18 +589,6 @@ fn create_config() -> (Configuration, &'static str) {
.about("Uses Redis to store counters, with an in-memory cache")
.display_order(4)
.arg(redis_url_arg)
.arg(
Arg::new("TTL")
.long("ttl")
.action(ArgAction::Set)
.value_parser(clap::value_parser!(u64))
.default_value(
config::env::REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS
.unwrap_or(leak(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC * 1000)),
)
.display_order(2)
.help("TTL for cached counters in milliseconds"),
)
.arg(
Arg::new("ratio")
.long("ratio")
Expand Down Expand Up @@ -748,7 +734,6 @@ fn create_config() -> (Configuration, &'static str) {
url: sub.get_one::<String>("URL").unwrap().to_owned(),
cache: Some(RedisStorageCacheConfiguration {
flushing_period: *sub.get_one("flush").unwrap(),
max_ttl: *sub.get_one("TTL").unwrap(),
ttl_ratio: *sub.get_one("ratio").unwrap(),
max_counters: *sub.get_one("max").unwrap(),
response_timeout: *sub.get_one("timeout").unwrap(),
Expand Down Expand Up @@ -832,12 +817,6 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
.unwrap_or_else(|_| (DEFAULT_FLUSHING_PERIOD_SEC * 1000).to_string())
.parse()
.expect("Expected an i64"),
max_ttl: env::var("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS")
.unwrap_or_else(|_| {
(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC * 1000).to_string()
})
.parse()
.expect("Expected an u64"),
ttl_ratio: env::var("REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS")
.unwrap_or_else(|_| DEFAULT_TTL_RATIO_CACHED_COUNTERS.to_string())
.parse()
Expand Down
14 changes: 4 additions & 10 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ impl AtomicExpiringValue {
}

#[allow(dead_code)]
pub fn add_and_set_expiry(&self, delta: i64, expiry: Duration) -> i64 {
self.expiry.update(expiry);
pub fn add_and_set_expiry(&self, delta: i64, expire_at: SystemTime) -> i64 {
self.expiry.update(expire_at);
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

Expand All @@ -44,12 +44,6 @@ impl AtomicExpiringValue {
pub fn ttl(&self) -> Duration {
self.expiry.duration()
}

#[allow(dead_code)]
pub fn set(&self, value: i64, ttl: Duration) {
self.expiry.update(ttl);
self.value.store(value, Ordering::SeqCst);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -90,9 +84,9 @@ impl AtomicExpiryTime {
}

#[allow(dead_code)]
pub fn update(&self, ttl: Duration) {
pub fn update(&self, expiry: SystemTime) {
self.expiry
.store(Self::since_epoch(SystemTime::now() + ttl), Ordering::SeqCst);
.store(Self::since_epoch(expiry), Ordering::SeqCst);
}

pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
Expand Down
109 changes: 26 additions & 83 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::counter::Counter;
use crate::storage::atomic_expiring_value::AtomicExpiringValue;
use crate::storage::redis::{
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
DEFAULT_TTL_RATIO_CACHED_COUNTERS,
};
use crate::storage::redis::{DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_TTL_RATIO_CACHED_COUNTERS};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use moka::sync::Cache;
Expand Down Expand Up @@ -45,8 +42,8 @@ impl CachedCounterValue {
}
}

pub fn add_from_authority(&self, delta: i64, expiry: Duration) {
self.value.add_and_set_expiry(delta, expiry);
pub fn add_from_authority(&self, delta: i64, expire_at: SystemTime) {
self.value.add_and_set_expiry(delta, expire_at);
self.initial_value.fetch_add(delta, Ordering::SeqCst);
self.from_authority.store(true, Ordering::Release);
}
Expand Down Expand Up @@ -222,7 +219,6 @@ impl Default for Batcher {
}

pub struct CountersCache {
max_ttl_cached_counters: Duration,
pub ttl_ratio_cached_counters: u64,
cache: Cache<Counter, Arc<CachedCounterValue>>,
batcher: Batcher,
Expand Down Expand Up @@ -250,30 +246,24 @@ impl CountersCache {
counter: Counter,
redis_val: i64,
remote_deltas: i64,
redis_ttl_ms: i64,
ttl_margin: Duration,
redis_expiry: i64,
) -> Arc<CachedCounterValue> {
let cache_ttl = self.ttl_from_redis_ttl(
redis_ttl_ms,
counter.seconds(),
redis_val,
counter.max_value(),
);
if let Some(ttl) = cache_ttl.checked_sub(ttl_margin) {
if ttl > Duration::ZERO {
if redis_expiry > 0 {
let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry as u64);
if expiry_ts > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, ttl);
cached_value.add_from_authority(remote_deltas, expiry_ts);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
}
});
if from_cache {
cached.add_from_authority(remote_deltas, ttl);
cached.add_from_authority(remote_deltas, expiry_ts);
}
return cached;
}
Expand All @@ -294,63 +284,17 @@ impl CountersCache {
val.delta(counter, delta);
self.batcher.add(counter.clone(), val.clone());
}

fn ttl_from_redis_ttl(
&self,
redis_ttl_ms: i64,
counter_seconds: u64,
counter_val: i64,
counter_max: i64,
) -> Duration {
// Redis returns -2 when the key does not exist. Ref:
// https://redis.io/commands/ttl
// This function returns a ttl of the given counter seconds in this
// case.

let counter_ttl = if redis_ttl_ms >= 0 {
Duration::from_millis(redis_ttl_ms as u64)
} else {
Duration::from_secs(counter_seconds)
};

// If a counter is already at counter_max, we can cache it for as long as its TTL
// is in Redis. This does not depend on the requests received by other
// instances of Limitador. No matter what they do, we know that the
// counter is not going to recover its quota until it expires in Redis.
if counter_val >= counter_max {
return counter_ttl;
}

// Expire the counter in the cache before it expires in Redis.
// There might be several Limitador instances updating the Redis
// counter. The tradeoff is as follows: the shorter the TTL in the
// cache, the sooner we'll take into account those updates coming from
// other instances. If the TTL in the cache is long, there will be less
// accesses to Redis, so latencies will be better. However, it'll be
// easier to go over the limits defined, because not taking into account
// updates from other Limitador instances.
let mut res =
Duration::from_millis(counter_ttl.as_millis() as u64 / self.ttl_ratio_cached_counters);

if res > self.max_ttl_cached_counters {
res = self.max_ttl_cached_counters;
}

res
}
}

pub struct CountersCacheBuilder {
max_cached_counters: usize,
max_ttl_cached_counters: Duration,
ttl_ratio_cached_counters: u64,
}

impl CountersCacheBuilder {
pub fn new() -> Self {
Self {
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
}
}
Expand All @@ -360,19 +304,13 @@ impl CountersCacheBuilder {
self
}

pub fn max_ttl_cached_counter(mut self, max_ttl_cached_counter: Duration) -> Self {
self.max_ttl_cached_counters = max_ttl_cached_counter;
self
}

pub fn ttl_ratio_cached_counter(mut self, ttl_ratio_cached_counter: u64) -> Self {
self.ttl_ratio_cached_counters = ttl_ratio_cached_counter;
self
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
max_ttl_cached_counters: self.max_ttl_cached_counters,
ttl_ratio_cached_counters: self.ttl_ratio_cached_counters,
cache: Cache::new(self.max_cached_counters as u64),
batcher: Batcher::new(period),
Expand All @@ -385,12 +323,14 @@ mod tests {
use super::*;
use crate::limit::Limit;
use std::collections::HashMap;
use std::ops::Add;
use std::time::UNIX_EPOCH;

mod cached_counter_value {
use crate::storage::redis::counters_cache::tests::test_counter;
use crate::storage::redis::counters_cache::CachedCounterValue;
use std::ops::Not;
use std::time::Duration;
use std::ops::{Add, Not};
use std::time::{Duration, SystemTime};

#[test]
fn records_pending_writes() {
Expand Down Expand Up @@ -426,7 +366,7 @@ mod tests {
let value = CachedCounterValue::from_authority(&counter, 0);
value.delta(&counter, 5);
assert!(value.no_pending_writes().not());
value.add_from_authority(6, Duration::from_secs(1));
value.add_from_authority(6, SystemTime::now().add(Duration::from_secs(1)));
assert!(value.no_pending_writes().not());
assert_eq!(value.pending_writes(), Ok(5));
}
Expand Down Expand Up @@ -592,7 +532,10 @@ mod tests {
let counter = test_counter(10, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.apply_remote_delta(counter.clone(), 10, 0, 10, Duration::from_secs(0));
cache.apply_remote_delta(counter.clone(), 10, 0, SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap().as_micros() as i64);

assert!(cache.get(&counter).is_some());
}
Expand All @@ -613,13 +556,10 @@ mod tests {
let counter = test_counter(max_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.apply_remote_delta(
counter.clone(),
current_value,
0,
10,
Duration::from_secs(0),
);
cache.apply_remote_delta(counter.clone(), current_value, 0, SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap().as_micros() as i64);

assert_eq!(
cache.get(&counter).map(|e| e.hits(&counter)).unwrap(),
Expand All @@ -634,7 +574,10 @@ mod tests {
let counter = test_counter(current_val, None);

let cache = CountersCacheBuilder::new().build(Duration::default());
cache.apply_remote_delta(counter.clone(), current_val, 0, 10, Duration::from_secs(0));
cache.apply_remote_delta(counter.clone(), current_val, 0, SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap().as_micros() as i64);
cache.increase_by(&counter, increase_by);

assert_eq!(
Expand Down
1 change: 0 additions & 1 deletion limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ mod scripts;

pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5;
pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10;
pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350;

Expand Down
Loading

0 comments on commit 8767d28

Please sign in to comment.