Skip to content

Commit

Permalink
New sub-crate: tower-steer (#426)
Browse files Browse the repository at this point in the history
  • Loading branch information
akshayknarayan authored Apr 1, 2020
1 parent 81cfbab commit 0520a6a
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 1 deletion.
3 changes: 2 additions & 1 deletion tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ready-cache = ["futures-util", "indexmap", "tokio/sync"]
reconnect = ["make", "tokio/io-std"]
retry = ["tokio/time"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt-core"]
steer = ["futures-util"]
timeout = ["tokio/time"]
util = ["futures-util"]

Expand All @@ -57,7 +58,7 @@ slab = { version = "0.4", optional = true }
tokio = { version = "0.2", optional = true }

[dev-dependencies]
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
hdrhistogram = "6.0"
quickcheck = { version = "0.6", default-features = false }
tokio = { version = "0.2", features = ["macros", "stream", "sync", "test-util" ] }
Expand Down
2 changes: 2 additions & 0 deletions tower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub mod reconnect;
pub mod retry;
#[cfg(feature = "spawn-ready")]
pub mod spawn_ready;
#[cfg(feature = "steer")]
pub mod steer;
#[cfg(feature = "timeout")]
pub mod timeout;
#[cfg(feature = "util")]
Expand Down
137 changes: 137 additions & 0 deletions tower/src/steer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//! This module provides functionality to aid managing routing requests between Tower [`Service`]s.
//!
//! # Example
//! ```rust
//! # use std::task::{Context, Poll};
//! # use tower_service::Service;
//! # use futures_util::future::{ready, Ready, poll_fn};
//! # use tower::steer::Steer;
//! type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
//! struct MyService(u8);
//!
//! impl Service<String> for MyService {
//! type Response = ();
//! type Error = StdError;
//! type Future = Ready<Result<(), Self::Error>>;
//!
//! fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
//! Poll::Ready(Ok(()))
//! }
//!
//! fn call(&mut self, req: String) -> Self::Future {
//! println!("{}: {}", self.0, req);
//! ready(Ok(()))
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let mut s = Steer::new(
//! vec![MyService(0), MyService(1)],
//! // one service handles strings with uppercase first letters. the other handles the rest.
//! |r: &String, _: &[_]| if r.chars().next().unwrap().is_uppercase() { 0 } else { 1 },
//! );
//!
//! let reqs = vec!["A", "b", "C", "d"];
//! let reqs: Vec<String> = reqs.into_iter().map(String::from).collect();
//! for r in reqs {
//! poll_fn(|cx| s.poll_ready(cx)).await.unwrap();
//! s.call(r).await;
//! }
//! }
//! ```
use std::collections::VecDeque;
use std::task::{Context, Poll};
use tower_service::Service;

/// This is how callers of [`Steer`] tell it which `Service` a `Req` corresponds to.
pub trait Picker<S, Req> {
/// Return an index into the iterator of `Service` passed to [`Steer::new`].
fn pick(&mut self, r: &Req, services: &[S]) -> usize;
}

impl<S, F, Req> Picker<S, Req> for F
where
F: Fn(&Req, &[S]) -> usize,
{
fn pick(&mut self, r: &Req, services: &[S]) -> usize {
self(r, services)
}
}

/// `Steer` manages a list of `Service`s which all handle the same type of request.
///
/// An example use case is a sharded service.
/// It accepts new requests, then:
/// 1. Determines, via the provided [`Picker`], which `Service` the request coresponds to.
/// 2. Waits (in `poll_ready`) for *all* services to be ready.
/// 3. Calls the correct `Service` with the request, and returns a future corresponding to the
/// call.
///
/// Note that `Steer` must wait for all services to be ready since it can't know ahead of time
/// which `Service` the next message will arrive for, and is unwilling to buffer items
/// indefinitely. This will cause head-of-line blocking unless paired with a `Service` that does
/// buffer items indefinitely, and thus always returns `Poll::Ready`. For example, wrapping each
/// component service with a `tower-buffer` with a high enough limit (the maximum number of
/// concurrent requests) will prevent head-of-line blocking in `Steer`.
#[derive(Debug)]
pub struct Steer<S, F, Req> {
router: F,
services: Vec<S>,
not_ready: VecDeque<usize>,
_phantom: std::marker::PhantomData<Req>,
}

impl<S, F, Req> Steer<S, F, Req> {
/// Make a new [`Steer`] with a list of `Service`s and a `Picker`.
///
/// Note: the order of the `Service`s is significant for [`Picker::pick`]'s return value.
pub fn new(services: impl IntoIterator<Item = S>, router: F) -> Self {
let services: Vec<_> = services.into_iter().collect();
let not_ready: VecDeque<_> = services.iter().enumerate().map(|(i, _)| i).collect();
Self {
router,
services,
not_ready,
_phantom: Default::default(),
}
}
}

impl<S, Req, F> Service<Req> for Steer<S, F, Req>
where
S: Service<Req>,
F: Picker<S, Req>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// must wait for *all* services to be ready.
// this will cause head-of-line blocking unless the underlying services are always ready.
if self.not_ready.is_empty() {
return Poll::Ready(Ok(()));
} else {
if let Poll::Pending = self.services[self.not_ready[0]].poll_ready(cx)? {
return Poll::Pending;
}

self.not_ready.pop_front();
}
}
}

fn call(&mut self, req: Req) -> Self::Future {
assert!(
self.not_ready.is_empty(),
"Steer must wait for all services to be ready. Did you forget to call poll_ready()?"
);

let idx = self.router.pick(&req, &self.services[..]);
let cl = &mut self.services[idx];
self.not_ready.push_back(idx);
cl.call(req)
}
}
60 changes: 60 additions & 0 deletions tower/tests/steer/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#![cfg(feature = "steer")]

use futures_util::future::{ready, Ready};
use std::task::{Context, Poll};
use tower::steer::Steer;
use tower_service::Service;

type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

struct MyService(u8, bool);

impl Service<String> for MyService {
type Response = u8;
type Error = StdError;
type Future = Ready<Result<u8, Self::Error>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if !self.1 {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}

fn call(&mut self, _req: String) -> Self::Future {
ready(Ok(self.0))
}
}

#[test]
fn pick_correctly() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let srvs = vec![MyService(42, true), MyService(57, true)];
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 1);

futures_util::future::poll_fn(|cx| st.poll_ready(cx))
.await
.unwrap();
let r = st.call(String::from("foo")).await.unwrap();
assert_eq!(r, 57);
});
}

#[test]
fn pending_all_ready() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let srvs = vec![MyService(42, true), MyService(57, false)];
let mut st = Steer::new(srvs, |_: &_, _: &[_]| 0);

let p = futures_util::poll!(futures_util::future::poll_fn(|cx| st.poll_ready(cx)));
match p {
Poll::Pending => (),
_ => panic!(
"Steer should not return poll_ready if at least one component service is not ready"
),
}
});
}

0 comments on commit 0520a6a

Please sign in to comment.