diff --git a/Cargo.lock b/Cargo.lock index 13ebcc57..26330f77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1558,7 +1558,6 @@ dependencies = [ "moka", "paste", "postcard", - "prometheus", "r2d2", "rand", "redis", @@ -1593,6 +1592,7 @@ dependencies = [ "opentelemetry-stdout", "opentelemetry_sdk", "paperclip", + "prometheus", "prost 0.12.1", "prost-types", "serde", diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index 07907a04..b0cbaf0c 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -46,6 +46,7 @@ lazy_static = "1.4.0" clap = "4.3" sysinfo = "0.29.7" openssl = { version = "0.10.57", features = ["vendored"] } +prometheus = "0.13.3" [build-dependencies] diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 2f91a1bf..02b68197 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -20,7 +20,6 @@ use lazy_static::lazy_static; use limitador::counter::Counter; use limitador::errors::LimitadorError; use limitador::limit::Limit; -use limitador::prometheus_metrics::PrometheusMetrics; use limitador::storage::disk::DiskStorage; #[cfg(feature = "infinispan")] use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder}; @@ -38,6 +37,7 @@ use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{trace, Resource}; +use prometheus_metrics::PrometheusMetrics; use std::env::VarError; use std::fmt::Display; use std::fs; @@ -58,6 +58,7 @@ mod http_api; mod config; mod metrics; +pub mod prometheus_metrics; const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION"); const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE"); @@ -75,7 +76,6 @@ pub enum LimitadorServerError { } lazy_static! { - // TODO: this should be using config.limit_name_in_labels to initialize pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default(); } @@ -93,29 +93,19 @@ impl From for LimitadorServerError { impl Limiter { pub async fn new(config: Configuration) -> Result { let rate_limiter = match config.storage { - StorageConfiguration::Redis(cfg) => { - Self::redis_limiter(cfg, config.limit_name_in_labels).await - } + StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await, #[cfg(feature = "infinispan")] - StorageConfiguration::Infinispan(cfg) => { - Self::infinispan_limiter(cfg, config.limit_name_in_labels).await - } - StorageConfiguration::InMemory(cfg) => { - Self::in_memory_limiter(cfg, config.limit_name_in_labels) - } - StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels), + StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await, + StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg), + StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg), }; Ok(rate_limiter) } - async fn redis_limiter(cfg: RedisStorageConfiguration, limit_name_labels: bool) -> Self { + async fn redis_limiter(cfg: RedisStorageConfiguration) -> Self { let storage = Self::storage_using_redis(cfg).await; - let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage); - - if limit_name_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } + let rate_limiter_builder = AsyncRateLimiterBuilder::new(storage); Self::Async(rate_limiter_builder.build()) } @@ -172,10 +162,7 @@ impl Limiter { } #[cfg(feature = "infinispan")] - async fn infinispan_limiter( - cfg: InfinispanStorageConfiguration, - limit_name_labels: bool, - ) -> Self { + async fn infinispan_limiter(cfg: InfinispanStorageConfiguration) -> Self { use url::Url; let parsed_url = Url::parse(&cfg.url).unwrap(); @@ -211,17 +198,13 @@ impl Limiter { None => builder.build().await, }; - let mut rate_limiter_builder = + let rate_limiter_builder = AsyncRateLimiterBuilder::new(AsyncStorage::with_counter_storage(Box::new(storage))); - if limit_name_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Async(rate_limiter_builder.build()) } - fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self { + fn disk_limiter(cfg: DiskStorageConfiguration) -> Self { let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) { Ok(storage) => storage, Err(err) => { @@ -229,24 +212,16 @@ impl Limiter { process::exit(1) } }; - let mut rate_limiter_builder = + let rate_limiter_builder = RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage))); - if limit_name_in_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Blocking(rate_limiter_builder.build()) } - fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self { - let mut rate_limiter_builder = + fn in_memory_limiter(cfg: InMemoryStorageConfiguration) -> Self { + let rate_limiter_builder = RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap()); - if limit_name_in_labels { - rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels() - } - Self::Blocking(rate_limiter_builder.build()) } @@ -345,6 +320,8 @@ async fn main() -> Result<(), Box> { .init(); }; + PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels); + info!("Version: {}", version); info!("Using config: {:?}", config); config diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs index c4766adf..4292c7c9 100644 --- a/limitador-server/src/metrics.rs +++ b/limitador-server/src/metrics.rs @@ -95,7 +95,7 @@ impl MetricsLayer { } pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self { - // TODO: does not handle case where aggregate already exists + // TODO(adam-cattermole): does not handle case where aggregate already exists let rec = records.iter().map(|r| r.to_string()).collect(); self.groups .entry(aggregate.to_string()) @@ -137,12 +137,6 @@ where } } - // if timing is already set (which it shouldn't be) - // don't create it again - // if !extensions.get_mut::().is_none() { - // return; - // } - if let Some(span_state) = extensions.get_mut::() { // either we are an aggregator or nested within one for group in span_state.group_times.keys() { @@ -214,11 +208,8 @@ where parent.extensions_mut().replace(span_state.clone()); } // IF we are aggregator call consume function - match self.groups.get(name) { - Some(metrics_group) => { - (metrics_group.consumer)(*span_state.group_times.get(name).unwrap()) - } - _ => (), + if let Some(metrics_group) = self.groups.get(name) { + (metrics_group.consumer)(*span_state.group_times.get(name).unwrap()) } } } @@ -298,9 +289,3 @@ mod tests { assert_eq!(ml.groups.get("group").unwrap().records, vec!["record"]); } } - -// [X] 1. Use prometheus metrics in main -// [X] 2. Try to use consume method from the prometheus metrics in main -// [X] 3. Invoke the server using the PrometheusMetrics defined in main not the limiter -// [ ] 4. Record the authorized/limited calls -// [ ] 5. Burn the old prometheus instance and move inside the server + old timing stuff diff --git a/limitador/src/prometheus_metrics.rs b/limitador-server/src/prometheus_metrics.rs similarity index 86% rename from limitador/src/prometheus_metrics.rs rename to limitador-server/src/prometheus_metrics.rs index 3ee8f79a..34f9537d 100644 --- a/limitador/src/prometheus_metrics.rs +++ b/limitador-server/src/prometheus_metrics.rs @@ -1,7 +1,9 @@ -use crate::limit::Namespace; +use lazy_static::lazy_static; +use limitador::limit::Namespace; use prometheus::{ Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder, }; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; const NAMESPACE_LABEL: &str = "limitador_namespace"; @@ -36,7 +38,7 @@ pub struct PrometheusMetrics { authorized_calls: IntCounterVec, limited_calls: IntCounterVec, counter_latency: Histogram, - use_limit_name_label: bool, + use_limit_name_label: AtomicBool, } impl Default for PrometheusMetrics { @@ -58,6 +60,11 @@ impl PrometheusMetrics { Self::new_with_options(true) } + pub fn set_use_limit_name_in_label(&self, use_limit_name_in_label: bool) { + self.use_limit_name_label + .store(use_limit_name_in_label, Ordering::SeqCst) + } + pub fn incr_authorized_calls(&self, namespace: &Namespace) { self.authorized_calls .with_label_values(&[namespace.as_ref()]) @@ -70,7 +77,7 @@ impl PrometheusMetrics { { let mut labels = vec![namespace.as_ref()]; - if self.use_limit_name_label { + if self.use_limit_name_label.load(Ordering::Relaxed) { // If we have configured the metric to accept 2 labels we need to // set values for them. labels.push(limit_name.into().unwrap_or("")); @@ -124,7 +131,7 @@ impl PrometheusMetrics { authorized_calls: authorized_calls_counter, limited_calls: limited_calls_counter, counter_latency, - use_limit_name_label, + use_limit_name_label: AtomicBool::new(use_limit_name_label), } } @@ -297,39 +304,6 @@ mod tests { ) } - // #[test] - // fn collects_latencies() { - // let metrics = PrometheusMetrics::new(); - // assert_eq!(metrics.counter_latency.get_sample_count(), 0); - // { - // let _access = metrics.counter_accesses(); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 0); - // { - // let mut access = metrics.counter_accesses(); - // access.observe(Duration::from_millis(12)); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 1); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(12).as_secs_f64() - // ); - // { - // let mut access = metrics.counter_accesses(); - // access.observe(Duration::from_millis(5)); - // assert_eq!(metrics.counter_latency.get_sample_count(), 1); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(12).as_secs_f64() - // ); - // } - // assert_eq!(metrics.counter_latency.get_sample_count(), 2); - // assert_eq!( - // metrics.counter_latency.get_sample_sum(), - // Duration::from_millis(17).as_secs_f64() - // ); - // } - fn formatted_counter_with_namespace_and_limit( metric_name: &str, count: i32, diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 0f2336b7..682a9295 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -32,7 +32,6 @@ thiserror = "1" futures = "0.3" async-trait = "0.1" cfg-if = "1" -prometheus = "0.13" lazy_static = "1" tracing = "0.1.40" diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 04af4b51..25010829 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -197,33 +197,27 @@ use std::collections::{HashMap, HashSet}; use crate::counter::Counter; use crate::errors::LimitadorError; use crate::limit::{Limit, Namespace}; -use crate::prometheus_metrics::PrometheusMetrics; use crate::storage::in_memory::InMemoryStorage; use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage}; #[macro_use] -extern crate lazy_static; extern crate core; pub mod counter; pub mod errors; pub mod limit; -pub mod prometheus_metrics; pub mod storage; pub struct RateLimiter { storage: Storage, - prometheus_metrics: PrometheusMetrics, } pub struct AsyncRateLimiter { storage: AsyncStorage, - prometheus_metrics: PrometheusMetrics, } pub struct RateLimiterBuilder { storage: Storage, - prometheus_limit_name_labels_enabled: bool, } pub struct CheckResult { @@ -240,16 +234,12 @@ impl From for bool { impl RateLimiterBuilder { pub fn with_storage(storage: Storage) -> Self { - Self { - storage, - prometheus_limit_name_labels_enabled: false, - } + Self { storage } } pub fn new(cache_size: u64) -> Self { Self { storage: Storage::new(cache_size), - prometheus_limit_name_labels_enabled: false, } } @@ -258,53 +248,25 @@ impl RateLimiterBuilder { self } - pub fn with_prometheus_limit_name_labels(mut self) -> Self { - self.prometheus_limit_name_labels_enabled = true; - self - } - pub fn build(self) -> RateLimiter { - let prometheus_metrics = if self.prometheus_limit_name_labels_enabled { - PrometheusMetrics::new_with_counters_by_limit_name() - } else { - PrometheusMetrics::new() - }; - RateLimiter { storage: self.storage, - prometheus_metrics, } } } pub struct AsyncRateLimiterBuilder { storage: AsyncStorage, - prometheus_limit_name_labels_enabled: bool, } impl AsyncRateLimiterBuilder { pub fn new(storage: AsyncStorage) -> Self { - Self { - storage, - prometheus_limit_name_labels_enabled: false, - } - } - - pub fn with_prometheus_limit_name_labels(mut self) -> Self { - self.prometheus_limit_name_labels_enabled = true; - self + Self { storage } } pub fn build(self) -> AsyncRateLimiter { - let prometheus_metrics = if self.prometheus_limit_name_labels_enabled { - PrometheusMetrics::new_with_counters_by_limit_name() - } else { - PrometheusMetrics::new() - }; - AsyncRateLimiter { storage: self.storage, - prometheus_metrics, } } } @@ -313,14 +275,12 @@ impl RateLimiter { pub fn new(cache_size: u64) -> Self { Self { storage: Storage::new(cache_size), - prometheus_metrics: PrometheusMetrics::new(), } } pub fn new_with_storage(counters: Box) -> Self { Self { storage: Storage::with_counter_storage(counters), - prometheus_metrics: PrometheusMetrics::new(), } } @@ -467,10 +427,6 @@ impl RateLimiter { Ok(()) } - pub fn gather_prometheus_metrics(&self) -> String { - self.prometheus_metrics.gather_metrics() - } - fn counters_that_apply( &self, namespace: &Namespace, @@ -497,7 +453,6 @@ impl AsyncRateLimiter { pub fn new_with_storage(storage: Box) -> Self { Self { storage: AsyncStorage::with_counter_storage(storage), - prometheus_metrics: PrometheusMetrics::new(), } } @@ -650,10 +605,6 @@ impl AsyncRateLimiter { Ok(()) } - pub fn gather_prometheus_metrics(&self) -> String { - self.prometheus_metrics.gather_metrics() - } - async fn counters_that_apply( &self, namespace: &Namespace,