Skip to content

Commit

Permalink
Refactor filter model from Context->Response into ref mut Context
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Oct 25, 2022
1 parent 7b8b464 commit a191434
Show file tree
Hide file tree
Showing 23 changed files with 320 additions and 432 deletions.
8 changes: 4 additions & 4 deletions docs/src/filters/writing_custom_filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ sent to a downstream client.
use quilkin::filters::prelude::*;
impl Filter for Greet {
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
ctx.contents.extend(b"Hello");
Some(ctx.into())
Some(())
}
fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
fn write(&self, ctx: &mut WriteContext) -> Option<()> {
ctx.contents.extend(b"Goodbye");
Some(ctx.into())
Some(())
}
}
```
Expand Down
8 changes: 4 additions & 4 deletions examples/quilkin-filter-example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ struct Greet {
}

impl Filter for Greet {
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
ctx.contents
.splice(0..0, format!("{} ", self.config.greeting).into_bytes());
Some(ctx.into())
Some(())
}
fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
fn write(&self, ctx: &mut WriteContext) -> Option<()> {
ctx.contents
.splice(0..0, format!("{} ", self.config.greeting).into_bytes());
Some(ctx.into())
Some(())
}
}
// ANCHOR_END: filter
Expand Down
4 changes: 2 additions & 2 deletions src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
self.load().read(ctx)
}

fn write(&self, ctx: WriteContext) -> Option<WriteResponse> {
fn write(&self, ctx: &mut WriteContext) -> Option<()> {
self.load().write(ctx)
}
}
Expand Down
35 changes: 16 additions & 19 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod token_router;
pub mod prelude {
pub use super::{
ConvertProtoConfigError, CreateFilterArgs, Error, Filter, FilterInstance, ReadContext,
ReadResponse, StaticFilter, WriteContext, WriteResponse,
StaticFilter, WriteContext,
};
}

Expand All @@ -62,12 +62,12 @@ pub use self::{
local_rate_limit::LocalRateLimit,
pass::Pass,
r#match::Match,
read::{ReadContext, ReadResponse},
read::ReadContext,
registry::FilterRegistry,
set::{FilterMap, FilterSet},
timestamp::Timestamp,
token_router::TokenRouter,
write::{WriteContext, WriteResponse},
write::WriteContext,
};

pub(crate) use self::chain::FilterChain;
Expand All @@ -83,13 +83,13 @@ pub(crate) use self::chain::FilterChain;
/// struct Greet;
///
/// impl Filter for Greet {
/// fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
/// fn read(&self, ctx: &mut ReadContext) -> Option<()> {
/// ctx.contents.splice(0..0, b"Hello ".into_iter().copied());
/// Some(ctx.into())
/// Some(())
/// }
/// fn write(&self, mut ctx: WriteContext) -> Option<WriteResponse> {
/// fn write(&self, ctx: &mut WriteContext) -> Option<()> {
/// ctx.contents.splice(0..0, b"Goodbye ".into_iter().copied());
/// Some(ctx.into())
/// Some(())
/// }
/// }
///
Expand Down Expand Up @@ -194,23 +194,20 @@ pub trait Filter: Send + Sync {
/// [`Filter::read`] is invoked when the proxy receives data from a
/// downstream connection on the listening port.
///
/// This function should return a [`ReadResponse`] containing the array of
/// endpoints that the packet should be sent to and the packet that should
/// be sent (which may be manipulated) as well. If the packet should be
/// rejected, return [`None`]. By default, the context passes
/// through unchanged.
fn read(&self, ctx: ReadContext) -> Option<ReadResponse> {
Some(ctx.into())
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read(&self, _: &mut ReadContext) -> Option<()> {
Some(())
}

/// [`Filter::write`] is invoked when the proxy is about to send data to a
/// downstream connection via the listening port after receiving it via one
/// of the upstream Endpoints.
///
/// This function should return an [`WriteResponse`] containing the packet to
/// be sent (which may be manipulated). If the packet should be rejected,
/// return [`None`]. By default, the context passes through unchanged.
fn write(&self, ctx: WriteContext) -> Option<WriteResponse> {
Some(ctx.into())
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
fn write(&self, _: &mut WriteContext) -> Option<()> {
Some(())
}
}
37 changes: 19 additions & 18 deletions src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ impl Capture {

impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read(&self, mut ctx: ReadContext) -> Option<ReadResponse> {
fn read(&self, ctx: &mut ReadContext) -> Option<()> {
let capture = self.capture.capture(&mut ctx.contents, &self.metrics);
ctx.metadata
.insert(self.is_present_key.clone(), Value::Bool(capture.is_some()));

if let Some(value) = capture {
tracing::trace!(key=&**self.metadata_key, %value, "captured value");
ctx.metadata.insert(self.metadata_key.clone(), value);
Some(ctx.into())
Some(())
} else {
tracing::trace!(key = &**self.metadata_key, "No value captured");
None
Expand Down Expand Up @@ -163,13 +163,14 @@ mod tests {
};
let filter = Capture::from_config(config.into());
let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())];
let response = filter.read(ReadContext::new(
endpoints,
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
"abc".to_string().into_bytes(),
));
assert!(filter
.read(&mut ReadContext::new(
endpoints,
(std::net::Ipv4Addr::LOCALHOST, 80).into(),
"abc".to_string().into_bytes(),
))
.is_none());

assert!(response.is_none());
let count = filter.metrics.packets_dropped_total.get();
assert_eq!(1, count);
}
Expand Down Expand Up @@ -243,21 +244,21 @@ mod tests {
F: Filter + ?Sized,
{
let endpoints = vec![Endpoint::new("127.0.0.1:81".parse().unwrap())];
let response = filter
.read(ReadContext::new(
endpoints,
"127.0.0.1:80".parse().unwrap(),
"helloabc".to_string().into_bytes(),
))
.unwrap();
let mut context = ReadContext::new(
endpoints,
"127.0.0.1:80".parse().unwrap(),
"helloabc".to_string().into_bytes(),
);

filter.read(&mut context).unwrap();

if remove {
assert_eq!(b"hello".to_vec(), response.contents);
assert_eq!(b"hello", &*context.contents);
} else {
assert_eq!(b"helloabc".to_vec(), response.contents);
assert_eq!(b"helloabc", &*context.contents);
}

let token = response
let token = context
.metadata
.get(&Arc::new(key.into()))
.unwrap()
Expand Down
Loading

0 comments on commit a191434

Please sign in to comment.