From 94cb79ecddf05782d42d482a38caa2baaa7efb40 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 5 Mar 2024 15:58:51 +0000 Subject: [PATCH 1/4] Add new tracing metrics layer --- Cargo.lock | 34 +-- limitador-server/src/main.rs | 23 ++ limitador-server/src/metrics.rs | 302 +++++++++++++++++++++ limitador/src/lib.rs | 9 + limitador/src/storage/redis/redis_async.rs | 179 +++++++----- 5 files changed, 446 insertions(+), 101 deletions(-) create mode 100644 limitador-server/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 30a471da..13ebcc57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -821,46 +821,37 @@ checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" [[package]] name = "crossbeam-channel" -version = "0.5.8" +version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" dependencies = [ - "cfg-if", "crossbeam-utils", ] [[package]] name = "crossbeam-deque" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] [[package]] name = "crossbeam-epoch" -version = "0.9.15" +version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "autocfg", - "cfg-if", "crossbeam-utils", - "memoffset", - "scopeguard", ] [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crypto-common" @@ -1704,15 +1695,6 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" -[[package]] -name = "memoffset" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" -dependencies = [ - "autocfg", -] - [[package]] name = "mime" version = "0.3.17" diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 8aff7275..73b61d1e 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -13,6 +13,7 @@ use crate::config::{ }; use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; +use crate::metrics::MetricsLayer; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; use limitador::counter::Counter; @@ -55,6 +56,7 @@ mod envoy_rls; mod http_api; mod config; +mod metrics; const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION"); const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE"); @@ -290,6 +292,7 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option { #[actix_rt::main] async fn main() -> Result<(), Box> { + // let metrics_layer = MetricsLayer::new(); let config = { let (config, version) = create_config(); println!("{LIMITADOR_HEADER} {version}"); @@ -304,6 +307,12 @@ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::layer() }; + let metrics_layer = MetricsLayer::new().gather( + "should_rate_limit", + |timings| println!("should_rate_limit/datastore timings: {:?}", timings), + vec!["datastore"], + ); + if !config.tracing_endpoint.is_empty() { global::set_text_map_propagator(TraceContextPropagator::new()); let tracer = opentelemetry_otlp::new_pipeline() @@ -320,12 +329,14 @@ async fn main() -> Result<(), Box> { let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); tracing_subscriber::registry() .with(level) + .with(metrics_layer) .with(fmt_layer) .with(telemetry_layer) .init(); } else { tracing_subscriber::registry() .with(level) + .with(metrics_layer) .with(fmt_layer) .init(); }; @@ -349,6 +360,18 @@ async fn main() -> Result<(), Box> { } }; + // let metrics_layer = MetricsLayer::new().gather( + // "should_rate_limit", + // |timings| println!("should_rate_limit/datastore timings: {:?}", timings), + // vec!["datastore"], + // ); + // match rate_limiter { + // Limiter::Blocking(limiter) => { + // metrics_layer.gather(); + // } + // Limiter::Async(limiter) => {} + // }; + info!("limits file path: {}", limit_file); if let Err(e) = rate_limiter.load_limits_from_file(&limit_file).await { eprintln!("Failed to load limit file: {e}"); diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs new file mode 100644 index 00000000..2f96bb3d --- /dev/null +++ b/limitador-server/src/metrics.rs @@ -0,0 +1,302 @@ +use std::collections::HashMap; +use std::ops; +use std::time::Instant; +use tracing::span::{Attributes, Id}; +use tracing::Subscriber; +use tracing_subscriber::layer::{Context, Layer}; +use tracing_subscriber::registry::LookupSpan; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct Timings { + idle: u64, + busy: u64, + last: Instant, +} + +impl Timings { + fn new() -> Self { + Self { + idle: 0, + busy: 0, + last: Instant::now(), + } + } +} + +impl ops::Add for Timings { + type Output = Self; + + fn add(self, _rhs: Self) -> Self { + Self { + busy: self.busy + _rhs.busy, + idle: self.idle + _rhs.idle, + last: if self.last < _rhs.last { + _rhs.last + } else { + self.last + }, + } + } +} + +impl ops::AddAssign for Timings { + fn add_assign(&mut self, _rhs: Self) { + *self = *self + _rhs + } +} + +#[derive(Debug, Clone)] +struct SpanState { + group_times: HashMap, +} + +impl SpanState { + fn new(group: String) -> Self { + Self { + group_times: HashMap::from([(group, Timings::new())]), + } + } + + fn increment(&mut self, group: &String, timings: Timings) -> &mut Self { + self.group_times + .entry(group.to_string()) + .and_modify(|x| *x += timings) + .or_insert(timings); + self + } +} + +pub struct MetricsGroup { + consumer: F, + records: Vec, +} + +impl MetricsGroup { + pub fn new(consumer: F, records: Vec) -> Self { + Self { consumer, records } + } +} + +pub struct MetricsLayer { + groups: HashMap>, +} + +impl MetricsLayer { + pub fn new() -> Self { + Self { + groups: HashMap::new(), + } + } + + pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self { + // TODO: does not handle case where aggregate already exists + let rec = records.iter().map(|r| r.to_string()).collect(); + self.groups + .entry(aggregate.to_string()) + .or_insert(MetricsGroup::new(consumer, rec)); + self + } +} + +impl Layer for MetricsLayer +where + S: Subscriber, + S: for<'lookup> LookupSpan<'lookup>, +{ + fn on_new_span(&self, _attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + let name = span.name(); + + // if there's a parent + if let Some(parent) = span.parent() { + // if the parent has SpanState propagate to this span + if let Some(parent_state) = parent.extensions_mut().get_mut::() { + extensions.insert(parent_state.clone()); + } + } + + // if we are an aggregator + if self.groups.contains_key(name) { + if let Some(span_state) = extensions.get_mut::() { + // if the SpanState has come from parent and we must append + // (we are a second level aggregator) + span_state + .group_times + .entry(name.to_string()) + .or_insert(Timings::new()); + } else { + // otherwise create a new SpanState with ourselves + extensions.insert(SpanState::new(name.to_string())) + } + } + + // if timing is already set (which it shouldn't be) + // don't create it again + // if !extensions.get_mut::().is_none() { + // return; + // } + + if let Some(span_state) = extensions.get_mut::() { + // either we are an aggregator or nested within one + for group in span_state.group_times.keys() { + for record in &self.groups.get(group).unwrap().records { + if name == record { + extensions.insert(Timings::new()); + return; + } + } + } + // if here we are an intermediate span that should not be recorded + } + } + + fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + if let Some(timings) = extensions.get_mut::() { + let now = Instant::now(); + timings.idle += (now - timings.last).as_nanos() as u64; + timings.last = now; + } + } + + fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + + if let Some(timings) = extensions.get_mut::() { + let now = Instant::now(); + timings.busy += (now - timings.last).as_nanos() as u64; + timings.last = now; + } + } + + fn on_close(&self, id: Id, ctx: Context<'_, S>) { + let span = ctx.span(&id).expect("Span not found, this is a bug"); + let mut extensions = span.extensions_mut(); + let name = span.name(); + let mut t: Option = None; + + if let Some(timing) = extensions.get_mut::() { + let mut time = *timing; + time.idle += (Instant::now() - time.last).as_nanos() as u64; + t = Some(time); + } + + if let Some(span_state) = extensions.get_mut::() { + if let Some(timing) = t { + let group_times = span_state.group_times.clone(); + // iterate over the groups this span belongs to + 'aggregate: for group in group_times.keys() { + // find the set of records related to these groups in the layer + for record in &self.groups.get(group).unwrap().records { + // if we are a record for this group then increment the relevant + // span-local timing and continue to the next group + if name == record { + span_state.increment(group, timing); + continue 'aggregate; + } + } + } + } + // we have updated local span_state + // but we need to bubble back up through parents + // NOTE: this propagates the timings, ready to be cloned by next new span! + if let Some(parent) = span.parent() { + parent.extensions_mut().replace(span_state.clone()); + } + // IF we are aggregator call consume function + match self.groups.get(name) { + Some(metrics_group) => { + (metrics_group.consumer)(*span_state.group_times.get(name).unwrap()) + } + _ => (), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::{MetricsLayer, SpanState, Timings}; + use std::time::Instant; + + #[test] + fn timings_add() { + let now = Instant::now(); + let t1 = Timings { + idle: 5, + busy: 5, + last: now, + }; + let t2 = Timings { + idle: 3, + busy: 5, + last: now, + }; + let t3 = t1 + t2; + assert_eq!( + t3, + Timings { + idle: 8, + busy: 10, + last: now + } + ) + } + + #[test] + fn timings_add_assign() { + let now = Instant::now(); + let mut t1 = Timings { + idle: 5, + busy: 5, + last: now, + }; + let t2 = Timings { + idle: 3, + busy: 5, + last: now, + }; + t1 += t2; + assert_eq!( + t1, + Timings { + idle: 8, + busy: 10, + last: now + } + ) + } + + #[test] + fn span_state_increment() { + let group = String::from("group"); + let mut span_state = SpanState::new(group.clone()); + let t1 = Timings { + idle: 5, + busy: 5, + last: Instant::now(), + }; + span_state.increment(&group, t1); + assert_eq!(span_state.group_times.get(&group).unwrap().idle, t1.idle); + assert_eq!(span_state.group_times.get(&group).unwrap().busy, t1.busy); + } + + #[test] + fn metrics_layer() { + let consumer = |_| println!("group/record"); + let ml = MetricsLayer::new().gather("group", consumer, vec!["record"]); + assert_eq!(ml.groups.get("group").unwrap().records, vec!["record"]); + } +} + +// mylayer.gather("aggregate_on", timings| pr.vomit(timings), ["datastore"]) + +// mylayer.gather("aggregate_on", ["datastore"]).consumer("aggregate_on", |timings| pr.vomit(timings)) + +// write a consumer function, takes a function that does something with the timings + +// fn consumer(timings: Timings) -> () {} diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 64730b51..7b08736e 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -193,6 +193,7 @@ #![allow(clippy::multiple_crate_versions)] use std::collections::{HashMap, HashSet}; +use std::time::Duration; use crate::counter::Counter; use crate::errors::LimitadorError; @@ -478,6 +479,10 @@ impl RateLimiter { self.prometheus_metrics.gather_metrics() } + pub fn prometheus_counter_access(&self, duration: Duration) { + self.prometheus_metrics.counter_access(duration) + } + fn counters_that_apply( &self, namespace: &Namespace, @@ -672,6 +677,10 @@ impl AsyncRateLimiter { self.prometheus_metrics.gather_metrics() } + pub fn prometheus_counter_access(&self, duration: Duration) { + self.prometheus_metrics.counter_access(duration) + } + async fn counters_that_apply( &self, namespace: &Namespace, diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 23782254..4ddf09f6 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -14,7 +14,7 @@ use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; use std::time::{Duration, Instant}; -use tracing::{trace_span, Instrument}; +use tracing::{debug_span, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. This implementation @@ -36,12 +36,12 @@ impl AsyncCounterStorage for AsyncRedisStorage { async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_manager.clone(); - let span = trace_span!("datastore"); + // let span = debug_span!("datastore"); match async move { con.get::>(key_for_counter(counter)) .await } - .instrument(span) + .instrument(debug_span!("datastore")) .await? { Some(val) => Ok(val + delta <= counter.max_value()), @@ -53,18 +53,19 @@ impl AsyncCounterStorage for AsyncRedisStorage { async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - let span = trace_span!("datastore"); - async { - redis::Script::new(SCRIPT_UPDATE_COUNTER) - .key(key_for_counter(counter)) - .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) - .arg(delta) - .invoke_async::<_, _>(&mut con) - .await - } - .instrument(span) - .await?; + // let span = debug_span!("datastore"); + // async { + redis::Script::new(SCRIPT_UPDATE_COUNTER) + .key(key_for_counter(counter)) + .key(key_for_counters_of_limit(counter.limit())) + .arg(counter.seconds()) + .arg(delta) + .invoke_async::<_, _>(&mut con) + .instrument(debug_span!("datastore")) + .await?; + // } + + // .await?; Ok(()) } @@ -89,33 +90,37 @@ impl AsyncCounterStorage for AsyncRedisStorage { } let script_res: Vec> = { - let span = trace_span!("datastore"); - async { - let start = Instant::now(); - let result = script_invocation.invoke_async(&mut con).await; - counter_access.observe(start.elapsed()); - result - } - .instrument(span) - .await? + // let span = debug_span!("datastore"); + // async { + let start = Instant::now(); + let result = script_invocation + .invoke_async(&mut con) + .instrument(debug_span!("datastore")) + .await?; + counter_access.observe(start.elapsed()); + result + // } + // .instrument(debug_span!("datastore")) + // .await? }; if let Some(res) = is_limited(counters, delta, script_res) { return Ok(res); } } else { let counter_vals: Vec> = { - let span = trace_span!("datastore"); - async { - let start = Instant::now(); - let result = redis::cmd("MGET") - .arg(counter_keys.clone()) - .query_async(&mut con) - .await; - counter_access.observe(start.elapsed()); - result - } - .instrument(span) - .await? + // let span = debug_span!("datastore"); + // async { + let start = Instant::now(); + let result = redis::cmd("MGET") + .arg(counter_keys.clone()) + .query_async(&mut con) + .instrument(debug_span!("datastore")) + .await?; + counter_access.observe(start.elapsed()); + result + // } + // .instrument(debug_span!("datastore")) + // .await? }; for (i, counter) in counters.iter().enumerate() { @@ -132,21 +137,22 @@ impl AsyncCounterStorage for AsyncRedisStorage { // TODO: this can be optimized by using pipelines with multiple updates for (counter_idx, key) in counter_keys.into_iter().enumerate() { let counter = &counters[counter_idx]; - let span = trace_span!("datastore"); - async { - let start = Instant::now(); - let result = redis::Script::new(SCRIPT_UPDATE_COUNTER) - .key(key) - .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) - .arg(delta) - .invoke_async::<_, _>(&mut con) - .await; - counter_access.observe(start.elapsed()); - result - } - .instrument(span) - .await? + // let span = debug_span!("datastore"); + // async { + let start = Instant::now(); + let result = redis::Script::new(SCRIPT_UPDATE_COUNTER) + .key(key) + .key(key_for_counters_of_limit(counter.limit())) + .arg(counter.seconds()) + .arg(delta) + .invoke_async::<_, _>(&mut con) + .instrument(debug_span!("datastore")) + .await?; + counter_access.observe(start.elapsed()); + result + // } + // .instrument(debug_span!("datastore")) + // .await? } Ok(Authorization::Ok) @@ -160,12 +166,12 @@ impl AsyncCounterStorage for AsyncRedisStorage { for limit in limits { let counter_keys = { - let span = trace_span!("datastore"); + // let span = debug_span!("datastore"); async { con.smembers::>(key_for_counters_of_limit(&limit)) .await } - .instrument(span) + .instrument(debug_span!("datastore")) .await? }; @@ -180,18 +186,26 @@ impl AsyncCounterStorage for AsyncRedisStorage { // This does not cause any bugs, but consumes memory // unnecessarily. let option = { - let span = trace_span!("datastore"); - async { con.get::>(counter_key.clone()).await } - .instrument(span) + // let span = debug_span!("datastore"); + // async { + con.get::>(counter_key.clone()) + .instrument(debug_span!("datastore")) .await? + // } + // .instrument(debug_span!("datastore")) + // .await? }; if let Some(val) = option { counter.set_remaining(limit.max_value() - val); let ttl = { - let span = trace_span!("datastore"); - async { con.ttl(&counter_key).await } - .instrument(span) + // let span = debug_span!("datastore"); + // async { + con.ttl(&counter_key) + .instrument(debug_span!("datastore")) .await? + // } + // .instrument(debug_span!("datastore")) + // .await? }; counter.set_expires_in(Duration::from_secs(ttl)); @@ -206,10 +220,14 @@ impl AsyncCounterStorage for AsyncRedisStorage { #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { - let span = trace_span!("datastore"); - async { self.delete_counters_associated_with_limit(&limit).await } - .instrument(span) + // let span = debug_span!("datastore"); + // async { + self.delete_counters_associated_with_limit(&limit) + .instrument(debug_span!("datastore")) .await? + // } + // .instrument(debug_span!("datastore")) + // .await? } Ok(()) } @@ -217,10 +235,16 @@ impl AsyncCounterStorage for AsyncRedisStorage { #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - let span = trace_span!("datastore"); - async { redis::cmd("FLUSHDB").query_async(&mut con).await } - .instrument(span) + // let span = debug_span!("datastore"); + // async { + redis::cmd("FLUSHDB") + .query_async(&mut con) + .instrument(debug_span!("datastore")) .await?; + + // } + // .instrument(debug_span!("datastore")) + // .await?; Ok(()) } } @@ -245,20 +269,25 @@ impl AsyncRedisStorage { let mut con = self.conn_manager.clone(); let counter_keys = { - let span = trace_span!("datastore"); - async { - con.smembers::>(key_for_counters_of_limit(limit)) - .await - } - .instrument(span) - .await? + // let span = debug_span!("datastore"); + // async { + con.smembers::>(key_for_counters_of_limit(limit)) + .instrument(debug_span!("datastore")) + .await? + // } + // .instrument(debug_span!("datastore")) + // .await? }; for counter_key in counter_keys { - let span = trace_span!("datastore"); - async { con.del(counter_key).await } - .instrument(span) + // let span = debug_span!("datastore"); + // async { + con.del(counter_key) + .instrument(debug_span!("datastore")) .await?; + // } + // .instrument(debug_span!("datastore")) + // .await?; } Ok(()) From 543728ea586b858c5cf9a24f37653e9abba77468 Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Fri, 8 Mar 2024 16:45:42 +0000 Subject: [PATCH 2/4] Update prometheus metrics from server --- limitador-server/src/envoy_rls/server.rs | 5 +- limitador-server/src/http_api/server.rs | 14 +-- limitador-server/src/main.rs | 22 ++--- limitador-server/src/metrics.rs | 20 ++-- limitador/src/lib.rs | 77 +++++---------- limitador/src/prometheus_metrics.rs | 91 ++++++----------- .../storage/infinispan/infinispan_storage.rs | 2 - limitador/src/storage/mod.rs | 5 +- limitador/src/storage/redis/redis_async.rs | 97 +++---------------- limitador/src/storage/redis/redis_cached.rs | 2 - 10 files changed, 103 insertions(+), 232 deletions(-) diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index d5b1747f..f9a57139 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -19,7 +19,7 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_ use crate::envoy_rls::server::envoy::service::ratelimit::v3::{ RateLimitRequest, RateLimitResponse, }; -use crate::Limiter; +use crate::{Limiter, PROMETHEUS_METRICS}; include!("envoy_types.rs"); @@ -124,8 +124,11 @@ impl RateLimitService for MyRateLimiter { let mut rate_limited_resp = rate_limited_resp.unwrap(); let resp_code = if rate_limited_resp.limited { + PROMETHEUS_METRICS + .incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref()); Code::OverLimit } else { + PROMETHEUS_METRICS.incr_authorized_calls(&namespace); Code::Ok }; diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index b32001ea..f86863dc 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -1,5 +1,5 @@ use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit}; -use crate::Limiter; +use crate::{Limiter, PROMETHEUS_METRICS}; use actix_web::{http::StatusCode, ResponseError}; use actix_web::{App, HttpServer}; use paperclip::actix::{ @@ -44,13 +44,10 @@ async fn status() -> web::Json<()> { Json(()) } -#[tracing::instrument(skip(data))] +#[tracing::instrument(skip(_data))] #[api_v2_operation] -async fn metrics(data: web::Data>) -> String { - match data.get_ref().as_ref() { - Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(), - Limiter::Async(limiter) => limiter.gather_prometheus_metrics(), - } +async fn metrics(_data: web::Data>) -> String { + PROMETHEUS_METRICS.gather_metrics() } #[api_v2_operation] @@ -170,8 +167,11 @@ async fn check_and_report( match rate_limited_and_update_result { Ok(is_rate_limited) => { if is_rate_limited.limited { + PROMETHEUS_METRICS + .incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref()); Err(ErrorResponse::TooManyRequests) } else { + PROMETHEUS_METRICS.incr_authorized_calls(&namespace); Ok(Json(())) } } diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 73b61d1e..666c4b5c 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -16,9 +16,11 @@ use crate::http_api::server::run_http_server; use crate::metrics::MetricsLayer; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; +use lazy_static::lazy_static; use limitador::counter::Counter; use limitador::errors::LimitadorError; use limitador::limit::Limit; +use limitador::prometheus_metrics::PrometheusMetrics; use limitador::storage::disk::DiskStorage; #[cfg(feature = "infinispan")] use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder}; @@ -73,6 +75,11 @@ pub enum LimitadorServerError { Internal(LimitadorError), } +lazy_static! { + // TODO: this should be using config.limit_name_in_labels to initialize + pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default(); +} + pub enum Limiter { Blocking(RateLimiter), Async(AsyncRateLimiter), @@ -292,7 +299,6 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option { #[actix_rt::main] async fn main() -> Result<(), Box> { - // let metrics_layer = MetricsLayer::new(); let config = { let (config, version) = create_config(); println!("{LIMITADOR_HEADER} {version}"); @@ -309,7 +315,7 @@ async fn main() -> Result<(), Box> { let metrics_layer = MetricsLayer::new().gather( "should_rate_limit", - |timings| println!("should_rate_limit/datastore timings: {:?}", timings), + |timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)), vec!["datastore"], ); @@ -360,18 +366,6 @@ async fn main() -> Result<(), Box> { } }; - // let metrics_layer = MetricsLayer::new().gather( - // "should_rate_limit", - // |timings| println!("should_rate_limit/datastore timings: {:?}", timings), - // vec!["datastore"], - // ); - // match rate_limiter { - // Limiter::Blocking(limiter) => { - // metrics_layer.gather(); - // } - // Limiter::Async(limiter) => {} - // }; - info!("limits file path: {}", limit_file); if let Err(e) = rate_limiter.load_limits_from_file(&limit_file).await { eprintln!("Failed to load limit file: {e}"); diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs index 2f96bb3d..c4766adf 100644 --- a/limitador-server/src/metrics.rs +++ b/limitador-server/src/metrics.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::ops; -use std::time::Instant; +use std::time::{Duration, Instant}; use tracing::span::{Attributes, Id}; use tracing::Subscriber; use tracing_subscriber::layer::{Context, Layer}; @@ -45,6 +45,12 @@ impl ops::AddAssign for Timings { } } +impl From for Duration { + fn from(timings: Timings) -> Self { + Duration::from_nanos(timings.idle + timings.busy) + } +} + #[derive(Debug, Clone)] struct SpanState { group_times: HashMap, @@ -293,10 +299,8 @@ mod tests { } } -// mylayer.gather("aggregate_on", timings| pr.vomit(timings), ["datastore"]) - -// mylayer.gather("aggregate_on", ["datastore"]).consumer("aggregate_on", |timings| pr.vomit(timings)) - -// write a consumer function, takes a function that does something with the timings - -// fn consumer(timings: Timings) -> () {} +// [X] 1. Use prometheus metrics in main +// [X] 2. Try to use consume method from the prometheus metrics in main +// [X] 3. Invoke the server using the PrometheusMetrics defined in main not the limiter +// [ ] 4. Record the authorized/limited calls +// [ ] 5. Burn the old prometheus instance and move inside the server + old timing stuff diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 7b08736e..04af4b51 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -193,7 +193,6 @@ #![allow(clippy::multiple_crate_versions)] use std::collections::{HashMap, HashSet}; -use std::time::Duration; use crate::counter::Counter; use crate::errors::LimitadorError; @@ -209,7 +208,7 @@ extern crate core; pub mod counter; pub mod errors; pub mod limit; -mod prometheus_metrics; +pub mod prometheus_metrics; pub mod storage; pub struct RateLimiter { @@ -230,6 +229,7 @@ pub struct RateLimiterBuilder { pub struct CheckResult { pub limited: bool, pub counters: Vec, + pub limit_name: Option, } impl From for bool { @@ -358,8 +358,6 @@ impl RateLimiter { match self.storage.is_within_limits(&counter, delta) { Ok(within_limits) => { if !within_limits { - self.prometheus_metrics - .incr_limited_calls(namespace, counter.limit().name()); return Ok(true); } } @@ -367,7 +365,6 @@ impl RateLimiter { } } - self.prometheus_metrics.incr_authorized_calls(namespace); Ok(false) } @@ -395,10 +392,10 @@ impl RateLimiter { let mut counters = self.counters_that_apply(namespace, values)?; if counters.is_empty() { - self.prometheus_metrics.incr_authorized_calls(namespace); return Ok(CheckResult { limited: false, counters, + limit_name: None, }); } @@ -413,21 +410,16 @@ impl RateLimiter { }; match check_result { - Authorization::Ok => { - self.prometheus_metrics.incr_authorized_calls(namespace); - Ok(CheckResult { - limited: false, - counters, - }) - } - Authorization::Limited(name) => { - self.prometheus_metrics - .incr_limited_calls(namespace, name.as_deref()); - Ok(CheckResult { - limited: true, - counters, - }) - } + Authorization::Ok => Ok(CheckResult { + limited: false, + counters, + limit_name: None, + }), + Authorization::Limited(name) => Ok(CheckResult { + limited: true, + counters, + limit_name: name, + }), } } @@ -479,10 +471,6 @@ impl RateLimiter { self.prometheus_metrics.gather_metrics() } - pub fn prometheus_counter_access(&self, duration: Duration) { - self.prometheus_metrics.counter_access(duration) - } - fn counters_that_apply( &self, namespace: &Namespace, @@ -547,16 +535,12 @@ impl AsyncRateLimiter { match self.storage.is_within_limits(&counter, delta).await { Ok(within_limits) => { if !within_limits { - self.prometheus_metrics - .incr_limited_calls(namespace, counter.limit().name()); return Ok(true); } } Err(e) => return Err(e.into()), } } - - self.prometheus_metrics.incr_authorized_calls(namespace); Ok(false) } @@ -586,17 +570,16 @@ impl AsyncRateLimiter { let mut counters = self.counters_that_apply(namespace, values).await?; if counters.is_empty() { - self.prometheus_metrics.incr_authorized_calls(namespace); return Ok(CheckResult { limited: false, counters, + limit_name: None, }); } - let access = self.prometheus_metrics.counter_accesses(); let check_result = self .storage - .check_and_update(&mut counters, delta, load_counters, access) + .check_and_update(&mut counters, delta, load_counters) .await?; let counters = if load_counters { @@ -606,22 +589,16 @@ impl AsyncRateLimiter { }; match check_result { - Authorization::Ok => { - self.prometheus_metrics.incr_authorized_calls(namespace); - - Ok(CheckResult { - limited: false, - counters, - }) - } - Authorization::Limited(name) => { - self.prometheus_metrics - .incr_limited_calls(namespace, name.as_deref()); - Ok(CheckResult { - limited: true, - counters, - }) - } + Authorization::Ok => Ok(CheckResult { + limited: false, + counters, + limit_name: None, + }), + Authorization::Limited(name) => Ok(CheckResult { + limited: true, + counters, + limit_name: name, + }), } } @@ -677,10 +654,6 @@ impl AsyncRateLimiter { self.prometheus_metrics.gather_metrics() } - pub fn prometheus_counter_access(&self, duration: Duration) { - self.prometheus_metrics.counter_access(duration) - } - async fn counters_that_apply( &self, namespace: &Namespace, diff --git a/limitador/src/prometheus_metrics.rs b/limitador/src/prometheus_metrics.rs index 4b2e0f51..3ee8f79a 100644 --- a/limitador/src/prometheus_metrics.rs +++ b/limitador/src/prometheus_metrics.rs @@ -83,14 +83,6 @@ impl PrometheusMetrics { self.counter_latency.observe(duration.as_secs_f64()); } - #[must_use] - pub fn counter_accesses(&self) -> CounterAccess { - CounterAccess { - metrics: self, - duration: Duration::ZERO, - } - } - pub fn gather_metrics(&self) -> String { let mut buffer = Vec::new(); @@ -171,25 +163,6 @@ impl PrometheusMetrics { } } -pub struct CounterAccess<'a> { - metrics: &'a PrometheusMetrics, - duration: Duration, -} - -impl CounterAccess<'_> { - pub fn observe(&mut self, duration: Duration) { - self.duration += duration; - } -} - -impl<'a> Drop for CounterAccess<'a> { - fn drop(&mut self) { - if self.duration > Duration::ZERO { - self.metrics.counter_access(self.duration); - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -324,38 +297,38 @@ mod tests { ) } - #[test] - fn collects_latencies() { - let metrics = PrometheusMetrics::new(); - assert_eq!(metrics.counter_latency.get_sample_count(), 0); - { - let _access = metrics.counter_accesses(); - } - assert_eq!(metrics.counter_latency.get_sample_count(), 0); - { - let mut access = metrics.counter_accesses(); - access.observe(Duration::from_millis(12)); - } - assert_eq!(metrics.counter_latency.get_sample_count(), 1); - assert_eq!( - metrics.counter_latency.get_sample_sum(), - Duration::from_millis(12).as_secs_f64() - ); - { - let mut access = metrics.counter_accesses(); - access.observe(Duration::from_millis(5)); - assert_eq!(metrics.counter_latency.get_sample_count(), 1); - assert_eq!( - metrics.counter_latency.get_sample_sum(), - Duration::from_millis(12).as_secs_f64() - ); - } - assert_eq!(metrics.counter_latency.get_sample_count(), 2); - assert_eq!( - metrics.counter_latency.get_sample_sum(), - Duration::from_millis(17).as_secs_f64() - ); - } + // #[test] + // fn collects_latencies() { + // let metrics = PrometheusMetrics::new(); + // assert_eq!(metrics.counter_latency.get_sample_count(), 0); + // { + // let _access = metrics.counter_accesses(); + // } + // assert_eq!(metrics.counter_latency.get_sample_count(), 0); + // { + // let mut access = metrics.counter_accesses(); + // access.observe(Duration::from_millis(12)); + // } + // assert_eq!(metrics.counter_latency.get_sample_count(), 1); + // assert_eq!( + // metrics.counter_latency.get_sample_sum(), + // Duration::from_millis(12).as_secs_f64() + // ); + // { + // let mut access = metrics.counter_accesses(); + // access.observe(Duration::from_millis(5)); + // assert_eq!(metrics.counter_latency.get_sample_count(), 1); + // assert_eq!( + // metrics.counter_latency.get_sample_sum(), + // Duration::from_millis(12).as_secs_f64() + // ); + // } + // assert_eq!(metrics.counter_latency.get_sample_count(), 2); + // assert_eq!( + // metrics.counter_latency.get_sample_sum(), + // Duration::from_millis(17).as_secs_f64() + // ); + // } fn formatted_counter_with_namespace_and_limit( metric_name: &str, diff --git a/limitador/src/storage/infinispan/infinispan_storage.rs b/limitador/src/storage/infinispan/infinispan_storage.rs index af790634..7e521d8a 100644 --- a/limitador/src/storage/infinispan/infinispan_storage.rs +++ b/limitador/src/storage/infinispan/infinispan_storage.rs @@ -1,6 +1,5 @@ use crate::counter::Counter; use crate::limit::Limit; -use crate::prometheus_metrics::CounterAccess; use crate::storage::infinispan::counters::{Consistency, CounterOpts}; use crate::storage::infinispan::response::response_to_string; use crate::storage::infinispan::{ @@ -74,7 +73,6 @@ impl AsyncCounterStorage for InfinispanStorage { counters: &mut Vec, delta: i64, load_counters: bool, - _counter_access: CounterAccess<'a>, ) -> Result { let mut counter_keys = Vec::with_capacity(counters.len()); diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index c6cb1c92..72e0346b 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -1,6 +1,5 @@ use crate::counter::Counter; use crate::limit::{Limit, Namespace}; -use crate::prometheus_metrics::CounterAccess; use crate::InMemoryStorage; use async_trait::async_trait; use std::collections::{HashMap, HashSet}; @@ -243,10 +242,9 @@ impl AsyncStorage { counters: &mut Vec, delta: i64, load_counters: bool, - counter_access: CounterAccess<'a>, ) -> Result { self.counters - .check_and_update(counters, delta, load_counters, counter_access) + .check_and_update(counters, delta, load_counters) .await } @@ -288,7 +286,6 @@ pub trait AsyncCounterStorage: Sync + Send { counters: &mut Vec, delta: i64, load_counters: bool, - counter_access: CounterAccess<'a>, ) -> Result; async fn get_counters(&self, limits: HashSet) -> Result, StorageErr>; async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr>; diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 4ddf09f6..d180b11b 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -4,7 +4,6 @@ use self::redis::aio::ConnectionManager; use self::redis::ConnectionInfo; use crate::counter::Counter; use crate::limit::Limit; -use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::is_limited; use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS}; @@ -13,7 +12,7 @@ use async_trait::async_trait; use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; -use std::time::{Duration, Instant}; +use std::time::Duration; use tracing::{debug_span, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we @@ -36,13 +35,10 @@ impl AsyncCounterStorage for AsyncRedisStorage { async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_manager.clone(); - // let span = debug_span!("datastore"); - match async move { - con.get::>(key_for_counter(counter)) - .await - } - .instrument(debug_span!("datastore")) - .await? + match con + .get::>(key_for_counter(counter)) + .instrument(debug_span!("datastore")) + .await? { Some(val) => Ok(val + delta <= counter.max_value()), None => Ok(counter.max_value() - delta >= 0), @@ -53,8 +49,6 @@ impl AsyncCounterStorage for AsyncRedisStorage { async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - // let span = debug_span!("datastore"); - // async { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key_for_counter(counter)) .key(key_for_counters_of_limit(counter.limit())) @@ -63,9 +57,6 @@ impl AsyncCounterStorage for AsyncRedisStorage { .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) .await?; - // } - - // .await?; Ok(()) } @@ -76,7 +67,6 @@ impl AsyncCounterStorage for AsyncRedisStorage { counters: &mut Vec, delta: i64, load_counters: bool, - mut counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.conn_manager.clone(); let counter_keys: Vec = counters.iter().map(key_for_counter).collect(); @@ -90,37 +80,21 @@ impl AsyncCounterStorage for AsyncRedisStorage { } let script_res: Vec> = { - // let span = debug_span!("datastore"); - // async { - let start = Instant::now(); - let result = script_invocation + script_invocation .invoke_async(&mut con) .instrument(debug_span!("datastore")) - .await?; - counter_access.observe(start.elapsed()); - result - // } - // .instrument(debug_span!("datastore")) - // .await? + .await? }; if let Some(res) = is_limited(counters, delta, script_res) { return Ok(res); } } else { let counter_vals: Vec> = { - // let span = debug_span!("datastore"); - // async { - let start = Instant::now(); - let result = redis::cmd("MGET") + redis::cmd("MGET") .arg(counter_keys.clone()) .query_async(&mut con) .instrument(debug_span!("datastore")) - .await?; - counter_access.observe(start.elapsed()); - result - // } - // .instrument(debug_span!("datastore")) - // .await? + .await? }; for (i, counter) in counters.iter().enumerate() { @@ -137,22 +111,14 @@ impl AsyncCounterStorage for AsyncRedisStorage { // TODO: this can be optimized by using pipelines with multiple updates for (counter_idx, key) in counter_keys.into_iter().enumerate() { let counter = &counters[counter_idx]; - // let span = debug_span!("datastore"); - // async { - let start = Instant::now(); - let result = redis::Script::new(SCRIPT_UPDATE_COUNTER) + redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) .arg(counter.seconds()) .arg(delta) .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) - .await?; - counter_access.observe(start.elapsed()); - result - // } - // .instrument(debug_span!("datastore")) - // .await? + .await? } Ok(Authorization::Ok) @@ -166,13 +132,9 @@ impl AsyncCounterStorage for AsyncRedisStorage { for limit in limits { let counter_keys = { - // let span = debug_span!("datastore"); - async { - con.smembers::>(key_for_counters_of_limit(&limit)) - .await - } - .instrument(debug_span!("datastore")) - .await? + con.smembers::>(key_for_counters_of_limit(&limit)) + .instrument(debug_span!("datastore")) + .await? }; for counter_key in counter_keys { @@ -186,26 +148,16 @@ impl AsyncCounterStorage for AsyncRedisStorage { // This does not cause any bugs, but consumes memory // unnecessarily. let option = { - // let span = debug_span!("datastore"); - // async { con.get::>(counter_key.clone()) .instrument(debug_span!("datastore")) .await? - // } - // .instrument(debug_span!("datastore")) - // .await? }; if let Some(val) = option { counter.set_remaining(limit.max_value() - val); let ttl = { - // let span = debug_span!("datastore"); - // async { con.ttl(&counter_key) .instrument(debug_span!("datastore")) .await? - // } - // .instrument(debug_span!("datastore")) - // .await? }; counter.set_expires_in(Duration::from_secs(ttl)); @@ -220,14 +172,9 @@ impl AsyncCounterStorage for AsyncRedisStorage { #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { - // let span = debug_span!("datastore"); - // async { self.delete_counters_associated_with_limit(&limit) .instrument(debug_span!("datastore")) .await? - // } - // .instrument(debug_span!("datastore")) - // .await? } Ok(()) } @@ -235,16 +182,10 @@ impl AsyncCounterStorage for AsyncRedisStorage { #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - // let span = debug_span!("datastore"); - // async { redis::cmd("FLUSHDB") .query_async(&mut con) .instrument(debug_span!("datastore")) .await?; - - // } - // .instrument(debug_span!("datastore")) - // .await?; Ok(()) } } @@ -269,25 +210,15 @@ impl AsyncRedisStorage { let mut con = self.conn_manager.clone(); let counter_keys = { - // let span = debug_span!("datastore"); - // async { con.smembers::>(key_for_counters_of_limit(limit)) .instrument(debug_span!("datastore")) .await? - // } - // .instrument(debug_span!("datastore")) - // .await? }; for counter_key in counter_keys { - // let span = debug_span!("datastore"); - // async { con.del(counter_key) .instrument(debug_span!("datastore")) .await?; - // } - // .instrument(debug_span!("datastore")) - // .await?; } Ok(()) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index a3e329c3..f29ccab4 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -1,6 +1,5 @@ use crate::counter::Counter; use crate::limit::Limit; -use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder}; use crate::storage::redis::redis_async::AsyncRedisStorage; @@ -70,7 +69,6 @@ impl AsyncCounterStorage for CachedRedisStorage { counters: &mut Vec, delta: i64, load_counters: bool, - _counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.redis_conn_manager.clone(); From b1464b90efd0f537bd752f155a34c6103bb0ad3b Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Tue, 12 Mar 2024 15:54:32 +0000 Subject: [PATCH 3/4] Move prometheus_metrics and remove dependency from lib --- Cargo.lock | 2 +- limitador-server/Cargo.toml | 1 + limitador-server/src/main.rs | 55 ++++++------------- limitador-server/src/metrics.rs | 21 +------ .../src/prometheus_metrics.rs | 48 ++++------------ limitador/Cargo.toml | 1 - limitador/src/lib.rs | 53 +----------------- 7 files changed, 34 insertions(+), 147 deletions(-) rename {limitador => limitador-server}/src/prometheus_metrics.rs (86%) diff --git a/Cargo.lock b/Cargo.lock index 13ebcc57..26330f77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1558,7 +1558,6 @@ dependencies = [ "moka", "paste", "postcard", - "prometheus", "r2d2", "rand", "redis", @@ -1593,6 +1592,7 @@ dependencies = [ "opentelemetry-stdout", "opentelemetry_sdk", "paperclip", + "prometheus", "prost 0.12.1", "prost-types", "serde", diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index 07907a04..b0cbaf0c 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -46,6 +46,7 @@ lazy_static = "1.4.0" clap = "4.3" sysinfo = "0.29.7" openssl = { version = "0.10.57", features = ["vendored"] } +prometheus = "0.13.3" [build-dependencies] diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 666c4b5c..5a8004f7 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -20,7 +20,6 @@ use lazy_static::lazy_static; use limitador::counter::Counter; use limitador::errors::LimitadorError; use limitador::limit::Limit; -use limitador::prometheus_metrics::PrometheusMetrics; use limitador::storage::disk::DiskStorage; #[cfg(feature = "infinispan")] use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder}; @@ -39,6 +38,7 @@ use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::{trace, Resource}; +use prometheus_metrics::PrometheusMetrics; use std::env::VarError; use std::fmt::Display; use std::fs; @@ -59,6 +59,7 @@ mod http_api; mod config; mod metrics; +pub mod prometheus_metrics; const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION"); const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE"); @@ -76,7 +77,6 @@ pub enum LimitadorServerError { } lazy_static! { - // TODO: this should be using config.limit_name_in_labels to initialize pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default(); } @@ -94,29 +94,19 @@ impl From for LimitadorServerError { impl Limiter { pub async fn new(config: Configuration) -> Result { let rate_limiter = match config.storage { - StorageConfiguration::Redis(cfg) => { - Self::redis_limiter(cfg, config.limit_name_in_labels).await - } + StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await, #[cfg(feature = "infinispan")] - StorageConfiguration::Infinispan(cfg) => { - Self::infinispan_limiter(cfg, config.limit_name_in_labels).await - } - StorageConfiguration::InMemory(cfg) => { - Self::in_memory_limiter(cfg, config.limit_name_in_labels) - } - StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels), + StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await, + StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg), + StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg), }; Ok(rate_limiter) } - async fn redis_limiter(cfg: RedisStorageConfiguration, limit_name_labels: bool) -> Self { + async fn redis_limiter(cfg: RedisStorageConfiguration) -> Self { let storage = Self::storage_using_redis(cfg).await; - let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage); - - if limit_name_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } + let rate_limiter_builder = AsyncRateLimiterBuilder::new(storage); Self::Async(rate_limiter_builder.build()) } @@ -173,10 +163,7 @@ impl Limiter { } #[cfg(feature = "infinispan")] - async fn infinispan_limiter( - cfg: InfinispanStorageConfiguration, - limit_name_labels: bool, - ) -> Self { + async fn infinispan_limiter(cfg: InfinispanStorageConfiguration) -> Self { use url::Url; let parsed_url = Url::parse(&cfg.url).unwrap(); @@ -212,17 +199,13 @@ impl Limiter { None => builder.build().await, }; - let mut rate_limiter_builder = + let rate_limiter_builder = AsyncRateLimiterBuilder::new(AsyncStorage::with_counter_storage(Box::new(storage))); - if limit_name_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Async(rate_limiter_builder.build()) } - fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self { + fn disk_limiter(cfg: DiskStorageConfiguration) -> Self { let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) { Ok(storage) => storage, Err(err) => { @@ -230,24 +213,16 @@ impl Limiter { process::exit(1) } }; - let mut rate_limiter_builder = + let rate_limiter_builder = RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage))); - if limit_name_in_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Blocking(rate_limiter_builder.build()) } - fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self { - let mut rate_limiter_builder = + fn in_memory_limiter(cfg: InMemoryStorageConfiguration) -> Self { + let rate_limiter_builder = RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap()); - if limit_name_in_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Blocking(rate_limiter_builder.build()) } @@ -347,6 +322,8 @@ async fn main() -> Result<(), Box> { .init(); }; + PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels); + info!("Version: {}", version); info!("Using config: {:?}", config); config diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs index c4766adf..4292c7c9 100644 --- a/limitador-server/src/metrics.rs +++ b/limitador-server/src/metrics.rs @@ -95,7 +95,7 @@ impl MetricsLayer { } pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self { - // TODO: does not handle case where aggregate already exists + // TODO(adam-cattermole): does not handle case where aggregate already exists let rec = records.iter().map(|r| r.to_string()).collect(); self.groups .entry(aggregate.to_string()) @@ -137,12 +137,6 @@ where } } - // if timing is already set (which it shouldn't be) - // don't create it again - // if !extensions.get_mut::().is_none() { - // return; - // } - if let Some(span_state) = extensions.get_mut::() { // either we are an aggregator or nested within one for group in span_state.group_times.keys() { @@ -214,11 +208,8 @@ where parent.extensions_mut().replace(span_state.clone()); } // IF we are aggregator call consume function - match self.groups.get(name) { - Some(metrics_group) => { - (metrics_group.consumer)(*span_state.group_times.get(name).unwrap()) - } - _ => (), + if let Some(metrics_group) = self.groups.get(name) { + (metrics_group.consumer)(*span_state.group_times.get(name).unwrap()) } } } @@ -298,9 +289,3 @@ mod tests { assert_eq!(ml.groups.get("group").unwrap().records, vec!["record"]); } } - -// [X] 1. Use prometheus metrics in main -// [X] 2. Try to use consume method from the prometheus metrics in main -// [X] 3. Invoke the server using the PrometheusMetrics defined in main not the limiter -// [ ] 4. Record the authorized/limited calls -// [ ] 5. Burn the old prometheus instance and move inside the server + old timing stuff diff --git a/limitador/src/prometheus_metrics.rs b/limitador-server/src/prometheus_metrics.rs similarity index 86% rename from limitador/src/prometheus_metrics.rs rename to limitador-server/src/prometheus_metrics.rs index 3ee8f79a..34f9537d 100644 --- a/limitador/src/prometheus_metrics.rs +++ b/limitador-server/src/prometheus_metrics.rs @@ -1,7 +1,9 @@ -use crate::limit::Namespace; +use lazy_static::lazy_static; +use limitador::limit::Namespace; use prometheus::{ Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder, }; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; const NAMESPACE_LABEL: &str = "limitador_namespace"; @@ -36,7 +38,7 @@ pub struct PrometheusMetrics { authorized_calls: IntCounterVec, limited_calls: IntCounterVec, counter_latency: Histogram, - use_limit_name_label: bool, + use_limit_name_label: AtomicBool, } impl Default for PrometheusMetrics { @@ -58,6 +60,11 @@ impl PrometheusMetrics { Self::new_with_options(true) } + pub fn set_use_limit_name_in_label(&self, use_limit_name_in_label: bool) { + self.use_limit_name_label + .store(use_limit_name_in_label, Ordering::SeqCst) + } + pub fn incr_authorized_calls(&self, namespace: &Namespace) { self.authorized_calls .with_label_values(&[namespace.as_ref()]) @@ -70,7 +77,7 @@ impl PrometheusMetrics { { let mut labels = vec![namespace.as_ref()]; - if self.use_limit_name_label { + if self.use_limit_name_label.load(Ordering::Relaxed) { // If we have configured the metric to accept 2 labels we need to // set values for them. labels.push(limit_name.into().unwrap_or("")); @@ -124,7 +131,7 @@ impl PrometheusMetrics { authorized_calls: authorized_calls_counter, limited_calls: limited_calls_counter, counter_latency, - use_limit_name_label, + use_limit_name_label: AtomicBool::new(use_limit_name_label), } } @@ -297,39 +304,6 @@ mod tests { ) } - // #[test] - // fn collects_latencies() { - // let metrics = PrometheusMetrics::new(); - // assert_eq!(metrics.counter_latency.get_sample_count(), 0); - // { - // let _access = metrics.counter_accesses(); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 0); - // { - // let mut access = metrics.counter_accesses(); - // access.observe(Duration::from_millis(12)); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 1); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(12).as_secs_f64() - // ); - // { - // let mut access = metrics.counter_accesses(); - // access.observe(Duration::from_millis(5)); - // assert_eq!(metrics.counter_latency.get_sample_count(), 1); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(12).as_secs_f64() - // ); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 2); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(17).as_secs_f64() - // ); - // } - fn formatted_counter_with_namespace_and_limit( metric_name: &str, count: i32, diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 0f2336b7..682a9295 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -32,7 +32,6 @@ thiserror = "1" futures = "0.3" async-trait = "0.1" cfg-if = "1" -prometheus = "0.13" lazy_static = "1" tracing = "0.1.40" diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 04af4b51..25010829 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -197,33 +197,27 @@ use std::collections::{HashMap, HashSet}; use crate::counter::Counter; use crate::errors::LimitadorError; use crate::limit::{Limit, Namespace}; -use crate::prometheus_metrics::PrometheusMetrics; use crate::storage::in_memory::InMemoryStorage; use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage}; #[macro_use] -extern crate lazy_static; extern crate core; pub mod counter; pub mod errors; pub mod limit; -pub mod prometheus_metrics; pub mod storage; pub struct RateLimiter { storage: Storage, - prometheus_metrics: PrometheusMetrics, } pub struct AsyncRateLimiter { storage: AsyncStorage, - prometheus_metrics: PrometheusMetrics, } pub struct RateLimiterBuilder { storage: Storage, - prometheus_limit_name_labels_enabled: bool, } pub struct CheckResult { @@ -240,16 +234,12 @@ impl From for bool { impl RateLimiterBuilder { pub fn with_storage(storage: Storage) -> Self { - Self { - storage, - prometheus_limit_name_labels_enabled: false, - } + Self { storage } } pub fn new(cache_size: u64) -> Self { Self { storage: Storage::new(cache_size), - prometheus_limit_name_labels_enabled: false, } } @@ -258,53 +248,25 @@ impl RateLimiterBuilder { self } - pub fn with_prometheus_limit_name_labels(mut self) -> Self { - self.prometheus_limit_name_labels_enabled = true; - self - } - pub fn build(self) -> RateLimiter { - let prometheus_metrics = if self.prometheus_limit_name_labels_enabled { - PrometheusMetrics::new_with_counters_by_limit_name() - } else { - PrometheusMetrics::new() - }; - RateLimiter { storage: self.storage, - prometheus_metrics, } } } pub struct AsyncRateLimiterBuilder { storage: AsyncStorage, - prometheus_limit_name_labels_enabled: bool, } impl AsyncRateLimiterBuilder { pub fn new(storage: AsyncStorage) -> Self { - Self { - storage, - prometheus_limit_name_labels_enabled: false, - } - } - - pub fn with_prometheus_limit_name_labels(mut self) -> Self { - self.prometheus_limit_name_labels_enabled = true; - self + Self { storage } } pub fn build(self) -> AsyncRateLimiter { - let prometheus_metrics = if self.prometheus_limit_name_labels_enabled { - PrometheusMetrics::new_with_counters_by_limit_name() - } else { - PrometheusMetrics::new() - }; - AsyncRateLimiter { storage: self.storage, - prometheus_metrics, } } } @@ -313,14 +275,12 @@ impl RateLimiter { pub fn new(cache_size: u64) -> Self { Self { storage: Storage::new(cache_size), - prometheus_metrics: PrometheusMetrics::new(), } } pub fn new_with_storage(counters: Box) -> Self { Self { storage: Storage::with_counter_storage(counters), - prometheus_metrics: PrometheusMetrics::new(), } } @@ -467,10 +427,6 @@ impl RateLimiter { Ok(()) } - pub fn gather_prometheus_metrics(&self) -> String { - self.prometheus_metrics.gather_metrics() - } - fn counters_that_apply( &self, namespace: &Namespace, @@ -497,7 +453,6 @@ impl AsyncRateLimiter { pub fn new_with_storage(storage: Box) -> Self { Self { storage: AsyncStorage::with_counter_storage(storage), - prometheus_metrics: PrometheusMetrics::new(), } } @@ -650,10 +605,6 @@ impl AsyncRateLimiter { Ok(()) } - pub fn gather_prometheus_metrics(&self) -> String { - self.prometheus_metrics.gather_metrics() - } - async fn counters_that_apply( &self, namespace: &Namespace, From 0cf8bd37cb9947f61d53bf80e95b82b83165f77f Mon Sep 17 00:00:00 2001 From: Adam Cattermole Date: Wed, 3 Apr 2024 12:06:25 +0100 Subject: [PATCH 4/4] Remove unwrap, use max, and rename _rhs --- limitador-server/src/metrics.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs index 4292c7c9..b19119e6 100644 --- a/limitador-server/src/metrics.rs +++ b/limitador-server/src/metrics.rs @@ -26,22 +26,18 @@ impl Timings { impl ops::Add for Timings { type Output = Self; - fn add(self, _rhs: Self) -> Self { + fn add(self, rhs: Self) -> Self { Self { - busy: self.busy + _rhs.busy, - idle: self.idle + _rhs.idle, - last: if self.last < _rhs.last { - _rhs.last - } else { - self.last - }, + busy: self.busy + rhs.busy, + idle: self.idle + rhs.idle, + last: self.last.max(rhs.last), } } } impl ops::AddAssign for Timings { - fn add_assign(&mut self, _rhs: Self) { - *self = *self + _rhs + fn add_assign(&mut self, rhs: Self) { + *self = *self + rhs } } @@ -140,7 +136,12 @@ where if let Some(span_state) = extensions.get_mut::() { // either we are an aggregator or nested within one for group in span_state.group_times.keys() { - for record in &self.groups.get(group).unwrap().records { + for record in &self + .groups + .get(group) + .expect("Span state contains group times for an unconfigured group") + .records + { if name == record { extensions.insert(Timings::new()); return;