From 020e46835896aa0e2160e95bcae7b66476d7f180 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Wed, 15 May 2024 11:10:46 -0400 Subject: [PATCH] Add distributed_storage feature --- limitador-server/Cargo.toml | 3 + limitador-server/src/config.rs | 2 + limitador-server/src/main.rs | 83 +++++++++++++++------------- limitador/Cargo.toml | 1 + limitador/src/storage/mod.rs | 4 +- limitador/tests/integration_tests.rs | 2 + 6 files changed, 55 insertions(+), 40 deletions(-) diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index f7b2c146..4acdb31d 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -12,6 +12,9 @@ documentation = "https://kuadrant.io/docs/limitador" readme = "README.md" edition = "2021" +[features] +distributed_storage = ["limitador/distributed_storage"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index e71bc98b..dc4ef59c 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -140,6 +140,7 @@ pub enum StorageConfiguration { InMemory(InMemoryStorageConfiguration), Disk(DiskStorageConfiguration), Redis(RedisStorageConfiguration), + #[cfg(feature = "distributed_storage")] Distributed(DistributedStorageConfiguration), } @@ -149,6 +150,7 @@ pub struct InMemoryStorageConfiguration { } #[derive(PartialEq, Eq, Debug)] +#[cfg(feature = "distributed_storage")] pub struct DistributedStorageConfiguration { pub name: String, pub cache_size: Option, diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 5aabb526..f04e4fa9 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -5,10 +5,11 @@ extern crate log; extern crate clap; +#[cfg(feature = "distributed_storage")] +use crate::config::DistributedStorageConfiguration; use crate::config::{ - Configuration, DiskStorageConfiguration, DistributedStorageConfiguration, - InMemoryStorageConfiguration, RedisStorageCacheConfiguration, RedisStorageConfiguration, - StorageConfiguration, + Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration, + RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration, }; use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; @@ -24,6 +25,7 @@ use limitador::storage::redis::{ AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE, DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, }; +#[cfg(feature = "distributed_storage")] use limitador::storage::DistributedInMemoryStorage; use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage}; use limitador::{ @@ -85,6 +87,7 @@ impl Limiter { let rate_limiter = match config.storage { StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await, StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg), + #[cfg(feature = "distributed_storage")] StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg), StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg), }; @@ -157,6 +160,7 @@ impl Limiter { Self::Blocking(rate_limiter_builder.build()) } + #[cfg(feature = "distributed_storage")] fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self { let storage = DistributedInMemoryStorage::new( cfg.name, @@ -579,43 +583,45 @@ fn create_config() -> (Configuration, &'static str) { .display_order(6) .help("Timeout for Redis commands in milliseconds"), ), - ) - .subcommand( - Command::new("distributed") - .about("Replicates CRDT-based counters across multiple Limitador servers") - .display_order(5) - .arg( - Arg::new("NAME") - .action(ArgAction::Set) - .required(true) - .display_order(2) - .help("Unique name to identify this Limitador instance"), - ) - .arg( - Arg::new("LOCAL") - .action(ArgAction::Set) - .required(true) - .display_order(2) - .help("Local IP:PORT to send datagrams from"), - ) - .arg( - Arg::new("BROADCAST") - .action(ArgAction::Set) - .required(true) - .display_order(3) - .help("Broadcast IP:PORT to send datagrams to"), - ) - .arg( - Arg::new("CACHE_SIZE") - .long("cache") - .short('c') - .action(ArgAction::Set) - .value_parser(value_parser!(u64)) - .display_order(4) - .help("Sets the size of the cache for 'qualified counters'"), - ), ); + #[cfg(feature = "distributed_storage")] + let cmdline = cmdline.subcommand( + Command::new("distributed") + .about("Replicates CRDT-based counters across multiple Limitador servers") + .display_order(5) + .arg( + Arg::new("NAME") + .action(ArgAction::Set) + .required(true) + .display_order(2) + .help("Unique name to identify this Limitador instance"), + ) + .arg( + Arg::new("LOCAL") + .action(ArgAction::Set) + .required(true) + .display_order(2) + .help("Local IP:PORT to send datagrams from"), + ) + .arg( + Arg::new("BROADCAST") + .action(ArgAction::Set) + .required(true) + .display_order(3) + .help("Broadcast IP:PORT to send datagrams to"), + ) + .arg( + Arg::new("CACHE_SIZE") + .long("cache") + .short('c') + .action(ArgAction::Set) + .value_parser(value_parser!(u64)) + .display_order(4) + .help("Sets the size of the cache for 'qualified counters'"), + ), + ); + let matches = cmdline.get_matches(); let limits_file = matches.get_one::("LIMITS_FILE").unwrap(); @@ -681,6 +687,7 @@ fn create_config() -> (Configuration, &'static str) { Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration { cache_size: sub.get_one::("CACHE_SIZE").copied(), }), + #[cfg(feature = "distributed_storage")] Some(("distributed", sub)) => { StorageConfiguration::Distributed(DistributedStorageConfiguration { name: sub.get_one::("NAME").unwrap().to_owned(), diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 456b3309..8f0a681b 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -15,6 +15,7 @@ edition = "2021" [features] default = ["disk_storage", "redis_storage"] disk_storage = ["rocksdb"] +distributed_storage = [] redis_storage = ["redis", "r2d2", "tokio"] lenient_conditions = [] diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index f51979b9..22abd33a 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -8,11 +8,11 @@ use thiserror::Error; #[cfg(feature = "disk_storage")] pub mod disk; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "distributed_storage")] pub mod distributed; pub mod in_memory; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "distributed_storage")] pub use crate::storage::distributed::CrInMemoryStorage as DistributedInMemoryStorage; #[cfg(feature = "redis_storage")] diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index 90623d9f..2b1e9afe 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -13,6 +13,7 @@ macro_rules! test_with_all_storage_impls { $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await; } + #[cfg(feature = "distributed_storage")] #[tokio::test] async fn [<$function _distributed_storage>]() { let rate_limiter = @@ -96,6 +97,7 @@ mod test { use crate::helpers::tests_limiter::*; use limitador::limit::Limit; use limitador::storage::disk::{DiskStorage, OptimizeFor}; + #[cfg(feature = "distributed_storage")] use limitador::storage::distributed::CrInMemoryStorage; use limitador::storage::in_memory::InMemoryStorage; use std::collections::{HashMap, HashSet};