
Commit

Merge pull request #321 from Kuadrant/additional-metrics
adam-cattermole authored May 17, 2024
2 parents 6534b97 + 4efcfbc commit d6fa117
Showing 6 changed files with 97 additions and 31 deletions.
17 changes: 11 additions & 6 deletions limitador-server/src/main.rs
@@ -12,7 +12,6 @@ use crate::config::{
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use ::metrics::histogram;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use limitador::counter::Counter;
@@ -202,11 +201,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::layer()
};

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| histogram!("counter_latency").record(Duration::from(timings).as_secs_f64()),
vec!["datastore"],
);
let metrics_layer = MetricsLayer::new()
.gather(
"should_rate_limit",
PrometheusMetrics::record_datastore_latency,
vec!["datastore"],
)
.gather(
"flush_batcher_and_update_counters",
PrometheusMetrics::record_datastore_latency,
vec!["datastore"],
);

if !config.tracing_endpoint.is_empty() {
global::set_text_map_propagator(TraceContextPropagator::new());
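The old inline closure recorded straight into a `counter_latency` histogram; the layer now routes both the `should_rate_limit` and `flush_batcher_and_update_counters` aggregate spans through `PrometheusMetrics::record_datastore_latency`, keyed on the nested `datastore` span. A minimal sketch of the builder-style wiring, assuming the crate-internal `MetricsLayer` and `Timings` types are in scope; the function names below are hypothetical:

```rust
use std::time::Duration;

use crate::metrics::{MetricsLayer, Timings};

// Hypothetical consumer: any plain `fn(Timings)` satisfies the new `gather`
// signature, so several aggregate spans can share one recorder.
fn log_datastore_latency(timings: Timings) {
    // `Duration::from(timings)` is used the same way in record_datastore_latency below.
    eprintln!("datastore time: {:?}", Duration::from(timings));
}

// Hypothetical helper mirroring the wiring in main().
fn build_layer() -> MetricsLayer {
    MetricsLayer::new()
        .gather("should_rate_limit", log_datastore_latency, vec!["datastore"])
        .gather(
            "flush_batcher_and_update_counters",
            log_datastore_latency,
            vec!["datastore"],
        )
}
```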
40 changes: 27 additions & 13 deletions limitador-server/src/metrics.rs
@@ -11,6 +11,7 @@ pub struct Timings {
idle: u64,
busy: u64,
last: Instant,
updated: bool,
}

impl Timings {
@@ -19,6 +20,7 @@ impl Timings {
idle: 0,
busy: 0,
last: Instant::now(),
updated: false,
}
}
}
@@ -31,6 +33,7 @@ impl ops::Add for Timings {
busy: self.busy + rhs.busy,
idle: self.idle + rhs.idle,
last: self.last.max(rhs.last),
updated: self.updated || rhs.updated,
}
}
}
@@ -68,39 +71,39 @@ impl SpanState {
}
}

pub struct MetricsGroup<F: Fn(Timings)> {
consumer: F,
pub struct MetricsGroup {
consumer: Box<fn(Timings)>,
records: Vec<String>,
}

impl<F: Fn(Timings)> MetricsGroup<F> {
pub fn new(consumer: F, records: Vec<String>) -> Self {
impl MetricsGroup {
pub fn new(consumer: Box<fn(Timings)>, records: Vec<String>) -> Self {
Self { consumer, records }
}
}

pub struct MetricsLayer<F: Fn(Timings)> {
groups: HashMap<String, MetricsGroup<F>>,
pub struct MetricsLayer {
groups: HashMap<String, MetricsGroup>,
}

impl<F: Fn(Timings)> MetricsLayer<F> {
impl MetricsLayer {
pub fn new() -> Self {
Self {
groups: HashMap::new(),
}
}

pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self {
pub fn gather(mut self, aggregate: &str, consumer: fn(Timings), records: Vec<&str>) -> Self {
// TODO(adam-cattermole): does not handle case where aggregate already exists
let rec = records.iter().map(|r| r.to_string()).collect();
self.groups
.entry(aggregate.to_string())
.or_insert(MetricsGroup::new(consumer, rec));
.or_insert(MetricsGroup::new(Box::new(consumer), rec));
self
}
}
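Dropping the `F: Fn(Timings)` type parameter in favour of a plain `fn(Timings)` pointer is what lets one `MetricsLayer` hold consumers registered from different call sites: a generic parameter would pin every `gather` call to the same closure type. A small illustrative sketch of that coercion, assuming `Timings` is in scope (names are made up):

```rust
// Illustrative only: fn items and non-capturing closures both coerce to the same
// `fn(Timings)` pointer type, which is why one HashMap<String, MetricsGroup> can
// hold heterogeneous consumers.
fn record_a(_t: Timings) {}

fn demo_consumer_coercion() {
    let from_fn_item: fn(Timings) = record_a;
    let from_closure: fn(Timings) = |_t: Timings| {};
    let _ = (from_fn_item, from_closure);
}
```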

impl<S, F: Fn(Timings) + 'static> Layer<S> for MetricsLayer<F>
impl<S> Layer<S> for MetricsLayer
where
S: Subscriber,
S: for<'lookup> LookupSpan<'lookup>,
@@ -160,6 +163,7 @@ where
let now = Instant::now();
timings.idle += (now - timings.last).as_nanos() as u64;
timings.last = now;
timings.updated = true;
}
}

@@ -171,6 +175,7 @@ where
let now = Instant::now();
timings.busy += (now - timings.last).as_nanos() as u64;
timings.last = now;
timings.updated = true;
}
}

@@ -210,7 +215,9 @@ where
}
// If this span is the aggregator, call the consumer function
if let Some(metrics_group) = self.groups.get(name) {
(metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
if let Some(t) = span_state.group_times.get(name).filter(|&t| t.updated) {
(metrics_group.consumer)(*t);
}
}
}
}
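The consumer now fires only when the group's accumulated `Timings` were actually marked `updated`, which, as the filter above reads, should avoid recording a sample for aggregate spans whose tracked child spans never ran. A hedged sketch of the two span shapes this distinguishes, using plain `tracing` spans (function names are illustrative):

```rust
use tracing::info_span;

// Illustrative only: no "datastore" child span runs, so the group's Timings keep
// `updated == false` and the consumer is skipped when the aggregate closes.
fn answered_without_datastore() {
    let _aggregate = info_span!("should_rate_limit").entered();
    // ... limit resolved without touching the datastore ...
}

// A "datastore" child span runs, the Timings are marked updated, and the consumer
// (e.g. PrometheusMetrics::record_datastore_latency) fires when the aggregate closes.
fn goes_to_the_datastore() {
    let _aggregate = info_span!("should_rate_limit").entered();
    let _datastore = info_span!("datastore").entered();
    // ... counter read/updated in the datastore ...
}
```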
@@ -228,19 +235,22 @@ mod tests {
idle: 5,
busy: 5,
last: now,
updated: false,
};
let t2 = Timings {
idle: 3,
busy: 5,
last: now,
updated: false,
};
let t3 = t1 + t2;
assert_eq!(
t3,
Timings {
idle: 8,
busy: 10,
last: now
last: now,
updated: false,
}
)
}
@@ -252,19 +262,22 @@
idle: 5,
busy: 5,
last: now,
updated: false,
};
let t2 = Timings {
idle: 3,
busy: 5,
last: now,
updated: false,
};
t1 += t2;
assert_eq!(
t1,
Timings {
idle: 8,
busy: 10,
last: now
last: now,
updated: false,
}
)
}
@@ -277,6 +290,7 @@
idle: 5,
busy: 5,
last: Instant::now(),
updated: true,
};
span_state.increment(&group, t1);
assert_eq!(span_state.group_times.get(&group).unwrap().idle, t1.idle);
15 changes: 13 additions & 2 deletions limitador-server/src/prometheus_metrics.rs
@@ -1,7 +1,9 @@
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge};
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::Timings;
use limitador::limit::Namespace;

const NAMESPACE_LABEL: &str = "limitador_namespace";
@@ -40,13 +42,18 @@ impl PrometheusMetrics {
prometheus_handle: Arc<PrometheusHandle>,
) -> Self {
describe_histogram!(
"counter_latency",
"datastore_latency",
"Latency to the underlying counter datastore"
);
describe_counter!("authorized_calls", "Authorized calls");
describe_counter!("limited_calls", "Limited calls");
describe_gauge!("limitador_up", "Limitador is running");
gauge!("limitador_up").set(1);
describe_gauge!(
"datastore_partitioned",
"Limitador is partitioned from backing datastore"
);
gauge!("datastore_partitioned").set(0);
Self {
use_limit_name_label,
prometheus_handle,
@@ -86,6 +93,10 @@ impl PrometheusMetrics {
pub fn gather_metrics(&self) -> String {
self.prometheus_handle.render()
}

pub fn record_datastore_latency(timings: Timings) {
histogram!("datastore_latency").record(Duration::from(timings).as_secs_f64())
}
}

#[cfg(test)]
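Alongside the rename of `counter_latency` to `datastore_latency`, the constructor now describes a `datastore_partitioned` gauge and initialises it to 0. The diff does not show where that gauge is flipped; a hedged sketch of how a caller could do so with the same `metrics` macros used above (the helper name is hypothetical):

```rust
use metrics::gauge;

// Hypothetical helper: flip the gauge when a partition from the backing datastore
// is detected or resolved (1 = partitioned, 0 = healthy).
fn set_datastore_partitioned(partitioned: bool) {
    gauge!("datastore_partitioned").set(if partitioned { 1.0 } else { 0.0 });
}
```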
45 changes: 39 additions & 6 deletions limitador/src/storage/redis/counters_cache.rs
@@ -3,6 +3,8 @@ use crate::storage::atomic_expiring_value::AtomicExpiringValue;
use crate::storage::redis::DEFAULT_MAX_CACHED_COUNTERS;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use metrics::{counter, gauge, histogram};
use moka::notification::RemovalCause;
use moka::sync::Cache;
use std::collections::HashMap;
use std::future::Future;
@@ -42,8 +44,11 @@ impl CachedCounterValue {
}
}

pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime) {
self.value.add_and_set_expiry(delta, expire_at);
pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime, max_value: u64) {
let new_val = self.value.add_and_set_expiry(delta, expire_at);
if new_val > max_value {
histogram!("counter_overshoot").record((new_val - max_value) as f64);
}
self.initial_value.fetch_add(delta, Ordering::SeqCst);
self.from_authority.store(true, Ordering::Release);
}
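`add_from_authority` now takes the counter's `max_value` and records how far the authoritative refresh pushed the cached value past the limit. A worked example with made-up numbers:

```rust
use metrics::histogram;

fn overshoot_example() {
    // Hypothetical numbers: the authoritative refresh lands the counter at 105
    // against a limit of 100, so an overshoot of 5 is recorded.
    let max_value: u64 = 100;
    let new_val: u64 = 105; // what add_and_set_expiry returned
    if new_val > max_value {
        histogram!("counter_overshoot").record((new_val - max_value) as f64); // records 5.0
    }
}
```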
@@ -150,6 +155,7 @@ impl Batcher {
}
Entry::Vacant(miss) => {
self.limiter.acquire().await.unwrap().forget();
gauge!("batcher_size").increment(1);
miss.insert_entry(value);
}
};
@@ -183,13 +189,15 @@ impl Batcher {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
histogram!("batcher_flush_size").record(result.len() as f64);
let result = consumer(result).await;
batch.iter().for_each(|counter| {
let prev = self
.updates
.remove_if(counter, |_, v| v.no_pending_writes());
if prev.is_some() {
self.limiter.add_permits(1);
gauge!("batcher_size").decrement(1);
}
});
return result;
@@ -232,6 +240,7 @@ impl CountersCache {
if option.is_none() {
let from_queue = self.batcher.updates.get(counter);
if let Some(entry) = from_queue {
gauge!("cache_size").increment(1);
self.cache.insert(counter.clone(), entry.value().clone());
return Some(entry.value().clone());
}
@@ -255,17 +264,22 @@
if expiry_ts > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
gauge!("cache_size").increment(1);
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, expiry_ts);
cached_value.add_from_authority(
remote_deltas,
expiry_ts,
counter.max_value(),
);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
}
});
if from_cache {
cached.add_from_authority(remote_deltas, expiry_ts);
cached.add_from_authority(remote_deltas, expiry_ts, counter.max_value());
}
return cached;
}
@@ -277,6 +291,7 @@

pub async fn increase_by(&self, counter: &Counter, delta: u64) {
let val = self.cache.get_with_by_ref(counter, || {
gauge!("cache_size").increment(1);
if let Some(entry) = self.batcher.updates.get(counter) {
entry.value().clone()
} else {
@@ -304,9 +319,23 @@ impl CountersCacheBuilder {
self
}

fn eviction_listener(
_key: Arc<Counter>,
value: Arc<CachedCounterValue>,
_removal_cause: RemovalCause,
) {
gauge!("cache_size").decrement(1);
if value.no_pending_writes().not() {
counter!("evicted_pending_writes").increment(1);
}
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
cache: Cache::new(self.max_cached_counters as u64),
cache: Cache::builder()
.max_capacity(self.max_cached_counters as u64)
.eviction_listener(Self::eviction_listener)
.build(),
batcher: Batcher::new(period, self.max_cached_counters),
}
}
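The cache is now built through `Cache::builder()` so an eviction listener can be attached; the listener keeps the `cache_size` gauge in step and counts evictions of entries that still had pending writes. A self-contained sketch of the same moka pattern with stand-in key/value types (all names here are illustrative):

```rust
use std::sync::Arc;

use moka::{notification::RemovalCause, sync::Cache};

fn build_example_cache() -> Cache<String, u64> {
    Cache::builder()
        .max_capacity(2)
        .eviction_listener(|key: Arc<String>, value: u64, cause: RemovalCause| {
            // The real listener decrements `cache_size` here and bumps
            // `evicted_pending_writes` when the entry still had pending writes.
            println!("evicted {key}={value} ({cause:?})");
        })
        .build()
}
```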
@@ -363,7 +392,11 @@ mod tests {
let value = CachedCounterValue::from_authority(&counter, 0);
value.delta(&counter, 5);
assert!(value.no_pending_writes().not());
value.add_from_authority(6, SystemTime::now().add(Duration::from_secs(1)));
value.add_from_authority(
6,
SystemTime::now().add(Duration::from_secs(1)),
counter.max_value(),
);
assert!(value.no_pending_writes().not());
assert_eq!(value.pending_writes(), Ok(5));
}
6 changes: 5 additions & 1 deletion limitador/src/storage/redis/redis_async.rs
@@ -9,10 +9,11 @@ use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::histogram;
use redis::{AsyncCommands, RedisError};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{debug_span, Instrument};

// Note: this implementation does not guarantee exact limits. Ensuring that we
Expand Down Expand Up @@ -209,11 +210,14 @@ impl AsyncRedisStorage {
}

pub async fn is_alive(&self) -> bool {
let now = Instant::now();
self.conn_manager
.clone()
.incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
.await
.is_ok()
.then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64()))
.is_some()
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
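`is_alive()` now times the Redis `INCR` round trip and records it only when the call succeeds, by chaining `is_ok().then(...).is_some()`. A generic sketch of that pattern outside of Redis; the wrapper itself is hypothetical:

```rust
use std::time::Instant;

use metrics::histogram;

// Hypothetical wrapper: run a fallible check, record its latency only on success,
// and collapse the outcome into the boolean the caller needs.
fn timed_liveness_check<T, E>(check: impl FnOnce() -> Result<T, E>) -> bool {
    let now = Instant::now();
    check()
        .is_ok()
        .then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64()))
        .is_some()
}
```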