From 0c30a564ff6db20a65fdd001cb21fabce19b8736 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Mon, 9 Sep 2019 12:38:30 -0400 Subject: [PATCH 1/3] This bumps tower-retry to 0.3.0-alpha.1 --- Cargo.toml | 2 +- tower-retry/CHANGELOG.md | 4 ++ tower-retry/Cargo.toml | 18 +++--- tower-retry/src/budget.rs | 44 +++---------- tower-retry/src/future.rs | 76 +++++++++++----------- tower-retry/src/lib.rs | 10 +-- tower-retry/src/policy.rs | 12 ++-- tower-retry/tests/retry.rs | 126 ++++++++++++++++++++----------------- 8 files changed, 142 insertions(+), 150 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 91e9f2cec..09e21a040 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ "tower-load", # "tower-load-shed", # "tower-reconnect", - # "tower-retry", + "tower-retry", "tower-service", # "tower-spawn-ready", "tower-test", diff --git a/tower-retry/CHANGELOG.md b/tower-retry/CHANGELOG.md index 24a6a34e0..0730e7b17 100644 --- a/tower-retry/CHANGELOG.md +++ b/tower-retry/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.3.0-alpha.1 + +- Move to `std::future` + # 0.1.0 (April 26, 2019) - Initial release diff --git a/tower-retry/Cargo.toml b/tower-retry/Cargo.toml index fb602abed..cac9a2dda 100644 --- a/tower-retry/Cargo.toml +++ b/tower-retry/Cargo.toml @@ -8,13 +8,13 @@ name = "tower-retry" # - README.md # - Update CHANGELOG.md. # - Create "v0.1.x" git tag. -version = "0.1.0" +version = "0.3.0-alpha.1" authors = ["Tower Maintainers "] license = "MIT" readme = "README.md" repository = "https://github.com/tower-rs/tower" homepage = "https://github.com/tower-rs/tower" -documentation = "https://docs.rs/tower-retry/0.1.0" +documentation = "https://docs.rs/tower-retry/0.3.0-alpha.1" description = """ Retry failed requests. """ @@ -22,11 +22,13 @@ categories = ["asynchronous", "network-programming"] edition = "2018" [dependencies] -futures = "0.1.26" -tower-service = "0.2.0" -tower-layer = "0.1.0" -tokio-timer = "0.2.4" +tower-service = "0.3.0-alpha.1" +tower-layer = { version = "0.3.0-alpha.1", path = "../tower-layer" } +tokio-timer = "0.3.0-alpha.4" +pin-project = { version = "0.4.0-alpha.10", features = ["project_attr"] } +futures-core-preview = "0.3.0-alpha.18" [dev-dependencies] -tower-test = { version = "0.1.0", path = "../tower-test" } -tokio-executor = "0.1.2" +tower-test = { version = "0.3.0-alpha.1", path = "../tower-test" } +tokio-test = "0.2.0-alpha.4" +futures-util-preview = "0.3.0-alpha.18" diff --git a/tower-retry/src/budget.rs b/tower-retry/src/budget.rs index c1497aa94..fca04ad18 100644 --- a/tower-retry/src/budget.rs +++ b/tower-retry/src/budget.rs @@ -213,13 +213,8 @@ impl Bucket { #[cfg(test)] mod tests { - use self::tokio_executor::enter; use super::*; - use std::{ - sync::{Arc, Mutex, MutexGuard}, - time::Instant, - }; - use tokio_executor; + use tokio_test::clock; #[test] fn empty() { @@ -229,13 +224,11 @@ mod tests { #[test] fn leaky() { - let time = MockNow(Arc::new(Mutex::new(Instant::now()))); - let clock = clock::Clock::new_with_now(time.clone()); - clock::with_default(&clock, &mut enter().unwrap(), |_| { + clock::mock(|time| { let bgt = Budget::new(Duration::from_secs(1), 0, 1.0); bgt.deposit(); - *time.as_mut() += Duration::from_secs(3); + time.advance(Duration::from_secs(3)); bgt.withdraw().unwrap_err(); }); @@ -243,23 +236,21 @@ mod tests { #[test] fn slots() { - let time = MockNow(Arc::new(Mutex::new(Instant::now()))); - let clock = clock::Clock::new_with_now(time.clone()); - clock::with_default(&clock, &mut enter().unwrap(), |_| { + clock::mock(|time| { let bgt = Budget::new(Duration::from_secs(1), 0, 0.5); bgt.deposit(); bgt.deposit(); - *time.as_mut() += Duration::from_millis(900); + time.advance(Duration::from_millis(900)); // 900ms later, the deposit should still be valid bgt.withdraw().unwrap(); // blank slate - *time.as_mut() += Duration::from_millis(2000); + time.advance(Duration::from_millis(2000)); bgt.deposit(); - *time.as_mut() += Duration::from_millis(300); + time.advance(Duration::from_millis(300)); bgt.deposit(); - *time.as_mut() += Duration::from_millis(800); + time.advance(Duration::from_millis(800)); bgt.deposit(); // the first deposit is expired, but the 2nd should still be valid, @@ -270,9 +261,7 @@ mod tests { #[test] fn reserve() { - let time = MockNow(Arc::new(Mutex::new(Instant::now()))); - let clock = clock::Clock::new_with_now(time.clone()); - clock::with_default(&clock, &mut enter().unwrap(), |_| { + clock::mock(|_| { let bgt = Budget::new(Duration::from_secs(1), 5, 1.0); bgt.withdraw().unwrap(); bgt.withdraw().unwrap(); @@ -283,19 +272,4 @@ mod tests { bgt.withdraw().unwrap_err(); }); } - - #[derive(Clone)] - struct MockNow(Arc>); - - impl MockNow { - fn as_mut(&self) -> MutexGuard { - self.0.lock().unwrap() - } - } - - impl clock::Now for MockNow { - fn now(&self) -> Instant { - *self.0.lock().expect("now") - } - } } diff --git a/tower-retry/src/future.rs b/tower-retry/src/future.rs index 5ffa46143..85ea2d309 100644 --- a/tower-retry/src/future.rs +++ b/tower-retry/src/future.rs @@ -1,10 +1,15 @@ //! Future types use crate::{Policy, Retry}; -use futures::{try_ready, Async, Future, Poll}; +use futures_core::ready; +use pin_project::{pin_project, project}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use tower_service::Service; /// The `Future` returned by a `Retry` service. +#[pin_project] #[derive(Debug)] pub struct ResponseFuture where @@ -13,15 +18,17 @@ where { request: Option, retry: Retry, + #[pin] state: State, } +#[pin_project] #[derive(Debug)] enum State { /// Polling the future from `Service::call` - Called(F), + Called(#[pin] F), /// Polling the future from `Policy::retry` - Checking(P, Option>), + Checking(#[pin] P, Option>), /// Polling `Service::poll_ready` after `Checking` was OK. Retrying, } @@ -46,59 +53,54 @@ where impl Future for ResponseFuture where - P: Policy + Clone, + P: Policy + Clone + Unpin, S: Service + Clone, { - type Item = S::Response; - type Error = S::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - loop { - let next = match self.state { - State::Called(ref mut future) => { - let result = match future.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(res)) => Ok(res), - Err(err) => Err(err), - }; + #[project] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - if let Some(ref req) = self.request { - match self.retry.policy.retry(req, result.as_ref()) { - Some(checking) => State::Checking(checking, Some(result)), - None => return result.map(Async::Ready), + loop { + #[project] + match this.state.project() { + State::Called(future) => { + let result = ready!(future.poll(cx)); + if let Some(ref req) = this.request { + match this.retry.policy.retry(req, result.as_ref()) { + Some(checking) => { + this.state.set(State::Checking(checking, Some(result))); + } + None => return Poll::Ready(result), } } else { // request wasn't cloned, so no way to retry it - return result.map(Async::Ready); + return Poll::Ready(result); } } - State::Checking(ref mut future, ref mut result) => { - let policy = match future.poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(policy)) => policy, - Err(()) => { + State::Checking(future, result) => { + let policy = match ready!(future.poll(cx)) { + Some(policy) => policy, + None => { // if Policy::retry() fails, return the original // result... - return result - .take() - .expect("polled after complete") - .map(Async::Ready); + return Poll::Ready(result.take().expect("polled after complete")); } }; - self.retry.policy = policy; - State::Retrying + this.retry.policy = policy; + this.state.set(State::Retrying); } State::Retrying => { - try_ready!(self.retry.poll_ready()); - let req = self + ready!(this.retry.poll_ready(cx))?; + let req = this .request .take() .expect("retrying requires cloned request"); - self.request = self.retry.policy.clone_request(&req); - State::Called(self.retry.service.call(req)) + *this.request = this.retry.policy.clone_request(&req); + this.state.set(State::Called(this.retry.service.call(req))); } - }; - self.state = next; + } } } } diff --git a/tower-retry/src/lib.rs b/tower-retry/src/lib.rs index 6a326d620..c6ab63f66 100644 --- a/tower-retry/src/lib.rs +++ b/tower-retry/src/lib.rs @@ -1,4 +1,4 @@ -#![doc(html_root_url = "https://docs.rs/tower-retry/0.1.0")] +#![doc(html_root_url = "https://docs.rs/tower-retry/0.3.0-alpha.1")] #![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![cfg_attr(test, deny(warnings))] #![allow(elided_lifetimes_in_paths)] @@ -14,7 +14,7 @@ pub use crate::layer::RetryLayer; pub use crate::policy::Policy; use crate::future::ResponseFuture; -use futures::Poll; +use std::task::{Context, Poll}; use tower_service::Service; /// Configure retrying requests of "failed" responses. @@ -37,15 +37,15 @@ impl Retry { impl Service for Retry where - P: Policy + Clone, + P: Policy + Clone + Unpin, S: Service + Clone, { type Response = S::Response; type Error = S::Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) } fn call(&mut self, request: Request) -> Self::Future { diff --git a/tower-retry/src/policy.rs b/tower-retry/src/policy.rs index 6337fc74d..25bbee0b4 100644 --- a/tower-retry/src/policy.rs +++ b/tower-retry/src/policy.rs @@ -1,14 +1,12 @@ -use futures::Future; +use std::future::Future; /// A "retry policy" to classify if a request should be retried. /// /// # Example /// /// ``` -/// extern crate futures; -/// extern crate tower_retry; -/// /// use tower_retry::Policy; +/// use futures_util::future; /// /// type Req = String; /// type Res = String; @@ -16,7 +14,7 @@ use futures::Future; /// struct Attempts(usize); /// /// impl Policy for Attempts { -/// type Future = futures::future::FutureResult; +/// type Future = future::Ready>; /// /// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option { /// match result { @@ -30,7 +28,7 @@ use futures::Future; /// // But we limit the number of attempts... /// if self.0 > 0 { /// // Try again! -/// Some(futures::future::ok(Attempts(self.0 - 1))) +/// Some(future::ready(Some(Attempts(self.0 - 1)))) /// } else { /// // Used all our attempts, no retry... /// None @@ -46,7 +44,7 @@ use futures::Future; /// ``` pub trait Policy: Sized { /// The `Future` type returned by `Policy::retry()`. - type Future: Future; + type Future: Future>; /// Check the policy if a certain request should be retried. /// /// This method is passed a reference to the original request, and either diff --git a/tower-retry/tests/retry.rs b/tower-retry/tests/retry.rs index 92120e5d7..80e0a9d01 100644 --- a/tower-retry/tests/retry.rs +++ b/tower-retry/tests/retry.rs @@ -1,77 +1,101 @@ -use futures::{future, Future}; +use futures_util::{future, pin_mut}; +use std::future::Future; +use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; use tower_retry::Policy; use tower_service::Service; use tower_test::{assert_request_eq, mock}; #[test] fn retry_errors() { - let (mut service, mut handle) = new_service(RetryErrors); + task::mock(|cx| { + let (mut service, handle) = new_service(RetryErrors); + pin_mut!(handle); - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("hello"); + assert_ready_ok!(service.poll_ready(cx)); - assert_request_eq!(handle, "hello").send_error("retry me"); + let fut = service.call("hello"); + pin_mut!(fut); - assert_not_ready(&mut fut); + assert_request_eq!(handle.as_mut(), "hello").send_error("retry me"); - assert_request_eq!(handle, "hello").send_response("world"); + assert_pending!(fut.as_mut().poll(cx)); - assert_eq!(fut.wait().unwrap(), "world"); + assert_request_eq!(handle.as_mut(), "hello").send_response("world"); + + assert_ready_ok!(fut.poll(cx), "world"); + }); } #[test] fn retry_limit() { - let (mut service, mut handle) = new_service(Limit(2)); + task::mock(|cx| { + let (mut service, handle) = new_service(Limit(2)); + pin_mut!(handle); + + assert_ready_ok!(service.poll_ready(cx)); - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("hello"); + let fut = service.call("hello"); + pin_mut!(fut); - assert_request_eq!(handle, "hello").send_error("retry 1"); - assert_not_ready(&mut fut); + assert_request_eq!(handle.as_mut(), "hello").send_error("retry 1"); + assert_pending!(fut.as_mut().poll(cx)); - assert_request_eq!(handle, "hello").send_error("retry 2"); - assert_not_ready(&mut fut); + assert_request_eq!(handle.as_mut(), "hello").send_error("retry 2"); + assert_pending!(fut.as_mut().poll(cx)); - assert_request_eq!(handle, "hello").send_error("retry 3"); - assert_eq!(fut.wait().unwrap_err().to_string(), "retry 3"); + assert_request_eq!(handle.as_mut(), "hello").send_error("retry 3"); + assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 3"); + }); } #[test] fn retry_error_inspection() { - let (mut service, mut handle) = new_service(UnlessErr("reject")); + task::mock(|cx| { + let (mut service, handle) = new_service(UnlessErr("reject")); + pin_mut!(handle); - assert!(service.poll_ready().unwrap().is_ready()); - let mut fut = service.call("hello"); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("hello"); + pin_mut!(fut); - assert_request_eq!(handle, "hello").send_error("retry 1"); - assert_not_ready(&mut fut); + assert_request_eq!(handle.as_mut(), "hello").send_error("retry 1"); + assert_pending!(fut.as_mut().poll(cx)); - assert_request_eq!(handle, "hello").send_error("reject"); - assert_eq!(fut.wait().unwrap_err().to_string(), "reject"); + assert_request_eq!(handle.as_mut(), "hello").send_error("reject"); + assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "reject"); + }); } #[test] fn retry_cannot_clone_request() { - let (mut service, mut handle) = new_service(CannotClone); + task::mock(|cx| { + let (mut service, handle) = new_service(CannotClone); + pin_mut!(handle); - assert!(service.poll_ready().unwrap().is_ready()); - let fut = service.call("hello"); + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("hello"); + pin_mut!(fut); - assert_request_eq!(handle, "hello").send_error("retry 1"); - assert_eq!(fut.wait().unwrap_err().to_string(), "retry 1"); + assert_request_eq!(handle, "hello").send_error("retry 1"); + assert_eq!(assert_ready_err!(fut.poll(cx)).to_string(), "retry 1"); + }); } #[test] fn success_with_cannot_clone() { - // Even though the request couldn't be cloned, if the first request succeeds, - // it should succeed overall. - let (mut service, mut handle) = new_service(CannotClone); - - assert!(service.poll_ready().unwrap().is_ready()); - let fut = service.call("hello"); - - assert_request_eq!(handle, "hello").send_response("world"); - assert_eq!(fut.wait().unwrap(), "world"); + task::mock(|cx| { + // Even though the request couldn't be cloned, if the first request succeeds, + // it should succeed overall. + let (mut service, handle) = new_service(CannotClone); + pin_mut!(handle); + + assert_ready_ok!(service.poll_ready(cx)); + let fut = service.call("hello"); + pin_mut!(fut); + + assert_request_eq!(handle, "hello").send_response("world"); + assert_ready_ok!(fut.poll(cx), "world"); + }); } type Req = &'static str; @@ -85,10 +109,10 @@ type Handle = mock::Handle; struct RetryErrors; impl Policy for RetryErrors { - type Future = future::FutureResult; + type Future = future::Ready>; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() { - Some(future::ok(RetryErrors)) + Some(future::ready(Some(RetryErrors))) } else { None } @@ -103,10 +127,10 @@ impl Policy for RetryErrors { struct Limit(usize); impl Policy for Limit { - type Future = future::FutureResult; + type Future = future::Ready>; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() && self.0 > 0 { - Some(future::ok(Limit(self.0 - 1))) + Some(future::ready(Some(Limit(self.0 - 1)))) } else { None } @@ -121,11 +145,11 @@ impl Policy for Limit { struct UnlessErr(InnerError); impl Policy for UnlessErr { - type Future = future::FutureResult; + type Future = future::Ready>; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { result.err().and_then(|err| { if err.to_string() != self.0 { - Some(future::ok(self.clone())) + Some(future::ready(Some(self.clone()))) } else { None } @@ -141,7 +165,7 @@ impl Policy for UnlessErr { struct CannotClone; impl Policy for CannotClone { - type Future = future::FutureResult; + type Future = future::Ready>; fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option { unreachable!("retry cannot be called since request isn't cloned"); } @@ -158,15 +182,3 @@ fn new_service + Clone>( let service = tower_retry::Retry::new(policy, service); (service, handle) } - -fn assert_not_ready(f: &mut F) -where - F::Error: ::std::fmt::Debug, -{ - future::poll_fn(|| { - assert!(f.poll().unwrap().is_not_ready()); - Ok::<_, ()>(().into()) - }) - .wait() - .unwrap(); -} From 438826bca70030ae966fa9ce6822b2c20aecedf4 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Mon, 9 Sep 2019 14:49:33 -0400 Subject: [PATCH 2/3] Remove extraneous Option --- tower-retry/src/future.rs | 20 ++++++-------------- tower-retry/src/policy.rs | 9 +++------ tower-retry/tests/retry.rs | 14 +++++++------- 3 files changed, 16 insertions(+), 27 deletions(-) diff --git a/tower-retry/src/future.rs b/tower-retry/src/future.rs index 85ea2d309..80f716116 100644 --- a/tower-retry/src/future.rs +++ b/tower-retry/src/future.rs @@ -19,16 +19,16 @@ where request: Option, retry: Retry, #[pin] - state: State, + state: State, } #[pin_project] #[derive(Debug)] -enum State { +enum State { /// Polling the future from `Service::call` Called(#[pin] F), /// Polling the future from `Policy::retry` - Checking(#[pin] P, Option>), + Checking(#[pin] P), /// Polling `Service::poll_ready` after `Checking` was OK. Retrying, } @@ -70,7 +70,7 @@ where if let Some(ref req) = this.request { match this.retry.policy.retry(req, result.as_ref()) { Some(checking) => { - this.state.set(State::Checking(checking, Some(result))); + this.state.set(State::Checking(checking)); } None => return Poll::Ready(result), } @@ -79,16 +79,8 @@ where return Poll::Ready(result); } } - State::Checking(future, result) => { - let policy = match ready!(future.poll(cx)) { - Some(policy) => policy, - None => { - // if Policy::retry() fails, return the original - // result... - return Poll::Ready(result.take().expect("polled after complete")); - } - }; - this.retry.policy = policy; + State::Checking(future) => { + this.retry.policy = ready!(future.poll(cx)); this.state.set(State::Retrying); } State::Retrying => { diff --git a/tower-retry/src/policy.rs b/tower-retry/src/policy.rs index 25bbee0b4..6f4c16dda 100644 --- a/tower-retry/src/policy.rs +++ b/tower-retry/src/policy.rs @@ -14,7 +14,7 @@ use std::future::Future; /// struct Attempts(usize); /// /// impl Policy for Attempts { -/// type Future = future::Ready>; +/// type Future = future::Ready; /// /// fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option { /// match result { @@ -28,7 +28,7 @@ use std::future::Future; /// // But we limit the number of attempts... /// if self.0 > 0 { /// // Try again! -/// Some(future::ready(Some(Attempts(self.0 - 1)))) +/// Some(future::ready(Attempts(self.0 - 1))) /// } else { /// // Used all our attempts, no retry... /// None @@ -44,7 +44,7 @@ use std::future::Future; /// ``` pub trait Policy: Sized { /// The `Future` type returned by `Policy::retry()`. - type Future: Future>; + type Future: Future; /// Check the policy if a certain request should be retried. /// /// This method is passed a reference to the original request, and either @@ -54,9 +54,6 @@ pub trait Policy: Sized { /// /// If the request *should* be retried, return `Some` future of a new /// policy that would apply for the next request attempt. - /// - /// If the returned `Future` errors, the request will **not** be retried - /// after all. fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option; /// Tries to clone a request before being passed to the inner service. /// diff --git a/tower-retry/tests/retry.rs b/tower-retry/tests/retry.rs index 80e0a9d01..f6eb9457f 100644 --- a/tower-retry/tests/retry.rs +++ b/tower-retry/tests/retry.rs @@ -109,10 +109,10 @@ type Handle = mock::Handle; struct RetryErrors; impl Policy for RetryErrors { - type Future = future::Ready>; + type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() { - Some(future::ready(Some(RetryErrors))) + Some(future::ready(RetryErrors)) } else { None } @@ -127,10 +127,10 @@ impl Policy for RetryErrors { struct Limit(usize); impl Policy for Limit { - type Future = future::Ready>; + type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { if result.is_err() && self.0 > 0 { - Some(future::ready(Some(Limit(self.0 - 1)))) + Some(future::ready(Limit(self.0 - 1))) } else { None } @@ -145,11 +145,11 @@ impl Policy for Limit { struct UnlessErr(InnerError); impl Policy for UnlessErr { - type Future = future::Ready>; + type Future = future::Ready; fn retry(&self, _: &Req, result: Result<&Res, &Error>) -> Option { result.err().and_then(|err| { if err.to_string() != self.0 { - Some(future::ready(Some(self.clone()))) + Some(future::ready(self.clone())) } else { None } @@ -165,7 +165,7 @@ impl Policy for UnlessErr { struct CannotClone; impl Policy for CannotClone { - type Future = future::Ready>; + type Future = future::Ready; fn retry(&self, _: &Req, _: Result<&Res, &Error>) -> Option { unreachable!("retry cannot be called since request isn't cloned"); } From 86f449bfb8783d392a90b046c28d095e0e9446ee Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Mon, 9 Sep 2019 14:58:56 -0400 Subject: [PATCH 3/3] Remove extraneous Unpin bound --- tower-retry/src/future.rs | 25 +++++++++++++++++++++---- tower-retry/src/lib.rs | 8 +++++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/tower-retry/src/future.rs b/tower-retry/src/future.rs index 80f716116..292542cf5 100644 --- a/tower-retry/src/future.rs +++ b/tower-retry/src/future.rs @@ -8,6 +8,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tower_service::Service; +// NOTE: this is the trait generated for Ready::project() by pin-project. +// We need it here to be able to go "through" Ready to &mut Service without adding Unpin bounds. +use crate::__RetryProjectionTrait; + /// The `Future` returned by a `Retry` service. #[pin_project] #[derive(Debug)] @@ -17,6 +21,7 @@ where S: Service, { request: Option, + #[pin] retry: Retry, #[pin] state: State, @@ -53,7 +58,7 @@ where impl Future for ResponseFuture where - P: Policy + Clone + Unpin, + P: Policy + Clone, S: Service + Clone, { type Output = Result; @@ -80,17 +85,29 @@ where } } State::Checking(future) => { - this.retry.policy = ready!(future.poll(cx)); + this.retry.project().policy.set(ready!(future.poll(cx))); this.state.set(State::Retrying); } State::Retrying => { - ready!(this.retry.poll_ready(cx))?; + // NOTE: we assume here that + // + // this.retry.poll_ready() + // + // is equivalent to + // + // this.retry.service.poll_ready() + // + // we need to make that assumption to avoid adding an Unpin bound to the Policy + // in Ready to make it Unpin so that we can get &mut Ready as needed to call + // poll_ready on it. + ready!(this.retry.project().service.poll_ready(cx))?; let req = this .request .take() .expect("retrying requires cloned request"); *this.request = this.retry.policy.clone_request(&req); - this.state.set(State::Called(this.retry.service.call(req))); + this.state + .set(State::Called(this.retry.project().service.call(req))); } } } diff --git a/tower-retry/src/lib.rs b/tower-retry/src/lib.rs index c6ab63f66..0e66bea04 100644 --- a/tower-retry/src/lib.rs +++ b/tower-retry/src/lib.rs @@ -14,14 +14,17 @@ pub use crate::layer::RetryLayer; pub use crate::policy::Policy; use crate::future::ResponseFuture; +use pin_project::pin_project; use std::task::{Context, Poll}; use tower_service::Service; /// Configure retrying requests of "failed" responses. /// /// A `Policy` classifies what is a "failed" response. +#[pin_project] #[derive(Clone, Debug)] pub struct Retry { + #[pin] policy: P, service: S, } @@ -37,7 +40,7 @@ impl Retry { impl Service for Retry where - P: Policy + Clone + Unpin, + P: Policy + Clone, S: Service + Clone, { type Response = S::Response; @@ -45,6 +48,9 @@ where type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is + // equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated + // as well. self.service.poll_ready(cx) }