Skip to content

Commit

Permalink
Merge pull request #87 from Kuadrant/issue_64
Browse files Browse the repository at this point in the history
`Limit`s `max_value` and `name` field play no role in their identity…
  • Loading branch information
alexsnaps authored Jul 8, 2022
2 parents cc82acc + 0e72581 commit 5e50be1
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 22 deletions.
4 changes: 3 additions & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ mod tests {
Limit::new(namespace, 0, 60, vec!["x == 1", "y == 2"], vec!["z"]),
]
.into_iter()
.for_each(|limit| limiter.add_limit(limit));
.for_each(|limit| {
limiter.add_limit(limit);
});

let rate_limiter = MyRateLimiter::new(Arc::new(Limiter::Blocking(limiter)));

Expand Down
2 changes: 1 addition & 1 deletion limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ mod tests {
match &limiter {
Limiter::Blocking(limiter) => limiter.add_limit(limit.clone()),
Limiter::Async(limiter) => limiter.add_limit(limit.clone()),
}
};
limit
}
}
11 changes: 11 additions & 0 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ impl Counter {
self.limit.max_value()
}

pub fn update_to_limit(&mut self, limit: &Limit) -> bool {
if limit == &self.limit {
self.limit.set_max_value(limit.max_value());
if let Some(name) = limit.name() {
self.limit.set_name(name.to_string());
}
return true;
}
false
}

pub fn seconds(&self) -> u64 {
self.limit.seconds()
}
Expand Down
5 changes: 3 additions & 2 deletions limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterSt

#[macro_use]
extern crate lazy_static;
extern crate core;

pub mod counter;
pub mod errors;
Expand Down Expand Up @@ -312,7 +313,7 @@ impl RateLimiter {
self.storage.get_namespaces()
}

pub fn add_limit(&self, limit: Limit) {
pub fn add_limit(&self, limit: Limit) -> bool {
self.storage.add_limit(limit)
}

Expand Down Expand Up @@ -483,7 +484,7 @@ impl AsyncRateLimiter {
self.storage.get_namespaces()
}

pub fn add_limit(&self, limit: Limit) {
pub fn add_limit(&self, limit: Limit) -> bool {
self.storage.add_limit(limit)
}

Expand Down
10 changes: 6 additions & 4 deletions limitador/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ impl From<String> for Namespace {
#[derive(Eq, Debug, Clone, Serialize, Deserialize)]
pub struct Limit {
namespace: Namespace,
#[serde(skip)]
max_value: i64,
seconds: u64,
#[serde(skip)]
name: Option<String>,

// Need to sort to generate the same object when using the JSON as a key or
Expand Down Expand Up @@ -88,6 +90,10 @@ impl Limit {
self.name = Some(name)
}

pub fn set_max_value(&mut self, value: i64) {
self.max_value = value;
}

pub fn conditions(&self) -> HashSet<String> {
self.conditions.iter().map(|cond| cond.into()).collect()
}
Expand Down Expand Up @@ -129,9 +135,7 @@ impl Limit {
impl Hash for Limit {
fn hash<H: Hasher>(&self, state: &mut H) {
self.namespace.hash(state);
self.max_value.hash(state);
self.seconds.hash(state);
self.name.hash(state);

let mut encoded_conditions = self
.conditions
Expand All @@ -156,9 +160,7 @@ impl Hash for Limit {
impl PartialEq for Limit {
fn eq(&self, other: &Self) -> bool {
self.namespace == other.namespace
&& self.max_value == other.max_value
&& self.seconds == other.seconds
&& self.name == other.name
&& self.conditions == other.conditions
&& self.variables == other.variables
}
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/infinispan/infinispan_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl AsyncCounterStorage for InfinispanStorage {
// unnecessarily.

if let Some(val) = counter_val {
let mut counter: Counter = counter_from_counter_key(&counter_key);
let mut counter: Counter = counter_from_counter_key(&counter_key, &limit);
let ttl = 0; // TODO: calculate TTL from response headers.
counter.set_remaining(val);
counter.set_expires_in(Duration::from_secs(ttl));
Expand Down
34 changes: 32 additions & 2 deletions limitador/src/storage/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,39 @@ pub fn key_for_counters_of_limit(limit: &Limit) -> String {
)
}

pub fn counter_from_counter_key(key: &str) -> Counter {
pub fn counter_from_counter_key(key: &str, limit: &Limit) -> Counter {
let counter_prefix = "counter:";
let start_pos_counter = key.find(counter_prefix).unwrap() + counter_prefix.len();

serde_json::from_str(&key[start_pos_counter..]).unwrap()
let mut counter: Counter = serde_json::from_str(&key[start_pos_counter..]).unwrap();
if !counter.update_to_limit(limit) {
// this means some kind of data corruption _or_ most probably
// an out of sync `impl PartialEq for Limit` vs `pub fn key_for_counter(counter: &Counter) -> String`
panic!(
"Failed to rebuild Counter's Limit from the provided Limit: {:?} vs {:?}",
counter.limit(),
limit
)
}
counter
}

#[cfg(test)]
mod tests {
use crate::storage::keys::key_for_counters_of_limit;
use crate::Limit;

#[test]
fn key_for_limit_format() {
let limit = Limit::new(
"example.com",
10,
60,
vec!["req.method == GET"],
vec!["app_id"],
);
assert_eq!(
"namespace:{example.com},counters_of_limit:{\"namespace\":\"example.com\",\"seconds\":60,\"conditions\":[\"req.method == GET\"],\"variables\":[\"app_id\"]}",
key_for_counters_of_limit(&limit))
}
}
11 changes: 5 additions & 6 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ impl Storage {
self.limits.read().unwrap().keys().cloned().collect()
}

pub fn add_limit(&self, limit: Limit) {
pub fn add_limit(&self, limit: Limit) -> bool {
let namespace = limit.namespace().clone();
self.limits
.write()
.unwrap()
.entry(namespace)
.or_default()
.insert(limit);
.insert(limit)
}

pub fn get_limits(&self, namespace: &Namespace) -> HashSet<Limit> {
Expand Down Expand Up @@ -140,19 +140,18 @@ impl AsyncStorage {
self.limits.read().unwrap().keys().cloned().collect()
}

pub fn add_limit(&self, limit: Limit) {
pub fn add_limit(&self, limit: Limit) -> bool {
let namespace = limit.namespace().clone();

let mut limits_for_namespace = self.limits.write().unwrap();

match limits_for_namespace.get_mut(&namespace) {
Some(limits) => {
limits.insert(limit);
}
Some(limits) => limits.insert(limit),
None => {
let mut limits = HashSet::new();
limits.insert(limit);
limits_for_namespace.insert(namespace, limits);
true
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
.await?;

for counter_key in counter_keys {
let mut counter: Counter = counter_from_counter_key(&counter_key);
let mut counter: Counter = counter_from_counter_key(&counter_key, &limit);

// If the key does not exist, it means that the counter expired,
// so we don't have to return it.
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl CounterStorage for RedisStorage {
con.smembers::<String, HashSet<String>>(key_for_counters_of_limit(limit))?;

for counter_key in counter_keys {
let mut counter: Counter = counter_from_counter_key(&counter_key);
let mut counter: Counter = counter_from_counter_key(&counter_key, limit);

// If the key does not exist, it means that the counter expired,
// so we don't have to return it.
Expand Down
2 changes: 1 addition & 1 deletion limitador/tests/helpers/tests_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl TestsLimiter {
}
}

pub async fn add_limit(&self, limit: &Limit) {
pub async fn add_limit(&self, limit: &Limit) -> bool {
match &self.limiter_impl {
LimiterImpl::Blocking(limiter) => limiter.add_limit(limit.clone()),
LimiterImpl::Async(limiter) => limiter.add_limit(limit.clone()),
Expand Down
27 changes: 25 additions & 2 deletions limitador/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ mod test {
test_with_all_storage_impls!(configure_with_creates_the_given_limits);
test_with_all_storage_impls!(configure_with_keeps_the_given_limits_and_counters_if_they_exist);
test_with_all_storage_impls!(configure_with_deletes_all_except_the_limits_given);
test_with_all_storage_impls!(add_limit_only_adds_if_not_present);

// All these functions need to use async/await. That's needed to support
// both the sync and the async implementations of the rate limiter.
Expand Down Expand Up @@ -328,7 +329,7 @@ mod test {
];

for limit in limits.iter() {
rate_limiter.add_limit(limit).await
rate_limiter.add_limit(limit).await;
}

rate_limiter.delete_limits(namespace).await.unwrap();
Expand Down Expand Up @@ -830,7 +831,7 @@ mod test {
let namespace = "test_namespace";

let limit_to_be_kept =
Limit::new(namespace, 10, 60, vec!["req.method == GET"], vec!["app_id"]);
Limit::new(namespace, 10, 1, vec!["req.method == GET"], vec!["app_id"]);

let limit_to_be_deleted =
Limit::new(namespace, 20, 60, vec!["req.method == GET"], vec!["app_id"]);
Expand All @@ -849,4 +850,26 @@ mod test {
assert!(limits.contains(&limit_to_be_kept));
assert!(!limits.contains(&limit_to_be_deleted));
}

async fn add_limit_only_adds_if_not_present(rate_limiter: &mut TestsLimiter) {
let namespace = "test_namespace";

let limit_1 = Limit::new(namespace, 10, 60, vec!["req.method == GET"], vec!["app_id"]);

let limit_2 = Limit::new(namespace, 20, 60, vec!["req.method == GET"], vec!["app_id"]);

let mut limit_3 = Limit::new(namespace, 20, 60, vec!["req.method == GET"], vec!["app_id"]);
limit_3.set_name("Name is irrelevant too".to_owned());

assert!(rate_limiter.add_limit(&limit_1).await);
assert!(!rate_limiter.add_limit(&limit_2).await);
assert!(!rate_limiter.add_limit(&limit_3).await);

let limits = rate_limiter.get_limits(namespace).await;

assert_eq!(limits.len(), 1);
let known_limit = limits.iter().next().unwrap();
assert_eq!(known_limit.max_value(), 10);
assert_eq!(known_limit.name(), None);
}
}

0 comments on commit 5e50be1

Please sign in to comment.