From 1211d697962c5520a4246d729615417e3999c50c Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 28 Jul 2023 09:37:31 -0400 Subject: [PATCH 1/4] Store qualified counters in dedicated capacity limited cache --- Cargo.lock | 241 ++++++++++++++++++++++++----- limitador/Cargo.toml | 1 + limitador/src/counter.rs | 4 + limitador/src/storage/in_memory.rs | 202 ++++++++++++------------ 4 files changed, 311 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09271137..68b88d96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -65,7 +65,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -275,9 +275,9 @@ checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" [[package]] name = "anstyle-parse" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e765fd216e48e067936442276d1d57399e37bce53c264d6fefbe298080cb57ee" +checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333" dependencies = [ "utf8parse", ] @@ -321,7 +321,7 @@ checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -341,9 +341,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", @@ -429,7 +429,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -480,6 +480,12 @@ version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -512,6 +518,37 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cfa25e60aea747ec7e1124f238816749faa93759c6ff5b31f1ccdda137f4479" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cast" version = "0.3.0" @@ -520,11 +557,12 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "6c6b2562119bf28c3439f7f02db99faf0aa1a8cdfe5772a2ee155d32227239f0" dependencies = [ "jobserver", + "libc", ] [[package]] @@ -812,6 +850,12 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deranged" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" + [[package]] name = "derive_more" version = "0.99.17" @@ -871,9 +915,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bcfec3a70f97c962c307b2d2c56e358cf1d00b558d74262b5f929ee8cc7e73a" +checksum = "6b30f669a7961ef1631673d2766cc92f52d64f7ef354d4fe0ddfd30ed52f0f4f" dependencies = [ "errno-dragonfly", "libc", @@ -890,6 +934,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -1019,7 +1072,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -1453,6 +1506,7 @@ dependencies = [ "futures", "infinispan", "lazy_static", + "moka", "paste", "postcard", "prometheus", @@ -1504,9 +1558,9 @@ checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] name = "linux-raw-sys" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "local-channel" @@ -1552,11 +1606,20 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchit" -version = "0.7.0" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" +checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" [[package]] name = "memchr" @@ -1616,6 +1679,28 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206bf83f415b0579fd885fe0804eb828e727636657dc1bf73d80d2f1218e14a1" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -1731,7 +1816,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -1893,7 +1978,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -1976,7 +2061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c64d9ba0963cdcea2e1b2230fbae2bab30eb25a174be395c41e764bfb65dd62" dependencies = [ "proc-macro2", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -2087,6 +2172,33 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.32" @@ -2137,6 +2249,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "rayon" version = "1.7.0" @@ -2215,9 +2336,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39354c10dd07468c2e73926b23bb9c2caca74c5501e38a35da70406f1d923310" +checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" dependencies = [ "aho-corasick", "memchr", @@ -2322,9 +2443,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "1ee020b1716f0a80e2ace9b03441a749e402e86712f15f16fe8a8f75afac732f" dependencies = [ "bitflags 2.3.3", "errno", @@ -2406,25 +2527,28 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" +dependencies = [ + "serde", +] [[package]] name = "serde" -version = "1.0.179" +version = "1.0.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0" +checksum = "6d3e73c93c3240c0bda063c239298e633114c69a888c3e37ca8bb33f343e9890" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.179" +version = "1.0.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c" +checksum = "be02f6cb0cd3a5ec20bbcfbcbd749f57daddb1a0882dc2e46a6c236c90b977ed" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -2485,7 +2609,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -2520,6 +2644,21 @@ dependencies = [ "libc", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -2598,9 +2737,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.27" +version = "2.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" +checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" dependencies = [ "proc-macro2", "quote", @@ -2613,6 +2752,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.7.0" @@ -2652,15 +2797,16 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "time" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" +checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ + "deranged", "itoa", "serde", "time-core", @@ -2675,9 +2821,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" +checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd" dependencies = [ "time-core", ] @@ -2745,7 +2891,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -2888,7 +3034,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] @@ -2900,6 +3046,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" + [[package]] name = "try-lock" version = "0.2.4" @@ -2986,6 +3138,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -3044,7 +3205,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", "wasm-bindgen-shared", ] @@ -3078,7 +3239,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 8d81dea3..3edb6bb4 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -21,6 +21,7 @@ infinispan_storage = ["infinispan", "reqwest", "base64"] lenient_conditions = [] [dependencies] +moka = "0.11.2" ttl_cache = "0.5" serde = { version = "1", features = ["derive"] } postcard = { version = "1.0.4", features = ["use-std"] } diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs index c1bcf889..d00dd59d 100644 --- a/limitador/src/counter.rs +++ b/limitador/src/counter.rs @@ -87,6 +87,10 @@ impl Counter { self.expires_in = Some(duration) } + pub fn is_qualified(&self) -> bool { + !self.set_variables.is_empty() + } + #[cfg(feature = "disk_storage")] pub(crate) fn variables_for_key(&self) -> Vec<(&str, &str)> { let mut variables = Vec::with_capacity(self.set_variables.len()); diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index 81bdfdb8..8b1cd080 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -2,58 +2,18 @@ use crate::counter::Counter; use crate::limit::{Limit, Namespace}; use crate::storage::atomic_expiring_value::AtomicExpiringValue; use crate::storage::{Authorization, CounterStorage, StorageErr}; +use moka::sync::Cache; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::hash::{Hash, Hasher}; -use std::sync::RwLock; +use std::ops::Deref; +use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; -#[derive(Eq, Clone)] -struct CounterKey { - set_variables: HashMap, -} - -impl CounterKey { - fn to_counter(&self, limit: &Limit) -> Counter { - Counter::new(limit.clone(), self.set_variables.clone()) - } -} - -impl From<&Counter> for CounterKey { - fn from(counter: &Counter) -> Self { - CounterKey { - set_variables: counter.set_variables().clone(), - } - } -} - -impl From<&mut Counter> for CounterKey { - fn from(counter: &mut Counter) -> Self { - CounterKey { - set_variables: counter.set_variables().clone(), - } - } -} - -impl Hash for CounterKey { - fn hash(&self, state: &mut H) { - self.set_variables.iter().for_each(|(k, v)| { - k.hash(state); - v.hash(state); - }); - } -} - -impl PartialEq for CounterKey { - fn eq(&self, other: &Self) -> bool { - self.set_variables == other.set_variables - } -} - -type NamespacedLimitCounters = HashMap>>; +type NamespacedLimitCounters = HashMap>; pub struct InMemoryStorage { limits_for_namespace: RwLock>, + qualified_counters: Cache>, } impl CounterStorage for InMemoryStorage { @@ -61,36 +21,59 @@ impl CounterStorage for InMemoryStorage { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); let mut value = 0; - if let Some(limits) = limits_by_namespace.get(counter.limit().namespace()) { - if let Some(counters) = limits.get(counter.limit()) { - if let Some(expiring_value) = counters.get(&counter.into()) { - value = expiring_value.value(); - } + + if counter.is_qualified() { + if let Some(counter) = self.qualified_counters.get(counter) { + value = counter.value(); + } + } else if let Some(limits) = limits_by_namespace.get(counter.limit().namespace()) { + if let Some(counter) = limits.get(counter.limit()) { + value = counter.value(); } } + Ok(counter.max_value() >= value + delta) } fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); - match limits_by_namespace.entry(counter.limit().namespace().clone()) { - Entry::Vacant(v) => { - let mut limits = HashMap::new(); - let mut counters = HashMap::new(); - self.insert_or_update_counter(&mut counters, counter, delta); - limits.insert(counter.limit().clone(), counters); - v.insert(limits); - } - Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { + let now = SystemTime::now(); + if counter.is_qualified() { + let value = match self.qualified_counters.get(counter) { + None => self.qualified_counters.get_with(counter.clone(), || { + Arc::new(AtomicExpiringValue::new( + 0, + now + Duration::from_secs(counter.seconds()), + )) + }), + Some(counter) => counter, + }; + value.update(delta, counter.seconds(), now); + } else { + match limits_by_namespace.entry(counter.limit().namespace().clone()) { Entry::Vacant(v) => { - let mut counters = HashMap::new(); - self.insert_or_update_counter(&mut counters, counter, delta); - v.insert(counters); + let mut limits = HashMap::new(); + limits.insert( + counter.limit().clone(), + AtomicExpiringValue::new( + delta, + now + Duration::from_secs(counter.seconds()), + ), + ); + v.insert(limits); } - Entry::Occupied(mut o) => { - self.insert_or_update_counter(o.get_mut(), counter, delta); - } - }, + Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { + Entry::Vacant(v) => { + v.insert(AtomicExpiringValue::new( + delta, + now + Duration::from_secs(counter.seconds()), + )); + } + Entry::Occupied(o) => { + o.get().update(delta, counter.seconds(), now); + } + }, + } } Ok(()) } @@ -104,6 +87,8 @@ impl CounterStorage for InMemoryStorage { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); let mut first_limited = None; let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new(); + let mut qualified_counter_values_to_updated: Vec<(Arc, u64)> = + Vec::new(); let now = SystemTime::now(); let mut process_counter = @@ -126,22 +111,19 @@ impl CounterStorage for InMemoryStorage { }; // Normalize counters and values - for counter in counters.iter() { + for counter in counters.iter().filter(|c| !c.is_qualified()) { limits_by_namespace .entry(counter.limit().namespace().clone()) .or_insert_with(HashMap::new) .entry(counter.limit().clone()) - .or_insert_with(HashMap::new) - .entry(counter.into()) .or_insert_with(AtomicExpiringValue::default); } - // Process counters - for counter in counters.iter_mut() { + // Process simple counters + for counter in counters.iter_mut().filter(|c| !c.is_qualified()) { let atomic_expiring_value: &AtomicExpiringValue = limits_by_namespace .get(counter.limit().namespace()) .and_then(|limits| limits.get(counter.limit())) - .and_then(|counters| counters.get(&counter.into())) .unwrap(); if let Some(limited) = process_counter(counter, atomic_expiring_value.value(), delta) { @@ -149,10 +131,30 @@ impl CounterStorage for InMemoryStorage { return Ok(limited); } } - counter_values_to_update.push((atomic_expiring_value, counter.seconds())); } + // Process qualified counters + for counter in counters.iter_mut().filter(|c| c.is_qualified()) { + let value = match self.qualified_counters.get(counter) { + None => self.qualified_counters.get_with(counter.clone(), || { + Arc::new(AtomicExpiringValue::new( + 0, + now + Duration::from_secs(counter.seconds()), + )) + }), + Some(counter) => counter, + }; + + if let Some(limited) = process_counter(counter, value.value(), delta) { + if !load_counters { + return Ok(limited); + } + } + + qualified_counter_values_to_updated.push((value, counter.seconds())); + } + if let Some(limited) = first_limited { return Ok(limited); } @@ -161,6 +163,11 @@ impl CounterStorage for InMemoryStorage { counter_values_to_update.iter().for_each(|(v, ttl)| { v.update(delta, *ttl, now); }); + qualified_counter_values_to_updated + .iter() + .for_each(|(v, ttl)| { + v.update(delta, *ttl, now); + }); Ok(Authorization::Ok) } @@ -189,6 +196,19 @@ impl CounterStorage for InMemoryStorage { } } } + + for (counter, expiring_value) in self.qualified_counters.iter() { + if limits.contains(counter.limit()) { + let mut counter_with_val = counter.deref().clone(); + counter_with_val + .set_remaining(counter_with_val.max_value() - expiring_value.value()); + counter_with_val.set_expires_in(expiring_value.ttl()); + if counter_with_val.expires_in().unwrap() > Duration::ZERO { + res.insert(counter_with_val); + } + } + } + Ok(res) } @@ -209,6 +229,7 @@ impl InMemoryStorage { pub fn new() -> Self { Self { limits_for_namespace: RwLock::new(HashMap::new()), + qualified_counters: Cache::new(1000), } } @@ -219,10 +240,17 @@ impl InMemoryStorage { let mut res: HashMap = HashMap::new(); if let Some(counters_by_limit) = self.limits_for_namespace.read().unwrap().get(namespace) { - for (limit, values) in counters_by_limit { - for (counter_key, expiring_value) in values { - res.insert(counter_key.to_counter(limit), expiring_value.clone()); - } + for (limit, value) in counters_by_limit { + res.insert( + Counter::new(limit.clone(), HashMap::default()), + value.clone(), + ); + } + } + + for (counter, value) in self.qualified_counters.iter() { + if counter.namespace() == namespace { + res.insert(counter.deref().clone(), value.deref().clone()); } } @@ -240,26 +268,6 @@ impl InMemoryStorage { } } - fn insert_or_update_counter( - &self, - counters: &mut HashMap, - counter: &Counter, - delta: i64, - ) { - let now = SystemTime::now(); - match counters.entry(counter.into()) { - Entry::Vacant(v) => { - v.insert(AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - )); - } - Entry::Occupied(o) => { - o.get().update(delta, counter.seconds(), now); - } - } - } - fn counter_is_within_limits(counter: &Counter, current_val: Option<&i64>, delta: i64) -> bool { match current_val { Some(current_val) => current_val + delta <= counter.max_value(), From 5b0ff99650426546feb1fb8f517a36961603cbc6 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 4 Aug 2023 10:11:40 -0400 Subject: [PATCH 2/4] Add cmd line arg to set cache size --- Cargo.lock | 25 +++++++++ limitador-server/Cargo.toml | 1 + limitador-server/src/config.rs | 11 +++- limitador-server/src/envoy_rls/server.rs | 8 +-- limitador-server/src/main.rs | 52 +++++++++++++++---- limitador/src/lib.rs | 37 ++++++------- .../src/storage/atomic_expiring_value.rs | 7 +++ limitador/src/storage/in_memory.rs | 8 +-- limitador/src/storage/mod.rs | 10 +--- 9 files changed, 111 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68b88d96..a25aa0eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,7 @@ dependencies = [ "prost-types", "serde", "serde_yaml", + "sysinfo", "thiserror", "tokio", "tonic", @@ -1753,6 +1754,15 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -2752,6 +2762,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sysinfo" +version = "0.29.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "165d6d8539689e3d3bc8b98ac59541e1f21c7de7c85d60dc80e43ae0ed2113db" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "tagptr" version = "0.2.0" diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index 775477c9..2dbf90c5 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -36,6 +36,7 @@ notify = "6.0.1" const_format = "0.2.31" lazy_static = "1.4.0" clap = "4.3" +sysinfo = "0.29.7" [build-dependencies] tonic-build = "0.9.2" diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index bc78b4a6..3cbb7712 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -111,7 +111,9 @@ impl Default for Configuration { fn default() -> Self { Configuration { limits_file: "".to_string(), - storage: StorageConfiguration::InMemory, + storage: StorageConfiguration::InMemory(InMemoryStorageConfiguration { + cache_size: Some(10_000), + }), rls_host: "".to_string(), rls_port: 0, http_host: "".to_string(), @@ -125,13 +127,18 @@ impl Default for Configuration { #[derive(PartialEq, Eq, Debug)] pub enum StorageConfiguration { - InMemory, + InMemory(InMemoryStorageConfiguration), Disk(DiskStorageConfiguration), Redis(RedisStorageConfiguration), #[cfg(feature = "infinispan")] Infinispan(InfinispanStorageConfiguration), } +#[derive(PartialEq, Eq, Debug)] +pub struct InMemoryStorageConfiguration { + pub cache_size: Option, +} + #[derive(PartialEq, Eq, Debug)] pub struct DiskStorageConfiguration { pub path: String, diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index 3cf93275..48413c0d 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -246,7 +246,7 @@ mod tests { vec!["app_id"], ); - let limiter = RateLimiter::default(); + let limiter = RateLimiter::new(10_000); limiter.add_limit(limit); let rate_limiter = MyRateLimiter::new( @@ -366,7 +366,7 @@ mod tests { #[tokio::test] async fn test_takes_into_account_all_the_descriptors() { - let limiter = RateLimiter::default(); + let limiter = RateLimiter::new(10_000); let namespace = "test_namespace"; @@ -434,7 +434,7 @@ mod tests { let namespace = "test_namespace"; let limit = Limit::new(namespace, 10, 60, vec!["x == '1'"], vec!["y"]); - let limiter = RateLimiter::default(); + let limiter = RateLimiter::new(10_000); limiter.add_limit(limit); let rate_limiter = MyRateLimiter::new( @@ -499,7 +499,7 @@ mod tests { let namespace = "test_namespace"; let limit = Limit::new(namespace, 1, 60, vec!["x == '1'"], vec!["y"]); - let limiter = RateLimiter::default(); + let limiter = RateLimiter::new(10_000); limiter.add_limit(limit); let rate_limiter = MyRateLimiter::new( diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 81ddbf56..e30aaca6 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -7,14 +7,15 @@ extern crate clap; #[cfg(feature = "infinispan")] use crate::config::InfinispanStorageConfiguration; use crate::config::{ - Configuration, DiskStorageConfiguration, RedisStorageCacheConfiguration, - RedisStorageConfiguration, StorageConfiguration, + Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration, + RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration, }; use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; use env_logger::Builder; +use limitador::counter::Counter; use limitador::errors::LimitadorError; use limitador::limit::Limit; use limitador::storage::disk::DiskStorage; @@ -38,6 +39,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use std::{env, process}; +use sysinfo::{RefreshKind, System, SystemExt}; use thiserror::Error; use tokio::runtime::Handle; @@ -82,7 +84,9 @@ impl Limiter { StorageConfiguration::Infinispan(cfg) => { Self::infinispan_limiter(cfg, config.limit_name_in_labels).await } - StorageConfiguration::InMemory => Self::in_memory_limiter(config), + StorageConfiguration::InMemory(cfg) => { + Self::in_memory_limiter(cfg, config.limit_name_in_labels) + } StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels), }; @@ -210,7 +214,7 @@ impl Limiter { } }; let mut rate_limiter_builder = - RateLimiterBuilder::new().storage(Storage::with_counter_storage(Box::new(storage))); + RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage))); if limit_name_in_labels { rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() @@ -219,10 +223,11 @@ impl Limiter { Self::Blocking(rate_limiter_builder.build()) } - fn in_memory_limiter(cfg: Configuration) -> Self { - let mut rate_limiter_builder = RateLimiterBuilder::new(); + fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self { + let mut rate_limiter_builder = + RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap()); - if cfg.limit_name_in_labels { + if limit_name_in_labels { rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() } @@ -513,7 +518,16 @@ fn create_config() -> (Configuration, &'static str) { .subcommand( Command::new("memory") .display_order(1) - .about("Counters are held in Limitador (ephemeral)"), + .about("Counters are held in Limitador (ephemeral)") + .arg( + Arg::new("CACHE_SIZE") + .long("cache") + .short('c') + .action(ArgAction::Set) + .value_parser(value_parser!(u64)) + .display_order(1) + .help("Sets the size of the cache for 'qualified counters'"), + ), ) .subcommand( Command::new("disk") @@ -698,7 +712,9 @@ fn create_config() -> (Configuration, &'static str) { consistency: Some(sub.get_one::("consistency").unwrap().to_string()), }) } - Some(("memory", _sub)) => StorageConfiguration::InMemory, + Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration { + cache_size: sub.get_one::("CACHE_SIZE").copied(), + }), None => match storage_config_from_env() { Ok(storage_cfg) => storage_cfg, Err(_) => { @@ -785,10 +801,26 @@ fn storage_config_from_env() -> Result { consistency: env::var("INFINISPAN_COUNTERS_CONSISTENCY").ok(), }, )), - _ => Ok(StorageConfiguration::InMemory), + _ => Ok(StorageConfiguration::InMemory( + InMemoryStorageConfiguration { cache_size: None }, + )), } } +fn guess_cache_size() -> Option { + let sys = System::new_with_specifics(RefreshKind::new().with_memory()); + let free_mem = sys.available_memory(); + let memory = free_mem as f64 * 0.7; + let size = (memory + / (std::mem::size_of::() + 16/* size_of::() */) as f64) + as u64; + warn!( + "No cache size provided, aiming at 70% of {}MB, i.e. {size} entries", + free_mem / 1024 / 1024 + ); + Some(size) +} + fn env_option_is_enabled(env_name: &str) -> bool { match env::var(env_name) { Ok(value) => value == "1", diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 641b615c..177ae1d0 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -7,10 +7,12 @@ //! Limitador. Storing the limits in Redis is slower, but they can be shared //! between instances. //! -//! By default, the rate limiter is configured to store the counters in memory: +//! By default, the rate limiter is configured to store the counters in memory. +//! It'll store only a limited amount of "qualified counters", specified as a +//! `u64` value in the constructor. //! ``` //! use limitador::RateLimiter; -//! let rate_limiter = RateLimiter::default(); +//! let rate_limiter = RateLimiter::new(1000); //! ``` //! //! To use Redis: @@ -72,7 +74,7 @@ //! vec!["req.method == 'GET'"], //! vec!["user_id"], //! ); -//! let mut rate_limiter = RateLimiter::default(); +//! let mut rate_limiter = RateLimiter::new(1000); //! //! // Add a limit //! rate_limiter.add_limit(limit.clone()); @@ -95,7 +97,7 @@ //! use limitador::limit::Limit; //! use std::collections::HashMap; //! -//! let mut rate_limiter = RateLimiter::default(); +//! let mut rate_limiter = RateLimiter::new(1000); //! //! let limit = Limit::new( //! "my_namespace", @@ -236,9 +238,16 @@ impl From for bool { } impl RateLimiterBuilder { - pub fn new() -> Self { + pub fn with_storage(storage: Storage) -> Self { Self { - storage: Storage::new(), + storage, + prometheus_limit_name_labels_enabled: false, + } + } + + pub fn new(cache_size: u64) -> Self { + Self { + storage: Storage::new(cache_size), prometheus_limit_name_labels_enabled: false, } } @@ -267,12 +276,6 @@ impl RateLimiterBuilder { } } -impl Default for RateLimiterBuilder { - fn default() -> Self { - Self::new() - } -} - pub struct AsyncRateLimiterBuilder { storage: AsyncStorage, prometheus_limit_name_labels_enabled: bool, @@ -306,9 +309,9 @@ impl AsyncRateLimiterBuilder { } impl RateLimiter { - pub fn new() -> Self { + pub fn new(cache_size: u64) -> Self { Self { - storage: Storage::new(), + storage: Storage::new(cache_size), prometheus_metrics: PrometheusMetrics::new(), } } @@ -492,12 +495,6 @@ impl RateLimiter { } } -impl Default for RateLimiter { - fn default() -> Self { - Self::new() - } -} - // TODO: the code of this implementation is almost identical to the blocking // one. The only exception is that the functions defined are "async" and all the // calls to the storage need to include ".await". We'll need to think about how diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index 570fe5c2..634b9e35 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -141,4 +141,11 @@ mod tests { }); assert!([2i64, 3i64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst))); } + + #[test] + fn size_of_struct() { + // This is ugly, but we don't have access to `AtomicExpiringValue` in the server, + // so this is hardcoded in main.rs + assert_eq!(16, std::mem::size_of::()); + } } diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index 8b1cd080..244d8b86 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -226,10 +226,10 @@ impl CounterStorage for InMemoryStorage { } impl InMemoryStorage { - pub fn new() -> Self { + pub fn new(cache_size: u64) -> Self { Self { limits_for_namespace: RwLock::new(HashMap::new()), - qualified_counters: Cache::new(1000), + qualified_counters: Cache::new(cache_size), } } @@ -278,7 +278,7 @@ impl InMemoryStorage { impl Default for InMemoryStorage { fn default() -> Self { - Self::new() + Self::new(10_000) } } @@ -288,7 +288,7 @@ mod tests { #[test] fn counters_for_multiple_limit_per_ns() { - let storage = InMemoryStorage::new(); + let storage = InMemoryStorage::default(); let namespace = "test_namespace"; let limit_1 = Limit::new(namespace, 1, 1, vec!["req.method == 'GET'"], vec!["app_id"]); let limit_2 = Limit::new( diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index b735c778..a269615c 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -37,10 +37,10 @@ pub struct AsyncStorage { } impl Storage { - pub fn new() -> Self { + pub fn new(cache_size: u64) -> Self { Self { limits: RwLock::new(HashMap::new()), - counters: Box::::default(), + counters: Box::new(InMemoryStorage::new(cache_size)), } } @@ -145,12 +145,6 @@ impl Storage { } } -impl Default for Storage { - fn default() -> Self { - Self::new() - } -} - impl AsyncStorage { pub fn with_counter_storage(counters: Box) -> Self { Self { From 41b023861cd88e4cc6cb18b9c4997694f1cae18a Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 4 Aug 2023 10:45:26 -0400 Subject: [PATCH 3/4] Clean up bench --- limitador/benches/bench.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/limitador/benches/bench.rs b/limitador/benches/bench.rs index 825de2b7..1926efcf 100644 --- a/limitador/benches/bench.rs +++ b/limitador/benches/bench.rs @@ -103,7 +103,7 @@ fn bench_in_mem(c: &mut Criterion) { #[cfg(feature = "disk_storage")] fn bench_disk(c: &mut Criterion) { let mut group = c.benchmark_group("Disk"); - for (index, scenario) in TEST_SCENARIOS.iter().enumerate() { + for scenario in TEST_SCENARIOS.iter() { group.bench_with_input( BenchmarkId::new("is_rate_limited", scenario), scenario, From c82a258e7ca9119b0f5ea43eee64bcf99e4b1598 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 4 Aug 2023 11:53:42 -0400 Subject: [PATCH 4/4] Fix to enable feature JS on getrandom pulled in transitively by moka --- Cargo.lock | 3 +++ limitador/Cargo.toml | 1 + 2 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index a25aa0eb..eacb2702 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1122,8 +1122,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1504,6 +1506,7 @@ dependencies = [ "cfg-if", "criterion", "futures", + "getrandom", "infinispan", "lazy_static", "moka", diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 3edb6bb4..d5a9df8c 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -22,6 +22,7 @@ lenient_conditions = [] [dependencies] moka = "0.11.2" +getrandom = { version = "0.2", features = ["js"] } ttl_cache = "0.5" serde = { version = "1", features = ["derive"] } postcard = { version = "1.0.4", features = ["use-std"] }