Skip to content

Commit

Permalink
Merge pull request #44 from 3scale-labs/infinispan-with-given-cache-name
Browse files Browse the repository at this point in the history
Allow to choose an Infinispan cache for the limits instead of creating one
  • Loading branch information
davidor authored Jun 1, 2021
2 parents 7ce2523 + c915455 commit 21ec621
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 59 deletions.
10 changes: 10 additions & 0 deletions limitador-server/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ variables:
- Format: integer.


## INFINISPAN_CACHE_NAME

- The name of the Infinispan cache that Limitador will use to store limits and
counters. This variable applies only when [INFINISPAN_URL](#infinispan_url) is
set.
- Optional. By default, Limitador creates a cache called "limitador" and
configured as "local".
- Format: string.


## INFINISPAN_URL

- Infinispan URL. Required only when you want to use Infinispan to store the
Expand Down
33 changes: 19 additions & 14 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate log;
use crate::envoy_rls::server::run_envoy_rls_server;
use crate::http_api::server::run_http_server;
use limitador::limit::Limit;
use limitador::storage::infinispan::InfinispanStorage;
use limitador::storage::infinispan::InfinispanStorageBuilder;
use limitador::storage::redis::{AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder};
use limitador::storage::AsyncStorage;
use limitador::{AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder};
Expand All @@ -21,6 +21,7 @@ const LIMITS_FILE_ENV: &str = "LIMITS_FILE";
const LIMIT_NAME_IN_PROMETHEUS_LABELS_ENV: &str = "LIMIT_NAME_IN_PROMETHEUS_LABELS";
const REDIS_URL_ENV: &str = "REDIS_URL";
const INFINISPAN_URL_ENV: &str = "INFINISPAN_URL";
const INFINISPAN_CACHE_NAME_ENV: &str = "INFINISPAN_CACHE_NAME";
const DEFAULT_HOST: &str = "0.0.0.0";
const DEFAULT_HTTP_API_PORT: u32 = 8080;
const DEFAULT_ENVOY_RLS_PORT: u32 = 8081;
Expand Down Expand Up @@ -128,20 +129,24 @@ impl Limiter {

async fn infinispan_limiter(url: &str) -> Limiter {
let parsed_url = Url::parse(url).unwrap();
let storage = Box::new(
InfinispanStorage::new(
&format!(
"{}://{}:{}",
parsed_url.scheme(),
parsed_url.host_str().unwrap(),
parsed_url.port().unwrap().to_string(),
),
parsed_url.username(),
parsed_url.password().unwrap_or_default(),
)
.await,

let builder = InfinispanStorageBuilder::new(
&format!(
"{}://{}:{}",
parsed_url.scheme(),
parsed_url.host_str().unwrap(),
parsed_url.port().unwrap().to_string(),
),
parsed_url.username(),
parsed_url.password().unwrap_or_default(),
);
let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);

let storage = match env::var(INFINISPAN_CACHE_NAME_ENV) {
Ok(cache_name) => builder.cache_name(cache_name).build().await,
Err(_) => builder.build().await,
};

let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(Box::new(storage));

if Self::env_option_is_enabled(LIMIT_NAME_IN_PROMETHEUS_LABELS_ENV) {
rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
Expand Down
112 changes: 70 additions & 42 deletions limitador/src/storage/infinispan/infinispan_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ use infinispan::Infinispan;
use std::collections::HashSet;
use std::time::Duration;

const INFINISPAN_LIMITS_CACHE_NAME: &str = "limits";
const DEFAULT_INFINISPAN_LIMITS_CACHE_NAME: &str = "limitador";

pub struct InfinispanStorage {
infinispan: Infinispan,
cache_name: String,
}

pub struct InfinispanStorageBuilder {
url: String,
username: String,
password: String,
cache_name: Option<String>,
}

#[async_trait]
Expand Down Expand Up @@ -62,7 +70,7 @@ impl AsyncStorage for InfinispanStorage {
let _ = self
.infinispan
.run(&request::entries::delete(
INFINISPAN_LIMITS_CACHE_NAME,
&self.cache_name,
key_for_counters_of_limit(&limit),
))
.await?;
Expand Down Expand Up @@ -94,7 +102,7 @@ impl AsyncStorage for InfinispanStorage {
let _ = self
.infinispan
.run(&request::entries::delete(
INFINISPAN_LIMITS_CACHE_NAME,
&self.cache_name,
key_for_counters_of_limit(&limit),
))
.await?;
Expand All @@ -103,7 +111,7 @@ impl AsyncStorage for InfinispanStorage {
let _ = self
.infinispan
.run(&request::entries::delete(
INFINISPAN_LIMITS_CACHE_NAME,
&self.cache_name,
key_for_limits_of_namespace(namespace),
))
.await?;
Expand All @@ -114,8 +122,7 @@ impl AsyncStorage for InfinispanStorage {
async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
let counter_key = key_for_counter(counter);
let counter_val =
counters::get_value(&self.infinispan, INFINISPAN_LIMITS_CACHE_NAME, &counter_key)
.await?;
counters::get_value(&self.infinispan, &self.cache_name, &counter_key).await?;

match counter_val {
Some(val) => Ok(val - delta >= 0),
Expand All @@ -128,7 +135,7 @@ impl AsyncStorage for InfinispanStorage {

let counter_created = counters::decrement_by(
&self.infinispan,
INFINISPAN_LIMITS_CACHE_NAME,
&self.cache_name,
&counter_key,
delta,
&CounterOpts::new(counter.max_value(), Duration::from_secs(counter.seconds())),
Expand Down Expand Up @@ -167,12 +174,8 @@ impl AsyncStorage for InfinispanStorage {

for limit in self.get_limits(namespace).await? {
for counter_key in self.counter_keys_of_limit(&limit).await? {
let counter_val = counters::get_value(
&self.infinispan,
INFINISPAN_LIMITS_CACHE_NAME,
&counter_key,
)
.await?;
let counter_val =
counters::get_value(&self.infinispan, &self.cache_name, &counter_key).await?;

// If the key does not exist, it means that the counter expired,
// so we don't have to return it.
Expand All @@ -197,7 +200,7 @@ impl AsyncStorage for InfinispanStorage {
async fn clear(&self) -> Result<(), StorageErr> {
let _ = self
.infinispan
.run(&request::caches::clear(INFINISPAN_LIMITS_CACHE_NAME))
.run(&request::caches::clear(&self.cache_name))
.await?;

let _ = self.delete_all_counters().await?;
Expand All @@ -207,20 +210,33 @@ impl AsyncStorage for InfinispanStorage {
}

impl InfinispanStorage {
pub async fn new(url: &str, username: &str, password: &str) -> InfinispanStorage {
pub async fn new(
url: &str,
username: &str,
password: &str,
cache_name: Option<String>,
) -> InfinispanStorage {
let infinispan = Infinispan::new(url, username, password);

// TODO: the cache type and its attributes should be configurable. For
// now, we use the "local" type with the default attributes.
//
// TODO: check if this recreates everything or does not do anything when
// it already exists.
let _ = infinispan
.run(&request::caches::create_local(INFINISPAN_LIMITS_CACHE_NAME))
.await
.unwrap();

InfinispanStorage { infinispan }
match cache_name {
Some(cache_name) => InfinispanStorage {
infinispan,
cache_name,
},
None => {
let cache_name = DEFAULT_INFINISPAN_LIMITS_CACHE_NAME;

let _ = infinispan
.run(&request::caches::create_local(&cache_name))
.await
.unwrap();

InfinispanStorage {
infinispan,
cache_name: cache_name.into(),
}
}
}
}

async fn delete_counters_of_namespace(&self, namespace: &Namespace) -> Result<(), StorageErr> {
Expand All @@ -233,7 +249,7 @@ impl InfinispanStorage {

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
for counter_key in self.counter_keys_of_limit(&limit).await? {
counters::delete(&self.infinispan, INFINISPAN_LIMITS_CACHE_NAME, &counter_key).await?
counters::delete(&self.infinispan, &self.cache_name, &counter_key).await?
}

Ok(())
Expand Down Expand Up @@ -263,34 +279,46 @@ impl InfinispanStorage {
}

async fn get_set(&self, set_key: impl AsRef<str>) -> Result<HashSet<String>, InfinispanError> {
sets::get(&self.infinispan, INFINISPAN_LIMITS_CACHE_NAME, set_key).await
sets::get(&self.infinispan, &self.cache_name, set_key).await
}

async fn add_to_set(
&self,
set_key: impl Into<String>,
element: impl Into<String>,
) -> Result<(), StorageErr> {
sets::add(
&self.infinispan,
INFINISPAN_LIMITS_CACHE_NAME,
set_key,
element,
)
.await
sets::add(&self.infinispan, &self.cache_name, set_key, element).await
}

async fn delete_from_set(
&self,
set_key: impl Into<String>,
element: impl Into<String>,
) -> Result<HashSet<String>, StorageErr> {
sets::delete(
&self.infinispan,
INFINISPAN_LIMITS_CACHE_NAME,
set_key,
element,
)
.await
sets::delete(&self.infinispan, &self.cache_name, set_key, element).await
}
}

impl InfinispanStorageBuilder {
pub fn new(
url: impl Into<String>,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
Self {
url: url.into(),
username: username.into(),
password: password.into(),
cache_name: None,
}
}

pub fn cache_name(mut self, cache_name: impl Into<String>) -> Self {
self.cache_name = Some(cache_name.into());
self
}

pub async fn build(self) -> InfinispanStorage {
InfinispanStorage::new(&self.url, &self.username, &self.password, self.cache_name).await
}
}
1 change: 1 addition & 0 deletions limitador/src/storage/infinispan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod sets;
use crate::storage::StorageErr;
use infinispan::errors::InfinispanError;
pub use infinispan_storage::InfinispanStorage;
pub use infinispan_storage::InfinispanStorageBuilder;

impl From<reqwest::Error> for StorageErr {
fn from(e: reqwest::Error) -> Self {
Expand Down
6 changes: 3 additions & 3 deletions limitador/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ macro_rules! test_with_all_storage_impls {
#[tokio::test]
#[serial]
async fn [<$function _with_infinispan>]() {
let storage = InfinispanStorage::new(
let storage = InfinispanStorageBuilder::new(
"http://127.0.0.1:11222", "username", "password"
).await;
).build().await;
storage.clear().await.unwrap();
let rate_limiter = AsyncRateLimiter::new_with_storage(
Box::new(storage)
Expand Down Expand Up @@ -83,7 +83,7 @@ mod test {

cfg_if::cfg_if! {
if #[cfg(feature = "infinispan_storage")] {
use limitador::storage::infinispan::InfinispanStorage;
use limitador::storage::infinispan::InfinispanStorageBuilder;
}
}

Expand Down

0 comments on commit 21ec621

Please sign in to comment.