Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tower-retry to std::future #326

Merged
merged 3 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
"tower-load",
# "tower-load-shed",
# "tower-reconnect",
# "tower-retry",
"tower-retry",
"tower-service",
# "tower-spawn-ready",
"tower-test",
Expand Down
4 changes: 4 additions & 0 deletions tower-retry/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.3.0-alpha.1

- Move to `std::future`

# 0.1.0 (April 26, 2019)

- Initial release
18 changes: 10 additions & 8 deletions tower-retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ name = "tower-retry"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <[email protected]>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-retry/0.1.0"
documentation = "https://docs.rs/tower-retry/0.3.0-alpha.1"
description = """
Retry failed requests.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"

[dependencies]
futures = "0.1.26"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-timer = "0.2.4"
tower-service = "0.3.0-alpha.1"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tokio-timer = "0.3.0-alpha.4"
pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] }
futures-core-preview = "0.3.0-alpha.18"

[dev-dependencies]
tower-test = { version = "0.1.0", path = "../tower-test" }
tokio-executor = "0.1.2"
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "0.2.0-alpha.4"
futures-util-preview = "0.3.0-alpha.18"
44 changes: 9 additions & 35 deletions tower-retry/src/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,8 @@ impl Bucket {

#[cfg(test)]
mod tests {
use self::tokio_executor::enter;
use super::*;
use std::{
sync::{Arc, Mutex, MutexGuard},
time::Instant,
};
use tokio_executor;
use tokio_test::clock;

#[test]
fn empty() {
Expand All @@ -229,37 +224,33 @@ mod tests {

#[test]
fn leaky() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
bgt.deposit();

*time.as_mut() += Duration::from_secs(3);
time.advance(Duration::from_secs(3));

bgt.withdraw().unwrap_err();
});
}

#[test]
fn slots() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
bgt.deposit();
bgt.deposit();
*time.as_mut() += Duration::from_millis(900);
time.advance(Duration::from_millis(900));
// 900ms later, the deposit should still be valid
bgt.withdraw().unwrap();

// blank slate
*time.as_mut() += Duration::from_millis(2000);
time.advance(Duration::from_millis(2000));

bgt.deposit();
*time.as_mut() += Duration::from_millis(300);
time.advance(Duration::from_millis(300));
bgt.deposit();
*time.as_mut() += Duration::from_millis(800);
time.advance(Duration::from_millis(800));
bgt.deposit();

// the first deposit is expired, but the 2nd should still be valid,
Expand All @@ -270,9 +261,7 @@ mod tests {

#[test]
fn reserve() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|_| {
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
Expand All @@ -283,19 +272,4 @@ mod tests {
bgt.withdraw().unwrap_err();
});
}

#[derive(Clone)]
struct MockNow(Arc<Mutex<Instant>>);

impl MockNow {
fn as_mut(&self) -> MutexGuard<Instant> {
self.0.lock().unwrap()
}
}

impl clock::Now for MockNow {
fn now(&self) -> Instant {
*self.0.lock().expect("now")
}
}
}
95 changes: 53 additions & 42 deletions tower-retry/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
//! Future types

use crate::{Policy, Retry};
use futures::{try_ready, Async, Future, Poll};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower_service::Service;

// NOTE: this is the trait generated for Ready::project() by pin-project.
// We need it here to be able to go "through" Ready to &mut Service without adding Unpin bounds.
use crate::__RetryProjectionTrait;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed taiki-e/pin-project#80 to resolve this.


/// The `Future` returned by a `Retry` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
#[pin]
retry: Retry<P, S>,
state: State<S::Future, P::Future, S::Response, S::Error>,
#[pin]
state: State<S::Future, P::Future>,
}

#[pin_project]
#[derive(Debug)]
enum State<F, P, R, E> {
enum State<F, P> {
/// Polling the future from `Service::call`
Called(F),
Called(#[pin] F),
/// Polling the future from `Policy::retry`
Checking(P, Option<Result<R, E>>),
Checking(#[pin] P),
/// Polling `Service::poll_ready` after `Checking` was OK.
Retrying,
}
Expand Down Expand Up @@ -49,56 +61,55 @@ where
P: Policy<Request, S::Response, S::Error> + Clone,
S: Service<Request> + Clone,
{
type Item = S::Response;
type Error = S::Error;
type Output = Result<S::Response, S::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Called(ref mut future) => {
let result = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(res)) => Ok(res),
Err(err) => Err(err),
};
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if let Some(ref req) = self.request {
match self.retry.policy.retry(req, result.as_ref()) {
Some(checking) => State::Checking(checking, Some(result)),
None => return result.map(Async::Ready),
loop {
#[project]
match this.state.project() {
State::Called(future) => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
Some(checking) => {
this.state.set(State::Checking(checking));
}
None => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
return result.map(Async::Ready);
return Poll::Ready(result);
}
}
State::Checking(ref mut future, ref mut result) => {
let policy = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(policy)) => policy,
Err(()) => {
// if Policy::retry() fails, return the original
// result...
return result
.take()
.expect("polled after complete")
.map(Async::Ready);
}
};
self.retry.policy = policy;
State::Retrying
State::Checking(future) => {
this.retry.project().policy.set(ready!(future.poll(cx)));
this.state.set(State::Retrying);
}
State::Retrying => {
try_ready!(self.retry.poll_ready());
let req = self
// NOTE: we assume here that
//
// this.retry.poll_ready()
//
// is equivalent to
//
// this.retry.service.poll_ready()
//
// we need to make that assumption to avoid adding an Unpin bound to the Policy
// in Ready to make it Unpin so that we can get &mut Ready as needed to call
// poll_ready on it.
ready!(this.retry.project().service.poll_ready(cx))?;
let req = this
.request
.take()
.expect("retrying requires cloned request");
self.request = self.retry.policy.clone_request(&req);
State::Called(self.retry.service.call(req))
*this.request = this.retry.policy.clone_request(&req);
this.state
.set(State::Called(this.retry.project().service.call(req)));
}
};
self.state = next;
}
}
}
}
14 changes: 10 additions & 4 deletions tower-retry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-retry/0.1.0")]
#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0-alpha.1")]
#![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
#![allow(elided_lifetimes_in_paths)]
Expand All @@ -14,14 +14,17 @@ pub use crate::layer::RetryLayer;
pub use crate::policy::Policy;

use crate::future::ResponseFuture;
use futures::Poll;
use pin_project::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;

/// Configure retrying requests of "failed" responses.
///
/// A `Policy` classifies what is a "failed" response.
#[pin_project]
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
}
Expand All @@ -44,8 +47,11 @@ where
type Error = S::Error;
type Future = ResponseFuture<P, S, Request>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is
// equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated
// as well.
self.service.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
Expand Down
15 changes: 5 additions & 10 deletions tower-retry/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use futures::Future;
use std::future::Future;

/// A "retry policy" to classify if a request should be retried.
///
/// # Example
///
/// ```
/// extern crate futures;
/// extern crate tower_retry;
///
/// use tower_retry::Policy;
/// use futures_util::future;
///
/// type Req = String;
/// type Res = String;
///
/// struct Attempts(usize);
///
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = futures::future::FutureResult<Self, ()>;
/// type Future = future::Ready<Self>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// match result {
Expand All @@ -30,7 +28,7 @@ use futures::Future;
/// // But we limit the number of attempts...
/// if self.0 > 0 {
/// // Try again!
/// Some(futures::future::ok(Attempts(self.0 - 1)))
/// Some(future::ready(Attempts(self.0 - 1)))
/// } else {
/// // Used all our attempts, no retry...
/// None
Expand All @@ -46,7 +44,7 @@ use futures::Future;
/// ```
pub trait Policy<Req, Res, E>: Sized {
/// The `Future` type returned by `Policy::retry()`.
type Future: Future<Item = Self, Error = ()>;
type Future: Future<Output = Self>;
/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
Expand All @@ -56,9 +54,6 @@ pub trait Policy<Req, Res, E>: Sized {
///
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
///
/// If the returned `Future` errors, the request will **not** be retried
/// after all.
fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future>;
/// Tries to clone a request before being passed to the inner service.
///
Expand Down
Loading