diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 6a0e5220..13c965b1 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -166,7 +166,7 @@ impl Limiter { cfg.name, cfg.cache_size.or_else(guess_cache_size).unwrap(), cfg.local, - cfg.broadcast, + Some(cfg.broadcast), ); let rate_limiter_builder = RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage))); diff --git a/limitador/benches/bench.rs b/limitador/benches/bench.rs index db5a0977..111bad1f 100644 --- a/limitador/benches/bench.rs +++ b/limitador/benches/bench.rs @@ -1,18 +1,21 @@ +use std::collections::HashMap; +use std::fmt::{Display, Formatter}; +use std::future::Future; +use std::time::Instant; + use criterion::{black_box, criterion_group, criterion_main, Bencher, BenchmarkId, Criterion}; use rand::seq::SliceRandom; +use rand::SeedableRng; use limitador::limit::Limit; #[cfg(feature = "disk_storage")] use limitador::storage::disk::{DiskStorage, OptimizeFor}; +#[cfg(feature = "distributed_storage")] +use limitador::storage::distributed::CrInMemoryStorage; use limitador::storage::in_memory::InMemoryStorage; use limitador::storage::redis::CachedRedisStorageBuilder; use limitador::storage::{AsyncCounterStorage, CounterStorage}; use limitador::{AsyncRateLimiter, RateLimiter}; -use rand::SeedableRng; -use std::collections::HashMap; -use std::fmt::{Display, Formatter}; -use std::future::Future; -use std::time::Instant; const SEED: u64 = 42; @@ -22,7 +25,11 @@ criterion_group!(benches, bench_in_mem); criterion_group!(benches, bench_in_mem, bench_disk); #[cfg(all(not(feature = "disk_storage"), feature = "redis_storage"))] criterion_group!(benches, bench_in_mem, bench_redis, bench_cached_redis); -#[cfg(all(feature = "disk_storage", feature = "redis_storage"))] +#[cfg(all( + feature = "disk_storage", + feature = "redis_storage", + not(feature = "distributed_storage") +))] criterion_group!( benches, bench_in_mem, @@ -30,6 +37,19 @@ criterion_group!( bench_redis, bench_cached_redis ); +#[cfg(all( + feature = "disk_storage", + feature = "redis_storage", + feature = "distributed_storage" +))] +criterion_group!( + benches, + bench_in_mem, + bench_disk, + bench_redis, + bench_cached_redis, + bench_distributed, +); criterion_main!(benches); @@ -79,7 +99,7 @@ impl Display for TestScenario { } fn bench_in_mem(c: &mut Criterion) { - let mut group = c.benchmark_group("In memory"); + let mut group = c.benchmark_group("Memory"); for scenario in TEST_SCENARIOS { group.bench_with_input( BenchmarkId::new("is_rate_limited", scenario), @@ -109,6 +129,63 @@ fn bench_in_mem(c: &mut Criterion) { group.finish(); } +#[cfg(feature = "distributed_storage")] +fn bench_distributed(c: &mut Criterion) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let mut group = c.benchmark_group("Distributed"); + for scenario in TEST_SCENARIOS { + group.bench_with_input( + BenchmarkId::new("is_rate_limited", scenario), + scenario, + |b: &mut Bencher, test_scenario: &&TestScenario| { + runtime.block_on(async move { + let storage = Box::new(CrInMemoryStorage::new( + "test_node".to_owned(), + 10_000, + "127.0.0.1:0".to_owned(), + None, + )); + bench_is_rate_limited(b, test_scenario, storage); + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("update_counters", scenario), + scenario, + |b: &mut Bencher, test_scenario: &&TestScenario| { + runtime.block_on(async move { + let storage = Box::new(CrInMemoryStorage::new( + "test_node".to_owned(), + 10_000, + "127.0.0.1:0".to_owned(), + None, + )); + bench_update_counters(b, test_scenario, storage); + }) + }, + ); + group.bench_with_input( + BenchmarkId::new("check_rate_limited_and_update", scenario), + scenario, + |b: &mut Bencher, test_scenario: &&TestScenario| { + runtime.block_on(async move { + let storage = Box::new(CrInMemoryStorage::new( + "test_node".to_owned(), + 10_000, + "127.0.0.1:0".to_owned(), + None, + )); + bench_check_rate_limited_and_update(b, test_scenario, storage); + }) + }, + ); + } + group.finish(); +} #[cfg(feature = "disk_storage")] fn bench_disk(c: &mut Criterion) { let mut group = c.benchmark_group("Disk"); diff --git a/limitador/src/storage/distributed/mod.rs b/limitador/src/storage/distributed/mod.rs index 5732a322..ff9aa923 100644 --- a/limitador/src/storage/distributed/mod.rs +++ b/limitador/src/storage/distributed/mod.rs @@ -249,28 +249,34 @@ impl CounterStorage for CrInMemoryStorage { } impl CrInMemoryStorage { - pub fn new(identifier: String, cache_size: u64, local: String, broadcast: String) -> Self { + pub fn new( + identifier: String, + cache_size: u64, + local: String, + broadcast: Option, + ) -> Self { let (sender, mut rx) = mpsc::channel(1000); let local = local.to_socket_addrs().unwrap().next().unwrap(); - let remote = broadcast.clone(); - tokio::spawn(async move { - let sock = UdpSocket::bind(local).await.unwrap(); - sock.set_broadcast(true).unwrap(); - sock.connect(remote).await.unwrap(); - loop { - let message: CounterValueMessage = rx.recv().await.unwrap(); - let buf = postcard::to_stdvec(&message).unwrap(); - match sock.send(&buf).await { - Ok(len) => { - if len != buf.len() { - println!("Couldn't send complete message!"); + if let Some(remote) = broadcast.clone() { + tokio::spawn(async move { + let sock = UdpSocket::bind(local).await.unwrap(); + sock.set_broadcast(true).unwrap(); + sock.connect(remote).await.unwrap(); + loop { + let message: CounterValueMessage = rx.recv().await.unwrap(); + let buf = postcard::to_stdvec(&message).unwrap(); + match sock.send(&buf).await { + Ok(len) => { + if len != buf.len() { + println!("Couldn't send complete message!"); + } } - } - Err(err) => println!("Couldn't send update: {:?}", err), - }; - } - }); + Err(err) => println!("Couldn't send update: {:?}", err), + }; + } + }); + } let limits_for_namespace = Arc::new(RwLock::new(HashMap::< Namespace, @@ -282,44 +288,53 @@ impl CrInMemoryStorage { { let limits_for_namespace = limits_for_namespace.clone(); let qualified_counters = qualified_counters.clone(); - tokio::spawn(async move { - let sock = UdpSocket::bind(broadcast).await.unwrap(); - sock.set_broadcast(true).unwrap(); - let mut buf = [0; 1024]; - loop { - let (len, addr) = sock.recv_from(&mut buf).await.unwrap(); - if addr != local { - match postcard::from_bytes::(&buf[..len]) { - Ok(message) => { - let CounterValueMessage { - counter_key, - expiry, - values, - } = message; - let counter = >::into(counter_key); - if counter.is_qualified() { - if let Some(counter) = qualified_counters.get(&counter) { - counter.merge( + + if let Some(broadcast) = broadcast.clone() { + tokio::spawn(async move { + let sock = UdpSocket::bind(broadcast).await.unwrap(); + sock.set_broadcast(true).unwrap(); + let mut buf = [0; 1024]; + loop { + let (len, addr) = sock.recv_from(&mut buf).await.unwrap(); + if addr != local { + match postcard::from_bytes::(&buf[..len]) { + Ok(message) => { + let CounterValueMessage { + counter_key, + expiry, + values, + } = message; + let counter = >::into(counter_key); + if counter.is_qualified() { + if let Some(counter) = qualified_counters.get(&counter) { + counter.merge( + (UNIX_EPOCH + Duration::from_secs(expiry), values) + .into(), + ); + } + } else { + let counters = limits_for_namespace.read().unwrap(); + let limits = counters.get(counter.namespace()).unwrap(); + let value = limits.get(counter.limit()).unwrap(); + value.merge( (UNIX_EPOCH + Duration::from_secs(expiry), values) .into(), ); - } - } else { - let counters = limits_for_namespace.read().unwrap(); - let limits = counters.get(counter.namespace()).unwrap(); - let value = limits.get(counter.limit()).unwrap(); - value.merge( - (UNIX_EPOCH + Duration::from_secs(expiry), values).into(), - ); - }; - } - Err(err) => { - println!("Error from {} bytes: {:?} \n{:?}", len, err, &buf[..len]) + }; + } + Err(err) => { + println!( + "Error from {} bytes: {:?} \n{:?}", + len, + err, + &buf[..len] + ) + } } } } - } - }); + }); + } } Self { diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index 2b1e9afe..06c308c2 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -17,7 +17,7 @@ macro_rules! test_with_all_storage_impls { #[tokio::test] async fn [<$function _distributed_storage>]() { let rate_limiter = - RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), "127.0.0.255:19876".to_owned()))); + RateLimiter::new_with_storage(Box::new(CrInMemoryStorage::new("test_node".to_owned(), 10_000, "127.0.0.1:19876".to_owned(), Some("127.0.0.255:19876".to_owned())))); $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await; }