Skip to content

Commit

Permalink
rewrite tower::filter (#508)
Browse files Browse the repository at this point in the history
## Motivation

It was pointed out that there is currently some overlap between the
`try_with` `Service` combinator and `tower::filter` middleware (see #499 (comment) ).
`try_with` synchronously maps from a `Request` ->
`Result<DifferentRequest, Error>`, while `tower::filter`
_asynchronously_ maps from a `&Request` to a `Result<(), Error>`. The
key differences are: - `try_with` takes a request by value, and allows
the predicate to return a *different* request value - `try_with` also
permits changing the _type_ of the request - `try_with` is synchronous,
while `tower::filter` is asynchronous - `tower::filter` has a
`Predicate` trait, which can be implemented by more than just functions.
For example, a struct with a `HashSet` could implement `Predicate` by
failing requests that match the values in the hashset.

It definitely seems like there's demand for both synchronous and
asynchronous request filtering. However, the APIs we have currently
differ pretty significantly. It would be nice to make them more
consistent with each other.

As an aside, `tower::filter` [does not seem all that widely used][1].


Meanwhile, `linkerd2-proxy` defines its own `RequestFilter` middleware,
using a [predicate trait][2] that's essentially in between `tower::filter` and
`ServiceExt::try_with`: - it's synchronous, like `try_with` - it allows
modifying the type of the request, like `try_with` - it uses a trait for
predicates, rather than a `Fn`, like `tower::filter` - it uses a similar
naming scheme to `tower::filter` ("filtering" rather than "with"/"map").

[1]: https://github.com/search?l=&p=1&q=%22tower%3A%3Afilter%22+extension%3Ars&ref=advsearch&type=Code
[2]: https://github.com/linkerd/linkerd2-proxy/blob/24bee8cbc5413b4587a14bea1e2714ce1f1f919a/linkerd/stack/src/request_filter.rs#L8-L12

## Solution

This branch rewrites `tower::filter` to make the following changes:

* Predicates are synchronous by default. A separate `AsyncFilter` type
  and an `AsyncPredicate` trait are available for predicates returning
  futures.
* Predicates may now return a new `Request` type, allowing `Filter` and
  `AsyncFilter` to subsume `try_map_request`.
* Predicates may now return any error type, and errors are now converted
  into `BoxError`s.

Closes #502

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Jan 6, 2021
1 parent b7a7c28 commit 2a7d47a
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 231 deletions.
2 changes: 1 addition & 1 deletion tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ log = ["tracing/log"]
balance = ["discover", "load", "ready-cache", "make", "rand", "slab", "tokio-stream"]
buffer = ["tokio/sync", "tokio/rt", "tokio-stream"]
discover = []
filter = []
filter = ["futures-util"]
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time"]
limit = ["tokio/time", "tokio/sync"]
load = ["tokio/time"]
Expand Down
57 changes: 36 additions & 21 deletions tower/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,42 @@ impl<L> ServiceBuilder<L> {
self.layer(crate::timeout::TimeoutLayer::new(timeout))
}

/// Conditionally reject requests based on `predicate`.
///
/// `predicate` must implement the [`Predicate`] trait.
///
/// This wraps the inner service with an instance of the [`Filter`]
/// middleware.
///
/// [`Filter`]: crate::filter
/// [`Predicate`]: crate::filter::Predicate
#[cfg(feature = "filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
pub fn filter<P>(
self,
predicate: P,
) -> ServiceBuilder<Stack<crate::filter::FilterLayer<P>, L>> {
self.layer(crate::filter::FilterLayer::new(predicate))
}

/// Conditionally reject requests based on an asynchronous `predicate`.
///
/// `predicate` must implement the [`AsyncPredicate`] trait.
///
/// This wraps the inner service with an instance of the [`AsyncFilter`]
/// middleware.
///
/// [`AsyncFilter`]: crate::filter::AsyncFilter
/// [`AsyncPredicate`]: crate::filter::AsyncPredicate
#[cfg(feature = "filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
pub fn filter_async<P>(
self,
predicate: P,
) -> ServiceBuilder<Stack<crate::filter::AsyncFilterLayer<P>, L>> {
self.layer(crate::filter::AsyncFilterLayer::new(predicate))
}

/// Map one request type to another.
///
/// This wraps the inner service with an instance of the [`MapRequest`]
Expand Down Expand Up @@ -311,27 +347,6 @@ impl<L> ServiceBuilder<L> {
self.layer(crate::util::MapRequestLayer::new(f))
}

/// Fallibly one request type to another, or to an error.
///
/// This wraps the inner service with an instance of the [`TryMapRequest`]
/// middleware.
///
/// See the documentation for the [`try_map_request` combinator] for details.
///
/// [`TryMapRequest`]: crate::util::MapResponse
/// [`try_map_request` combinator]: crate::util::ServiceExt::try_map_request
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn try_map_request<F, R1, R2, E>(
self,
f: F,
) -> ServiceBuilder<Stack<crate::util::TryMapRequestLayer<F>, L>>
where
F: FnMut(R1) -> Result<R2, E> + Clone,
{
self.layer(crate::util::TryMapRequestLayer::new(f))
}

/// Map one response type to another.
///
/// This wraps the inner service with an instance of the [`MapResponse`]
Expand Down
46 changes: 0 additions & 46 deletions tower/src/filter/error.rs

This file was deleted.

83 changes: 41 additions & 42 deletions tower/src/filter/future.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Future types

use super::error::Error;
use super::AsyncPredicate;
use crate::BoxError;
use futures_core::ready;
use pin_project::pin_project;
use std::{
Expand All @@ -10,79 +11,77 @@ use std::{
};
use tower_service::Service;

/// Filtered response future
/// Filtered response future from [`AsyncFilter`] services.
///
/// [`AsyncFilter`]: crate::filter::AsyncFilter
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T, S, Request>
pub struct AsyncResponseFuture<P, S, Request>
where
S: Service<Request>,
P: AsyncPredicate<Request>,
S: Service<P::Request>,
{
#[pin]
/// Response future state
state: State<Request, S::Future>,

#[pin]
/// Predicate future
check: T,
state: State<P::Future, S::Future>,

/// Inner service
service: S,
}

opaque_future! {
/// Filtered response future from [`Filter`] services.
///
/// [`Filter`]: crate::filter::Filter
pub type ResponseFuture<R, F> =
futures_util::future::Either<
futures_util::future::Ready<Result<R, crate::BoxError>>,
futures_util::future::ErrInto<F, crate::BoxError>
>;
}

#[pin_project(project = StateProj)]
#[derive(Debug)]
enum State<Request, U> {
Check(Option<Request>),
WaitResponse(#[pin] U),
enum State<F, G> {
/// Waiting for the predicate future
Check(#[pin] F),
/// Waiting for the response future
WaitResponse(#[pin] G),
}

impl<F, T, S, Request> ResponseFuture<F, S, Request>
impl<P, S, Request> AsyncResponseFuture<P, S, Request>
where
F: Future<Output = Result<T, Error>>,
S: Service<Request>,
S::Error: Into<crate::BoxError>,
P: AsyncPredicate<Request>,
S: Service<P::Request>,
S::Error: Into<BoxError>,
{
pub(crate) fn new(request: Request, check: F, service: S) -> Self {
ResponseFuture {
state: State::Check(Some(request)),
check,
pub(crate) fn new(check: P::Future, service: S) -> Self {
Self {
state: State::Check(check),
service,
}
}
}

impl<F, T, S, Request> Future for ResponseFuture<F, S, Request>
impl<P, S, Request> Future for AsyncResponseFuture<P, S, Request>
where
F: Future<Output = Result<T, Error>>,
S: Service<Request>,
P: AsyncPredicate<Request>,
S: Service<P::Request>,
S::Error: Into<crate::BoxError>,
{
type Output = Result<S::Response, Error>;
type Output = Result<S::Response, crate::BoxError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
match this.state.as_mut().project() {
StateProj::Check(request) => {
let request = request
.take()
.expect("we either give it back or leave State::Check once we take");

// Poll predicate
match this.check.as_mut().poll(cx)? {
Poll::Ready(_) => {
let response = this.service.call(request);
this.state.set(State::WaitResponse(response));
}
Poll::Pending => {
this.state.set(State::Check(Some(request)));
return Poll::Pending;
}
}
StateProj::Check(mut check) => {
let request = ready!(check.as_mut().poll(cx))?;
let response = this.service.call(request);
this.state.set(State::WaitResponse(response));
}
StateProj::WaitResponse(response) => {
return Poll::Ready(ready!(response.poll(cx)).map_err(Error::inner));
return response.poll(cx).map_err(Into::into);
}
}
}
Expand Down
56 changes: 52 additions & 4 deletions tower/src/filter/layer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,42 @@
use super::Filter;
use super::{AsyncFilter, Filter};
use tower_layer::Layer;

/// Conditionally dispatch requests to the inner service based on a predicate.
/// Conditionally dispatch requests to the inner service based on a synchronous
/// [predicate].
///
/// This [`Layer`] produces instances of the [`Filter`] service.
///
/// [predicate]: crate::filter::Predicate
/// [`Layer`]: crate::Layer
/// [`Filter`]: crate::filter::Filter
#[derive(Debug)]
pub struct FilterLayer<U> {
predicate: U,
}

/// Conditionally dispatch requests to the inner service based on an asynchronous
/// [predicate].
///
/// This [`Layer`] produces instances of the [`AsyncFilter`] service.
///
/// [predicate]: crate::filter::AsyncPredicate
/// [`Layer`]: crate::Layer
/// [`Filter`]: crate::filter::AsyncFilter
#[derive(Debug)]
pub struct AsyncFilterLayer<U> {
predicate: U,
}

// === impl FilterLayer ===

impl<U> FilterLayer<U> {
#[allow(missing_docs)]
/// Returns a new layer that produces [`Filter`] services with the given
/// [`Predicate`].
///
/// [`Predicate`]: crate::filter::Predicate
/// [`Filter`]: crate::filter::Filter
pub fn new(predicate: U) -> Self {
FilterLayer { predicate }
Self { predicate }
}
}

Expand All @@ -22,3 +48,25 @@ impl<U: Clone, S> Layer<S> for FilterLayer<U> {
Filter::new(service, predicate)
}
}

// === impl AsyncFilterLayer ===

impl<U> AsyncFilterLayer<U> {
/// Returns a new layer that produces [`AsyncFilter`] services with the given
/// [`AsyncPredicate`].
///
/// [`AsyncPredicate`]: crate::filter::AsyncPredicate
/// [`Filter`]: crate::filter::Filter
pub fn new(predicate: U) -> Self {
Self { predicate }
}
}

impl<U: Clone, S> Layer<S> for AsyncFilterLayer<U> {
type Service = AsyncFilter<S, U>;

fn layer(&self, service: S) -> Self::Service {
let predicate = self.predicate.clone();
AsyncFilter::new(service, predicate)
}
}
Loading

0 comments on commit 2a7d47a

Please sign in to comment.