Skip to content

Commit

Permalink
fix build descriptors
Browse files Browse the repository at this point in the history
  • Loading branch information
eguzki committed Jul 20, 2023
1 parent 8c512a5 commit 9fe81fc
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 113 deletions.
22 changes: 4 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ default = ["with-serde"]
with-serde = ["protobuf/with-serde"]

[dependencies]
proxy-wasm = "0.2"
proxy-wasm = "0.2.1"
serde_json = "1.0"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ curl -H "Host: test.a.com" http://127.0.0.1:18000/get
curl -H "Host: test.b.com" http://127.0.0.1:18000/get
```
* `rlp-c`: Five descriptors from multiple data items should be generated. Hence, rate limiting service should be called.
* `rlp-c`: Four descriptors from multiple rules should be generated. Hence, rate limiting service should be called.
```
curl -H "Host: test.c.com" -H "x-forwarded-for: 127.0.0.1" -H "My-Custom-Header-01: my-custom-header-value-01" -H "My-Custom-Header-02: my-custom-header-value-02" -H "x-dyn-user-id: bob" http://127.0.0.1:18000/get
curl -H "Host: test.c.com" -H "x-forwarded-for: 127.0.0.1" -H "My-Custom-Header-01: my-custom-header-value-01" -H "x-dyn-user-id: bob" http://127.0.0.1:18000/get
```
**Note:** Dynamic metadata can also be set with `user-id` as the key if you add the header `x-dyn-user-id`. This is provided using [Header-To-Metadata filter](https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/header_to_metadata_filter#config-http-filters-header-to-metadata).
Expand Down
136 changes: 76 additions & 60 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::configuration::{
Condition, DataItem, DataType, FilterConfig, PatternExpression, RateLimitPolicy,
Condition, DataItem, DataType, FailureMode, FilterConfig, PatternExpression, RateLimitPolicy,
Rule,
};
use crate::envoy::{
RateLimitDescriptor, RateLimitDescriptor_Entry, RateLimitRequest, RateLimitResponse,
RateLimitResponse_Code,
};
use crate::utils::request_process_failure;
use log::{debug, info, warn};
use protobuf::Message;
use proxy_wasm::traits::{Context, HttpContext};
Expand Down Expand Up @@ -58,28 +58,30 @@ impl Filter {
Some(&rl_req_serialized),
Duration::from_secs(5),
) {
Ok(call_id) => info!("Initiated gRPC call (id# {}) to Limitador", call_id),
Ok(call_id) => {
info!("Initiated gRPC call (id# {}) to Limitador", call_id);
Action::Pause
}
Err(e) => {
warn!("gRPC call to Limitador failed! {:?}", e);
request_process_failure(&self.config.failure_mode);
warn!("gRPC call to Limitador failed! {e:?}");
if let FailureMode::Deny = self.config.failure_mode {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
}
}
Action::Pause
}

fn build_descriptors(
&self,
rlp: &RateLimitPolicy,
) -> protobuf::RepeatedField<RateLimitDescriptor> {
//::protobuf::RepeatedField::default()
rlp.rules
.iter()
.filter(|rule| self.filter_rule_by_conditions(&rule.conditions))
// flatten the vec<vec<data> to vec<data>
.flat_map(|rule| &rule.data)
// WRONG: each rule generates one descriptor
jdsjd
.flat_map(|data| self.build_descriptor(data))
.filter(|rule: &&Rule| self.filter_rule_by_conditions(&rule.conditions))
// Mapping 1 Rule -> 1 Descriptor
// Filter out empty descriptors
.filter_map(|rule| self.build_single_descriptor(&rule.data))
.collect()
}

Expand Down Expand Up @@ -128,57 +130,61 @@ impl Filter {
}
}

fn build_descriptor(&self, data: &DataItem) -> Option<RateLimitDescriptor> {
fn build_single_descriptor(&self, data_list: &Vec<DataItem>) -> Option<RateLimitDescriptor> {
let mut entries = ::protobuf::RepeatedField::default();

match &data.item {
DataType::Static(static_item) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(static_item.key.to_owned());
descriptor_entry.set_value(static_item.value.to_owned());
entries.push(descriptor_entry);
}
DataType::Selector(selector_item) => {
let descriptor_key = match &selector_item.key {
None => selector_item.selector.to_owned(),
Some(key) => key.to_owned(),
};
// iterate over data items to allow any data item to skip the entire descriptor
for data in data_list.iter() {
match &data.item {
DataType::Static(static_item) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(static_item.key.to_owned());
descriptor_entry.set_value(static_item.value.to_owned());
entries.push(descriptor_entry);
}
DataType::Selector(selector_item) => {
let descriptor_key = match &selector_item.key {
None => selector_item.selector.to_owned(),
Some(key) => key.to_owned(),
};

let attribute_path = selector_item.selector.split(".").collect();
let attribute_path = selector_item.selector.split(".").collect();

// TODO(eastizle): not all fields are strings
// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes
match self.get_property(attribute_path) {
None => {
debug!(
"[context_id: {}]: selector not found: {}",
self.context_id, selector_item.selector
);
match &selector_item.default {
None => return None, // skipping descriptors
Some(default_value) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(descriptor_key);
descriptor_entry.set_value(default_value.to_owned());
entries.push(descriptor_entry);
// TODO(eastizle): not all fields are strings
// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes
match self.get_property(attribute_path) {
None => {
debug!(
"[context_id: {}]: selector not found: {}",
self.context_id, selector_item.selector
);
match &selector_item.default {
// skipping the entire descriptor
None => return None,
Some(default_value) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(descriptor_key);
descriptor_entry.set_value(default_value.to_owned());
entries.push(descriptor_entry);
}
}
}
}
Some(attribute_bytes) => match String::from_utf8(attribute_bytes) {
Err(e) => {
debug!(
Some(attribute_bytes) => match String::from_utf8(attribute_bytes) {
Err(e) => {
debug!(
"[context_id: {}]: failed to parse selector value: {}, error: {}",
self.context_id, selector_item.selector, e
);
return None;
}
Ok(attribute_value) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(descriptor_key);
descriptor_entry.set_value(attribute_value);
entries.push(descriptor_entry);
}
},
return None;
}
Ok(attribute_value) => {
let mut descriptor_entry = RateLimitDescriptor_Entry::new();
descriptor_entry.set_key(descriptor_key);
descriptor_entry.set_value(attribute_value);
entries.push(descriptor_entry);
}
},
}
}
}
}
Expand All @@ -187,6 +193,15 @@ impl Filter {
res.set_entries(entries);
Some(res)
}

fn handle_error_on_grpc_response(&self) {
match &self.config.failure_mode {
FailureMode::Deny => {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
FailureMode::Allow => self.resume_http_request(),
}
}
}

impl HttpContext for Filter {
Expand All @@ -210,6 +225,7 @@ impl HttpContext for Filter {
}

fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
debug!("on_http_response_headers #{}", self.context_id);
for (name, value) in &self.response_headers_to_add {
self.add_http_response_header(name, value);
}
Expand All @@ -220,15 +236,15 @@ impl HttpContext for Filter {
impl Context for Filter {
fn on_grpc_call_response(&mut self, token_id: u32, status_code: u32, resp_size: usize) {
info!(
"received gRPC call response: token: {}, status: {}",
token_id, status_code
"on_grpc_call_response #{}: received gRPC call response: token: {token_id}, status: {status_code}",
self.context_id
);

let res_body_bytes = match self.get_grpc_call_response_body(0, resp_size) {
Some(bytes) => bytes,
None => {
warn!("grpc response body is empty!");
request_process_failure(&self.config.failure_mode);
self.handle_error_on_grpc_response();
return;
}
};
Expand All @@ -237,7 +253,7 @@ impl Context for Filter {
Ok(res) => res,
Err(e) => {
warn!("failed to parse grpc response body into RateLimitResponse message: {e}");
request_process_failure(&self.config.failure_mode);
self.handle_error_on_grpc_response();
return;
}
};
Expand All @@ -247,7 +263,7 @@ impl Context for Filter {
overall_code: RateLimitResponse_Code::UNKNOWN,
..
} => {
request_process_failure(&self.config.failure_mode);
self.handle_error_on_grpc_response();
return;
}
RateLimitResponse {
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod envoy;
mod filter;
mod glob;
mod policy_index;
mod utils;

#[cfg(test)]
mod tests {
Expand Down
24 changes: 0 additions & 24 deletions src/utils.rs

This file was deleted.

Loading

0 comments on commit 9fe81fc

Please sign in to comment.