Skip to content

Commit

Permalink
Adding response headers in the RLS server so that it implements https…
Browse files Browse the repository at this point in the history
  • Loading branch information
chirino committed Mar 21, 2023
1 parent c824f99 commit 818c2b7
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 199 deletions.
264 changes: 180 additions & 84 deletions limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use tonic::{transport, transport::Server, Request, Response, Status};

use limitador::counter::Counter;

use crate::envoy_rls::server::envoy::config::core::v3::HeaderValue;
use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_response::Code;
use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_server::{
RateLimitService, RateLimitServiceServer,
Expand All @@ -6,9 +15,6 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
RateLimitRequest, RateLimitResponse,
};
use crate::Limiter;
use std::collections::HashMap;
use std::sync::Arc;
use tonic::{transport, transport::Server, Request, Response, Status};

include!("envoy_types.rs");

Expand Down Expand Up @@ -62,23 +68,29 @@ impl RateLimitService for MyRateLimiter {
req.hits_addend
};

let is_rate_limited_res = match &*self.limiter {
Limiter::Blocking(limiter) => {
limiter.check_rate_limited_and_update(&namespace, &values, i64::from(hits_addend))
}
let rate_limited_resp = match &*self.limiter {
Limiter::Blocking(limiter) => limiter.check_rate_limited_and_update_getting_counters(
&namespace,
&values,
i64::from(hits_addend),
),
Limiter::Async(limiter) => {
limiter
.check_rate_limited_and_update(&namespace, &values, i64::from(hits_addend))
.check_rate_limited_and_update_getting_counters(
&namespace,
&values,
i64::from(hits_addend),
)
.await
}
};

let resp_code = match is_rate_limited_res {
Ok(rate_limited) => {
let (resp_code, mut counters) = match rate_limited_resp {
Ok((rate_limited, counters)) => {
if rate_limited {
Code::OverLimit
(Code::OverLimit, counters)
} else {
Code::Ok
(Code::Ok, counters)
}
}
Err(e) => {
Expand All @@ -100,7 +112,7 @@ impl RateLimitService for MyRateLimiter {
overall_code: resp_code.into(),
statuses: vec![],
request_headers_to_add: vec![],
response_headers_to_add: vec![],
response_headers_to_add: to_response_header(&mut counters),
raw_body: vec![],
dynamic_metadata: None,
quota: None,
Expand All @@ -110,6 +122,54 @@ impl RateLimitService for MyRateLimiter {
}
}

// create response headers per https://datatracker.ietf.org/doc/id/draft-polli-ratelimit-headers-03.html
pub fn to_response_header(counters: &mut Vec<Counter>) -> Vec<HeaderValue> {
// sort by the limit remaining..
counters.sort_by(|a, b| {
let a_remaining = a.remaining().unwrap_or(a.max_value());
let b_remaining = b.remaining().unwrap_or(b.max_value());
if a_remaining - b_remaining < 0 {
Ordering::Less
} else {
Ordering::Greater
}
});

let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text
.push_str(format!(", {};w={}", counter.max_value(), counter.seconds()).as_str());
if let Some(name) = counter.limit().name() {
all_limits_text.push_str(format!(";name=\"{}\"", name).as_str());
}
});

let mut headers = Vec::new();
if let Some(counter) = counters.first() {
headers.push(HeaderValue {
key: "X-RateLimit-Limit".to_string(),
value: format!("{}{}", counter.max_value(), all_limits_text),
});

let mut remaining = counter.remaining().unwrap_or(counter.max_value());
if remaining < 0 {
remaining = 0
}
headers.push(HeaderValue {
key: "X-RateLimit-Remaining".to_string(),
value: format!("{}", remaining),
});

if let Some(duration) = counter.expires_in() {
headers.push(HeaderValue {
key: "X-RateLimit-Reset".to_string(),
value: format!("{}", duration.as_secs()),
});
}
}
headers
}

pub async fn run_envoy_rls_server(
address: String,
limiter: Arc<Limiter>,
Expand All @@ -125,13 +185,23 @@ pub async fn run_envoy_rls_server(

#[cfg(test)]
mod tests {
use super::*;
use tonic::IntoRequest;

use limitador::limit::Limit;
use limitador::RateLimiter;

use crate::envoy_rls::server::envoy::extensions::common::ratelimit::v3::rate_limit_descriptor::Entry;
use crate::envoy_rls::server::envoy::extensions::common::ratelimit::v3::RateLimitDescriptor;
use crate::Configuration;
use limitador::limit::Limit;
use limitador::RateLimiter;
use tonic::IntoRequest;

use super::*;

fn header_value(key: &str, value: &str) -> HeaderValue {
HeaderValue {
key: key.to_string(),
value: value.to_string(),
}
}

// All these tests use the in-memory storage implementation to simplify. We
// know that some storage implementations like the Redis one trade
Expand Down Expand Up @@ -177,24 +247,32 @@ mod tests {
// There's a limit of 1, so the first request should return "OK" and the
// second "OverLimit".

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::Ok));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::Ok)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "1, 1;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::OverLimit));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::OverLimit)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "1, 1;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);
}

Expand All @@ -218,15 +296,14 @@ mod tests {
}
.into_request();

assert_eq!(
rate_limiter
.should_rate_limit(req)
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::Ok)
);
let response = rate_limiter
.should_rate_limit(req)
.await
.unwrap()
.into_inner();

assert_eq!(response.overall_code, i32::from(Code::Ok));
assert_eq!(response.response_headers_to_add, vec![],);
}

#[tokio::test]
Expand All @@ -248,15 +325,13 @@ mod tests {
}
.into_request();

assert_eq!(
rate_limiter
.should_rate_limit(req)
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::Unknown)
);
let response = rate_limiter
.should_rate_limit(req)
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::Unknown));
assert_eq!(response.response_headers_to_add, vec![],);
}

#[tokio::test]
Expand Down Expand Up @@ -305,14 +380,19 @@ mod tests {
hits_addend: 1,
};

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();

assert_eq!(response.overall_code, i32::from(Code::OverLimit));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::OverLimit)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "0, 0;w=60, 10;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);
}

Expand Down Expand Up @@ -347,24 +427,32 @@ mod tests {
// There's a limit of 10, "hits_addend" is 6, so the first request
// should return "Ok" and the second "OverLimit".

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::Ok));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::Ok)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "10, 10;w=60"),
header_value("X-RateLimit-Remaining", "4"),
],
);

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::OverLimit));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::OverLimit)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "10, 10;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);
}

Expand Down Expand Up @@ -401,24 +489,32 @@ mod tests {
// There's a limit of 1, and hits_addend is converted to 1, so the first
// request should return "OK" and the second "OverLimit".

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::Ok));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::Ok)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "1, 1;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);

let response = rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner();
assert_eq!(response.overall_code, i32::from(Code::OverLimit));
assert_eq!(
rate_limiter
.should_rate_limit(req.clone().into_request())
.await
.unwrap()
.into_inner()
.overall_code,
i32::from(Code::OverLimit)
response.response_headers_to_add,
vec![
header_value("X-RateLimit-Limit", "1, 1;w=60"),
header_value("X-RateLimit-Remaining", "0"),
],
);
}
}
Loading

0 comments on commit 818c2b7

Please sign in to comment.