Skip to content

Commit

Permalink
Update prometheus metrics from server
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-cattermole committed Mar 26, 2024
1 parent fccf71e commit 1f95d1b
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 232 deletions.
5 changes: 4 additions & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_
use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
RateLimitRequest, RateLimitResponse,
};
use crate::Limiter;
use crate::{Limiter, PROMETHEUS_METRICS};

include!("envoy_types.rs");

Expand Down Expand Up @@ -114,8 +114,11 @@ impl RateLimitService for MyRateLimiter {

let mut rate_limited_resp = rate_limited_resp.unwrap();
let resp_code = if rate_limited_resp.limited {
PROMETHEUS_METRICS
.incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref());
Code::OverLimit
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
Code::Ok
};

Expand Down
14 changes: 7 additions & 7 deletions limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
use crate::Limiter;
use crate::{Limiter, PROMETHEUS_METRICS};
use actix_web::{http::StatusCode, ResponseError};
use actix_web::{App, HttpServer};
use paperclip::actix::{
Expand Down Expand Up @@ -44,13 +44,10 @@ async fn status() -> web::Json<()> {
Json(())
}

#[tracing::instrument(skip(data))]
#[tracing::instrument(skip(_data))]
#[api_v2_operation]
async fn metrics(data: web::Data<Arc<Limiter>>) -> String {
match data.get_ref().as_ref() {
Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(),
Limiter::Async(limiter) => limiter.gather_prometheus_metrics(),
}
async fn metrics(_data: web::Data<Arc<Limiter>>) -> String {
PROMETHEUS_METRICS.gather_metrics()
}

#[api_v2_operation]
Expand Down Expand Up @@ -170,8 +167,11 @@ async fn check_and_report(
match rate_limited_and_update_result {
Ok(is_rate_limited) => {
if is_rate_limited.limited {
PROMETHEUS_METRICS
.incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
Err(ErrorResponse::TooManyRequests)
} else {
PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
Ok(Json(()))
}
}
Expand Down
22 changes: 8 additions & 14 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use lazy_static::lazy_static;
use limitador::counter::Counter;
use limitador::errors::LimitadorError;
use limitador::limit::Limit;
use limitador::prometheus_metrics::PrometheusMetrics;
use limitador::storage::disk::DiskStorage;
#[cfg(feature = "infinispan")]
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
Expand Down Expand Up @@ -72,6 +74,11 @@ pub enum LimitadorServerError {
Internal(LimitadorError),
}

lazy_static! {
// TODO: this should be using config.limit_name_in_labels to initialize
pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
}

pub enum Limiter {
Blocking(RateLimiter),
Async(AsyncRateLimiter),
Expand Down Expand Up @@ -291,7 +298,6 @@ fn find_first_negative_limit(limits: &[Limit]) -> Option<usize> {

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// let metrics_layer = MetricsLayer::new();
let config = {
let (config, version) = create_config();
println!("{LIMITADOR_HEADER} {version}");
Expand All @@ -308,7 +314,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| println!("should_rate_limit/datastore timings: {:?}", timings),
|timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)),
vec!["datastore"],
);

Expand Down Expand Up @@ -358,18 +364,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};

// let metrics_layer = MetricsLayer::new().gather(
// "should_rate_limit",
// |timings| println!("should_rate_limit/datastore timings: {:?}", timings),
// vec!["datastore"],
// );
// match rate_limiter {
// Limiter::Blocking(limiter) => {
// metrics_layer.gather();
// }
// Limiter::Async(limiter) => {}
// };

info!("limits file path: {}", limit_file);
if let Err(e) = rate_limiter.load_limits_from_file(&limit_file).await {
eprintln!("Failed to load limit file: {e}");
Expand Down
20 changes: 12 additions & 8 deletions limitador-server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::ops;
use std::time::Instant;
use std::time::{Duration, Instant};
use tracing::span::{Attributes, Id};
use tracing::Subscriber;
use tracing_subscriber::layer::{Context, Layer};
Expand Down Expand Up @@ -45,6 +45,12 @@ impl ops::AddAssign for Timings {
}
}

impl From<Timings> for Duration {
fn from(timings: Timings) -> Self {
Duration::from_nanos(timings.idle + timings.busy)
}
}

#[derive(Debug, Clone)]
struct SpanState {
group_times: HashMap<String, Timings>,
Expand Down Expand Up @@ -293,10 +299,8 @@ mod tests {
}
}

// mylayer.gather("aggregate_on", timings| pr.vomit(timings), ["datastore"])

// mylayer.gather("aggregate_on", ["datastore"]).consumer("aggregate_on", |timings| pr.vomit(timings))

// write a consumer function, takes a function that does something with the timings

// fn consumer(timings: Timings) -> () {}
// [X] 1. Use prometheus metrics in main
// [X] 2. Try to use consume method from the prometheus metrics in main
// [X] 3. Invoke the server using the PrometheusMetrics defined in main not the limiter
// [ ] 4. Record the authorized/limited calls
// [ ] 5. Burn the old prometheus instance and move inside the server + old timing stuff
77 changes: 25 additions & 52 deletions limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@
#![allow(clippy::multiple_crate_versions)]

use std::collections::{HashMap, HashSet};
use std::time::Duration;

use crate::counter::Counter;
use crate::errors::LimitadorError;
Expand All @@ -209,7 +208,7 @@ extern crate core;
pub mod counter;
pub mod errors;
pub mod limit;
mod prometheus_metrics;
pub mod prometheus_metrics;
pub mod storage;

pub struct RateLimiter {
Expand All @@ -230,6 +229,7 @@ pub struct RateLimiterBuilder {
pub struct CheckResult {
pub limited: bool,
pub counters: Vec<Counter>,
pub limit_name: Option<String>,
}

impl From<CheckResult> for bool {
Expand Down Expand Up @@ -358,16 +358,13 @@ impl RateLimiter {
match self.storage.is_within_limits(&counter, delta) {
Ok(within_limits) => {
if !within_limits {
self.prometheus_metrics
.incr_limited_calls(namespace, counter.limit().name());
return Ok(true);
}
}
Err(e) => return Err(e.into()),
}
}

self.prometheus_metrics.incr_authorized_calls(namespace);
Ok(false)
}

Expand Down Expand Up @@ -395,10 +392,10 @@ impl RateLimiter {
let mut counters = self.counters_that_apply(namespace, values)?;

if counters.is_empty() {
self.prometheus_metrics.incr_authorized_calls(namespace);
return Ok(CheckResult {
limited: false,
counters,
limit_name: None,
});
}

Expand All @@ -413,21 +410,16 @@ impl RateLimiter {
};

match check_result {
Authorization::Ok => {
self.prometheus_metrics.incr_authorized_calls(namespace);
Ok(CheckResult {
limited: false,
counters,
})
}
Authorization::Limited(name) => {
self.prometheus_metrics
.incr_limited_calls(namespace, name.as_deref());
Ok(CheckResult {
limited: true,
counters,
})
}
Authorization::Ok => Ok(CheckResult {
limited: false,
counters,
limit_name: None,
}),
Authorization::Limited(name) => Ok(CheckResult {
limited: true,
counters,
limit_name: name,
}),
}
}

Expand Down Expand Up @@ -479,10 +471,6 @@ impl RateLimiter {
self.prometheus_metrics.gather_metrics()
}

pub fn prometheus_counter_access(&self, duration: Duration) {
self.prometheus_metrics.counter_access(duration)
}

fn counters_that_apply(
&self,
namespace: &Namespace,
Expand Down Expand Up @@ -547,16 +535,12 @@ impl AsyncRateLimiter {
match self.storage.is_within_limits(&counter, delta).await {
Ok(within_limits) => {
if !within_limits {
self.prometheus_metrics
.incr_limited_calls(namespace, counter.limit().name());
return Ok(true);
}
}
Err(e) => return Err(e.into()),
}
}

self.prometheus_metrics.incr_authorized_calls(namespace);
Ok(false)
}

Expand Down Expand Up @@ -586,17 +570,16 @@ impl AsyncRateLimiter {
let mut counters = self.counters_that_apply(namespace, values).await?;

if counters.is_empty() {
self.prometheus_metrics.incr_authorized_calls(namespace);
return Ok(CheckResult {
limited: false,
counters,
limit_name: None,
});
}

let access = self.prometheus_metrics.counter_accesses();
let check_result = self
.storage
.check_and_update(&mut counters, delta, load_counters, access)
.check_and_update(&mut counters, delta, load_counters)
.await?;

let counters = if load_counters {
Expand All @@ -606,22 +589,16 @@ impl AsyncRateLimiter {
};

match check_result {
Authorization::Ok => {
self.prometheus_metrics.incr_authorized_calls(namespace);

Ok(CheckResult {
limited: false,
counters,
})
}
Authorization::Limited(name) => {
self.prometheus_metrics
.incr_limited_calls(namespace, name.as_deref());
Ok(CheckResult {
limited: true,
counters,
})
}
Authorization::Ok => Ok(CheckResult {
limited: false,
counters,
limit_name: None,
}),
Authorization::Limited(name) => Ok(CheckResult {
limited: true,
counters,
limit_name: name,
}),
}
}

Expand Down Expand Up @@ -677,10 +654,6 @@ impl AsyncRateLimiter {
self.prometheus_metrics.gather_metrics()
}

pub fn prometheus_counter_access(&self, duration: Duration) {
self.prometheus_metrics.counter_access(duration)
}

async fn counters_that_apply(
&self,
namespace: &Namespace,
Expand Down
Loading

0 comments on commit 1f95d1b

Please sign in to comment.