Skip to content

Commit

Permalink
Add distributed_storage feature
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 16, 2024
1 parent 99b9e87 commit 020e468
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 40 deletions.
3 changes: 3 additions & 0 deletions limitador-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ documentation = "https://kuadrant.io/docs/limitador"
readme = "README.md"
edition = "2021"

[features]
distributed_storage = ["limitador/distributed_storage"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
Expand Down
2 changes: 2 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub enum StorageConfiguration {
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
#[cfg(feature = "distributed_storage")]
Distributed(DistributedStorageConfiguration),
}

Expand All @@ -149,6 +150,7 @@ pub struct InMemoryStorageConfiguration {
}

#[derive(PartialEq, Eq, Debug)]
#[cfg(feature = "distributed_storage")]
pub struct DistributedStorageConfiguration {
pub name: String,
pub cache_size: Option<u64>,
Expand Down
83 changes: 45 additions & 38 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
extern crate log;
extern crate clap;

#[cfg(feature = "distributed_storage")]
use crate::config::DistributedStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, DistributedStorageConfiguration,
InMemoryStorageConfiguration, RedisStorageCacheConfiguration, RedisStorageConfiguration,
StorageConfiguration,
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
Expand All @@ -24,6 +25,7 @@ use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE,
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
};
#[cfg(feature = "distributed_storage")]
use limitador::storage::DistributedInMemoryStorage;
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
Expand Down Expand Up @@ -85,6 +87,7 @@ impl Limiter {
let rate_limiter = match config.storage {
StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
#[cfg(feature = "distributed_storage")]
StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};
Expand Down Expand Up @@ -157,6 +160,7 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

#[cfg(feature = "distributed_storage")]
fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self {
let storage = DistributedInMemoryStorage::new(
cfg.name,
Expand Down Expand Up @@ -579,43 +583,45 @@ fn create_config() -> (Configuration, &'static str) {
.display_order(6)
.help("Timeout for Redis commands in milliseconds"),
),
)
.subcommand(
Command::new("distributed")
.about("Replicates CRDT-based counters across multiple Limitador servers")
.display_order(5)
.arg(
Arg::new("NAME")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
)
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(4)
.help("Sets the size of the cache for 'qualified counters'"),
),
);

#[cfg(feature = "distributed_storage")]
let cmdline = cmdline.subcommand(
Command::new("distributed")
.about("Replicates CRDT-based counters across multiple Limitador servers")
.display_order(5)
.arg(
Arg::new("NAME")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
)
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(4)
.help("Sets the size of the cache for 'qualified counters'"),
),
);

let matches = cmdline.get_matches();

let limits_file = matches.get_one::<String>("LIMITS_FILE").unwrap();
Expand Down Expand Up @@ -681,6 +687,7 @@ fn create_config() -> (Configuration, &'static str) {
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
#[cfg(feature = "distributed_storage")]
Some(("distributed", sub)) => {
StorageConfiguration::Distributed(DistributedStorageConfiguration {
name: sub.get_one::<String>("NAME").unwrap().to_owned(),
Expand Down
1 change: 1 addition & 0 deletions limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ edition = "2021"
[features]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
distributed_storage = []
redis_storage = ["redis", "r2d2", "tokio"]
lenient_conditions = []

Expand Down
4 changes: 2 additions & 2 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use thiserror::Error;

#[cfg(feature = "disk_storage")]
pub mod disk;
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "distributed_storage")]
pub mod distributed;
pub mod in_memory;

#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "distributed_storage")]
pub use crate::storage::distributed::CrInMemoryStorage as DistributedInMemoryStorage;

#[cfg(feature = "redis_storage")]
Expand Down
2 changes: 2 additions & 0 deletions limitador/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ macro_rules! test_with_all_storage_impls {
$function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await;
}

#[cfg(feature = "distributed_storage")]
#[tokio::test]
async fn [<$function _distributed_storage>]() {
let rate_limiter =
Expand Down Expand Up @@ -96,6 +97,7 @@ mod test {
use crate::helpers::tests_limiter::*;
use limitador::limit::Limit;
use limitador::storage::disk::{DiskStorage, OptimizeFor};
#[cfg(feature = "distributed_storage")]
use limitador::storage::distributed::CrInMemoryStorage;
use limitador::storage::in_memory::InMemoryStorage;
use std::collections::{HashMap, HashSet};
Expand Down

0 comments on commit 020e468

Please sign in to comment.