Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tower-retry to std::future #326

Merged
merged 3 commits into from
Sep 9, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
"tower-load",
# "tower-load-shed",
# "tower-reconnect",
# "tower-retry",
"tower-retry",
"tower-service",
# "tower-spawn-ready",
"tower-test",
Expand Down
4 changes: 4 additions & 0 deletions tower-retry/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.3.0-alpha.1

- Move to `std::future`

# 0.1.0 (April 26, 2019)

- Initial release
18 changes: 10 additions & 8 deletions tower-retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ name = "tower-retry"
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
version = "0.3.0-alpha.1"
authors = ["Tower Maintainers <[email protected]>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-retry/0.1.0"
documentation = "https://docs.rs/tower-retry/0.3.0-alpha.1"
description = """
Retry failed requests.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"

[dependencies]
futures = "0.1.26"
tower-service = "0.2.0"
tower-layer = "0.1.0"
tokio-timer = "0.2.4"
tower-service = "0.3.0-alpha.1"
tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" }
tokio-timer = "0.3.0-alpha.4"
pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] }
futures-core-preview = "0.3.0-alpha.18"

[dev-dependencies]
tower-test = { version = "0.1.0", path = "../tower-test" }
tokio-executor = "0.1.2"
tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" }
tokio-test = "0.2.0-alpha.4"
futures-util-preview = "0.3.0-alpha.18"
44 changes: 9 additions & 35 deletions tower-retry/src/budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,8 @@ impl Bucket {

#[cfg(test)]
mod tests {
use self::tokio_executor::enter;
use super::*;
use std::{
sync::{Arc, Mutex, MutexGuard},
time::Instant,
};
use tokio_executor;
use tokio_test::clock;

#[test]
fn empty() {
Expand All @@ -229,37 +224,33 @@ mod tests {

#[test]
fn leaky() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 1.0);
bgt.deposit();

*time.as_mut() += Duration::from_secs(3);
time.advance(Duration::from_secs(3));

bgt.withdraw().unwrap_err();
});
}

#[test]
fn slots() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|time| {
let bgt = Budget::new(Duration::from_secs(1), 0, 0.5);
bgt.deposit();
bgt.deposit();
*time.as_mut() += Duration::from_millis(900);
time.advance(Duration::from_millis(900));
// 900ms later, the deposit should still be valid
bgt.withdraw().unwrap();

// blank slate
*time.as_mut() += Duration::from_millis(2000);
time.advance(Duration::from_millis(2000));

bgt.deposit();
*time.as_mut() += Duration::from_millis(300);
time.advance(Duration::from_millis(300));
bgt.deposit();
*time.as_mut() += Duration::from_millis(800);
time.advance(Duration::from_millis(800));
bgt.deposit();

// the first deposit is expired, but the 2nd should still be valid,
Expand All @@ -270,9 +261,7 @@ mod tests {

#[test]
fn reserve() {
let time = MockNow(Arc::new(Mutex::new(Instant::now())));
let clock = clock::Clock::new_with_now(time.clone());
clock::with_default(&clock, &mut enter().unwrap(), |_| {
clock::mock(|_| {
let bgt = Budget::new(Duration::from_secs(1), 5, 1.0);
bgt.withdraw().unwrap();
bgt.withdraw().unwrap();
Expand All @@ -283,19 +272,4 @@ mod tests {
bgt.withdraw().unwrap_err();
});
}

#[derive(Clone)]
struct MockNow(Arc<Mutex<Instant>>);

impl MockNow {
fn as_mut(&self) -> MutexGuard<Instant> {
self.0.lock().unwrap()
}
}

impl clock::Now for MockNow {
fn now(&self) -> Instant {
*self.0.lock().expect("now")
}
}
}
76 changes: 39 additions & 37 deletions tower-retry/src/future.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
//! Future types

use crate::{Policy, Retry};
use futures::{try_ready, Async, Future, Poll};
use futures_core::ready;
use pin_project::{pin_project, project};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower_service::Service;

/// The `Future` returned by a `Retry` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<P, S, Request>
where
Expand All @@ -13,15 +18,17 @@ where
{
request: Option<Request>,
retry: Retry<P, S>,
#[pin]
state: State<S::Future, P::Future, S::Response, S::Error>,
}

#[pin_project]
#[derive(Debug)]
enum State<F, P, R, E> {
/// Polling the future from `Service::call`
Called(F),
Called(#[pin] F),
/// Polling the future from `Policy::retry`
Checking(P, Option<Result<R, E>>),
Checking(#[pin] P, Option<Result<R, E>>),
/// Polling `Service::poll_ready` after `Checking` was OK.
Retrying,
}
Expand All @@ -46,59 +53,54 @@ where

impl<P, S, Request> Future for ResponseFuture<P, S, Request>
where
P: Policy<Request, S::Response, S::Error> + Clone,
P: Policy<Request, S::Response, S::Error> + Clone + Unpin,
S: Service<Request> + Clone,
{
type Item = S::Response;
type Error = S::Error;
type Output = Result<S::Response, S::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.state {
State::Called(ref mut future) => {
let result = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(res)) => Ok(res),
Err(err) => Err(err),
};
#[project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if let Some(ref req) = self.request {
match self.retry.policy.retry(req, result.as_ref()) {
Some(checking) => State::Checking(checking, Some(result)),
None => return result.map(Async::Ready),
loop {
#[project]
match this.state.project() {
State::Called(future) => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, result.as_ref()) {
Some(checking) => {
this.state.set(State::Checking(checking, Some(result)));
}
None => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
return result.map(Async::Ready);
return Poll::Ready(result);
}
}
State::Checking(ref mut future, ref mut result) => {
let policy = match future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(policy)) => policy,
Err(()) => {
State::Checking(future, result) => {
let policy = match ready!(future.poll(cx)) {
Some(policy) => policy,
None => {
// if Policy::retry() fails, return the original
// result...
return result
.take()
.expect("polled after complete")
.map(Async::Ready);
return Poll::Ready(result.take().expect("polled after complete"));
}
};
self.retry.policy = policy;
State::Retrying
this.retry.policy = policy;
this.state.set(State::Retrying);
}
State::Retrying => {
try_ready!(self.retry.poll_ready());
let req = self
ready!(this.retry.poll_ready(cx))?;
let req = this
.request
.take()
.expect("retrying requires cloned request");
self.request = self.retry.policy.clone_request(&req);
State::Called(self.retry.service.call(req))
*this.request = this.retry.policy.clone_request(&req);
this.state.set(State::Called(this.retry.service.call(req)));
}
};
self.state = next;
}
}
}
}
10 changes: 5 additions & 5 deletions tower-retry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![doc(html_root_url = "https://docs.rs/tower-retry/0.1.0")]
#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0-alpha.1")]
#![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
#![allow(elided_lifetimes_in_paths)]
Expand All @@ -14,7 +14,7 @@ pub use crate::layer::RetryLayer;
pub use crate::policy::Policy;

use crate::future::ResponseFuture;
use futures::Poll;
use std::task::{Context, Poll};
use tower_service::Service;

/// Configure retrying requests of "failed" responses.
Expand All @@ -37,15 +37,15 @@ impl<P, S> Retry<P, S> {

impl<P, S, Request> Service<Request> for Retry<P, S>
where
P: Policy<Request, S::Response, S::Error> + Clone,
P: Policy<Request, S::Response, S::Error> + Clone + Unpin,
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
S: Service<Request> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<P, S, Request>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, request: Request) -> Self::Future {
Expand Down
12 changes: 5 additions & 7 deletions tower-retry/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use futures::Future;
use std::future::Future;

/// A "retry policy" to classify if a request should be retried.
///
/// # Example
///
/// ```
/// extern crate futures;
/// extern crate tower_retry;
///
/// use tower_retry::Policy;
/// use futures_util::future;
///
/// type Req = String;
/// type Res = String;
///
/// struct Attempts(usize);
///
/// impl<E> Policy<Req, Res, E> for Attempts {
/// type Future = futures::future::FutureResult<Self, ()>;
/// type Future = future::Ready<Option<Self>>;
///
/// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option<Self::Future> {
/// match result {
Expand All @@ -30,7 +28,7 @@ use futures::Future;
/// // But we limit the number of attempts...
/// if self.0 > 0 {
/// // Try again!
/// Some(futures::future::ok(Attempts(self.0 - 1)))
/// Some(future::ready(Some(Attempts(self.0 - 1))))
/// } else {
/// // Used all our attempts, no retry...
/// None
Expand All @@ -46,7 +44,7 @@ use futures::Future;
/// ```
pub trait Policy<Req, Res, E>: Sized {
/// The `Future` type returned by `Policy::retry()`.
type Future: Future<Item = Self, Error = ()>;
type Future: Future<Output = Option<Self>>;
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
Expand Down
Loading