diff --git a/Cargo.lock b/Cargo.lock index c78a0b45..21708c2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -434,6 +434,12 @@ version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -466,6 +472,37 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -809,6 +846,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -1373,6 +1419,7 @@ dependencies = [ "futures", "infinispan", "lazy_static", + "moka", "paste", "postcard", "prometheus", @@ -1474,6 +1521,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchit" version = "0.7.0" @@ -1528,6 +1584,28 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "moka" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -1998,6 +2076,33 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.28" @@ -2048,6 +2153,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.7.0" @@ -2298,6 +2412,9 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -2434,6 +2551,21 @@ dependencies = [ "libc", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -2527,6 +2659,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.5.0" @@ -2823,6 +2961,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -2844,6 +2988,15 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.11" @@ -2888,6 +3041,15 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8db7427f936968176eaa7cdf81b7f98b980b18495ec28f1b5791ac3bfe3eea9" +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 26ee5c68..b0e6fc26 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -21,6 +21,7 @@ infinispan_storage = ["infinispan", "reqwest", "base64"] lenient_conditions = [] [dependencies] +moka = "0.11.2" ttl_cache = "0.5" serde = { version = "1", features = ["derive"] } postcard = { version = "1.0.4", features = ["use-std"] } diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs index c1bcf889..d00dd59d 100644 --- a/limitador/src/counter.rs +++ b/limitador/src/counter.rs @@ -87,6 +87,10 @@ impl Counter { self.expires_in = Some(duration) } + pub fn is_qualified(&self) -> bool { + !self.set_variables.is_empty() + } + #[cfg(feature = "disk_storage")] pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> { let mut variables = Vec::with_capacity(self.set_variables.len()); diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index 81bdfdb8..8b1cd080 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -2,58 +2,18 @@ use crate::counter::Counter; use crate::limit::{Limit, Namespace}; use crate::storage::atomic_expiring_value::AtomicExpiringValue; use crate::storage::{Authorization, CounterStorage, StorageErr}; +use moka::sync::Cache; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::hash::{Hash, Hasher}; -use std::sync::RwLock; +use std::ops::Deref; +use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; -#[derive(Eq, Clone)] -struct CounterKey { - set_variables: HashMap, -} - -impl CounterKey { - fn to_counter(&self, limit: &Limit) -> Counter { - Counter::new(limit.clone(), self.set_variables.clone()) - } -} - -impl From<&Counter> for CounterKey { - fn from(counter: &Counter) -> Self { - CounterKey { - set_variables: counter.set_variables().clone(), - } - } -} - -impl From<&mut Counter> for CounterKey { - fn from(counter: &mut Counter) -> Self { - CounterKey { - set_variables: counter.set_variables().clone(), - } - } -} - -impl Hash for CounterKey { - fn hash(&self, state: &mut H) { - self.set_variables.iter().for_each(|(k, v)| { - k.hash(state); - v.hash(state); - }); - } -} - -impl PartialEq for CounterKey { - fn eq(&self, other: &Self) -> bool { - self.set_variables == other.set_variables - } -} - -type NamespacedLimitCounters = HashMap>>; +type NamespacedLimitCounters = HashMap>; pub struct InMemoryStorage { limits_for_namespace: RwLock>, + qualified_counters: Cache>, } impl CounterStorage for InMemoryStorage { @@ -61,36 +21,59 @@ impl CounterStorage for InMemoryStorage { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); let mut value = 0; - if let Some(limits) = limits_by_namespace.get(counter.limit().namespace()) { - if let Some(counters) = limits.get(counter.limit()) { - if let Some(expiring_value) = counters.get(&counter.into()) { - value = expiring_value.value(); - } + + if counter.is_qualified() { + if let Some(counter) = self.qualified_counters.get(counter) { + value = counter.value(); + } + } else if let Some(limits) = limits_by_namespace.get(counter.limit().namespace()) { + if let Some(counter) = limits.get(counter.limit()) { + value = counter.value(); } } + Ok(counter.max_value() >= value + delta) } fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); - match limits_by_namespace.entry(counter.limit().namespace().clone()) { - Entry::Vacant(v) => { - let mut limits = HashMap::new(); - let mut counters = HashMap::new(); - self.insert_or_update_counter(&mut counters, counter, delta); - limits.insert(counter.limit().clone(), counters); - v.insert(limits); - } - Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { + let now = SystemTime::now(); + if counter.is_qualified() { + let value = match self.qualified_counters.get(counter) { + None => self.qualified_counters.get_with(counter.clone(), || { + Arc::new(AtomicExpiringValue::new( + 0, + now + Duration::from_secs(counter.seconds()), + )) + }), + Some(counter) => counter, + }; + value.update(delta, counter.seconds(), now); + } else { + match limits_by_namespace.entry(counter.limit().namespace().clone()) { Entry::Vacant(v) => { - let mut counters = HashMap::new(); - self.insert_or_update_counter(&mut counters, counter, delta); - v.insert(counters); + let mut limits = HashMap::new(); + limits.insert( + counter.limit().clone(), + AtomicExpiringValue::new( + delta, + now + Duration::from_secs(counter.seconds()), + ), + ); + v.insert(limits); } - Entry::Occupied(mut o) => { - self.insert_or_update_counter(o.get_mut(), counter, delta); - } - }, + Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { + Entry::Vacant(v) => { + v.insert(AtomicExpiringValue::new( + delta, + now + Duration::from_secs(counter.seconds()), + )); + } + Entry::Occupied(o) => { + o.get().update(delta, counter.seconds(), now); + } + }, + } } Ok(()) } @@ -104,6 +87,8 @@ impl CounterStorage for InMemoryStorage { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); let mut first_limited = None; let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new(); + let mut qualified_counter_values_to_updated: Vec<(Arc, u64)> = + Vec::new(); let now = SystemTime::now(); let mut process_counter = @@ -126,22 +111,19 @@ impl CounterStorage for InMemoryStorage { }; // Normalize counters and values - for counter in counters.iter() { + for counter in counters.iter().filter(|c| !c.is_qualified()) { limits_by_namespace .entry(counter.limit().namespace().clone()) .or_insert_with(HashMap::new) .entry(counter.limit().clone()) - .or_insert_with(HashMap::new) - .entry(counter.into()) .or_insert_with(AtomicExpiringValue::default); } - // Process counters - for counter in counters.iter_mut() { + // Process simple counters + for counter in counters.iter_mut().filter(|c| !c.is_qualified()) { let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace .get(counter.limit().namespace()) .and_then(|limits| limits.get(counter.limit())) - .and_then(|counters| counters.get(&counter.into())) .unwrap(); if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) { @@ -149,10 +131,30 @@ impl CounterStorage for InMemoryStorage { return Ok(limited); } } - counter_values_to_update.push((atomic_expiring_value, counter.seconds())); } + // Process qualified counters + for counter in counters.iter_mut().filter(|c| c.is_qualified()) { + let value = match self.qualified_counters.get(counter) { + None => self.qualified_counters.get_with(counter.clone(), || { + Arc::new(AtomicExpiringValue::new( + 0, + now + Duration::from_secs(counter.seconds()), + )) + }), + Some(counter) => counter, + }; + + if let Some(limited) = process_counter(counter, value.value(), delta) { + if !load_counters { + return Ok(limited); + } + } + + qualified_counter_values_to_updated.push((value, counter.seconds())); + } + if let Some(limited) = first_limited { return Ok(limited); } @@ -161,6 +163,11 @@ impl CounterStorage for InMemoryStorage { counter_values_to_update.iter().for_each(|(v, ttl)| { v.update(delta, *ttl, now); }); + qualified_counter_values_to_updated + .iter() + .for_each(|(v, ttl)| { + v.update(delta, *ttl, now); + }); Ok(Authorization::Ok) } @@ -189,6 +196,19 @@ impl CounterStorage for InMemoryStorage { } } } + + for (counter, expiring_value) in self.qualified_counters.iter() { + if limits.contains(counter.limit()) { + let mut counter_with_val = counter.deref().clone(); + counter_with_val + .set_remaining(counter_with_val.max_value() - expiring_value.value()); + counter_with_val.set_expires_in(expiring_value.ttl()); + if counter_with_val.expires_in().unwrap() > Duration::ZERO { + res.insert(counter_with_val); + } + } + } + Ok(res) } @@ -209,6 +229,7 @@ impl InMemoryStorage { pub fn new() -> Self { Self { limits_for_namespace: RwLock::new(HashMap::new()), + qualified_counters: Cache::new(1000), } } @@ -219,10 +240,17 @@ impl InMemoryStorage { let mut res: HashMap = HashMap::new(); if let Some(counters_by_limit) = self.limits_for_namespace.read().unwrap().get(namespace) { - for (limit, values) in counters_by_limit { - for (counter_key, expiring_value) in values { - res.insert(counter_key.to_counter(limit), expiring_value.clone()); - } + for (limit, value) in counters_by_limit { + res.insert( + Counter::new(limit.clone(), HashMap::default()), + value.clone(), + ); + } + } + + for (counter, value) in self.qualified_counters.iter() { + if counter.namespace() == namespace { + res.insert(counter.deref().clone(), value.deref().clone()); } } @@ -240,26 +268,6 @@ impl InMemoryStorage { } } - fn insert_or_update_counter( - &self, - counters: &mut HashMap, - counter: &Counter, - delta: i64, - ) { - let now = SystemTime::now(); - match counters.entry(counter.into()) { - Entry::Vacant(v) => { - v.insert(AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - )); - } - Entry::Occupied(o) => { - o.get().update(delta, counter.seconds(), now); - } - } - } - fn counter_is_within_limits(counter: &Counter, current_val: Option<&i64>, delta: i64) -> bool { match current_val { Some(current_val) => current_val + delta <= counter.max_value(),