From 3e73ffc26bd69d20e41bdabfe1402bbe1837bfb8 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Fri, 3 Apr 2020 10:18:23 -0400 Subject: [PATCH 1/3] Test out Service::disarm for all tower services --- tower-layer/src/lib.rs | 4 ++ tower-service/src/lib.rs | 83 ++++++++++++++++++++++++++ tower-test/src/mock/mod.rs | 4 ++ tower/Cargo.toml | 2 +- tower/src/balance/p2c/make.rs | 4 ++ tower/src/balance/p2c/service.rs | 8 +++ tower/src/balance/pool/mod.rs | 8 +++ tower/src/buffer/service.rs | 7 +++ tower/src/filter/mod.rs | 4 ++ tower/src/hedge/delay.rs | 4 ++ tower/src/hedge/latency.rs | 4 ++ tower/src/hedge/mod.rs | 4 ++ tower/src/hedge/select.rs | 6 ++ tower/src/limit/concurrency/service.rs | 10 ++++ tower/src/limit/rate/service.rs | 8 +++ tower/src/load/constant.rs | 4 ++ tower/src/load/peak_ewma.rs | 6 ++ tower/src/load/pending_requests.rs | 6 ++ tower/src/load_shed/mod.rs | 9 +++ tower/src/ready_cache/cache.rs | 29 +++++++++ tower/src/reconnect/mod.rs | 9 +++ tower/src/retry/mod.rs | 4 ++ tower/src/spawn_ready/make.rs | 4 ++ tower/src/spawn_ready/service.rs | 8 +++ tower/src/steer/mod.rs | 30 ++++++++++ tower/src/timeout/mod.rs | 4 ++ tower/src/util/boxed/sync.rs | 8 +++ tower/src/util/boxed/unsync.rs | 8 +++ tower/src/util/call_all/ordered.rs | 2 + tower/src/util/either.rs | 7 +++ tower/src/util/optional/mod.rs | 8 +++ tower/src/util/service_fn.rs | 2 + tower/tests/balance/main.rs | 3 + tower/tests/steer/main.rs | 4 ++ tower/tests/util/call_all.rs | 14 ++++- 35 files changed, 327 insertions(+), 2 deletions(-) diff --git a/tower-layer/src/lib.rs b/tower-layer/src/lib.rs index f4f739453..c6b68a99f 100644 --- a/tower-layer/src/lib.rs +++ b/tower-layer/src/lib.rs @@ -75,6 +75,10 @@ pub use self::{identity::Identity, stack::Stack}; /// println!("request = {:?}, target = {:?}", request, self.target); /// self.service.call(request) /// } +/// +/// fn disarm(&mut self) { +/// self.service.disarm() +/// } /// } /// ``` /// diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index ba90c0dca..259f15a20 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -79,6 +79,8 @@ use std::task::{Context, Poll}; /// // Return the response as an immediate future /// Box::pin(fut) /// } +/// +/// fn disarm(&mut self) {} /// } /// ``` /// @@ -234,6 +236,79 @@ pub trait Service { /// Implementations are permitted to panic if `call` is invoked without /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. fn call(&mut self, req: Request) -> Self::Future; + + /// Undo a successful call to `poll_ready`. + /// + /// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, the service must keep capacity + /// set aside for the coming request. `disarm` allows you to give up that reserved capacity if + /// you decide you do not wish to issue a request after all. After calling `disarm`, you must + /// call `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to issue another + /// request. + /// + /// Returns `false` if capacity has not been reserved for this service (usually because + /// `poll_ready` was not previously called, or did not succeed). + /// + /// # Motivation + /// + /// If `poll_ready` reserves part of a service's finite capacity, callers need to send an item + /// shortly after `poll_ready` succeeds. If they do not, an idle caller may take up all the + /// slots of the channel, and prevent active callers from getting any requests through. + /// Consider this code that forwards from a channel to a `BufferService` (which has limited + /// capacity): + /// + /// ```rust,ignore + /// let mut fut = None; + /// loop { + /// if let Some(ref mut fut) = fut { + /// let _ = ready!(fut.poll(cx)); + /// let _ = fut.take(); + /// } + /// ready!(buffer.poll_ready(cx)); + /// if let Some(item) = ready!(rx.poll_next(cx)) { + /// fut = Some(buffer.call(item)); + /// } else { + /// break; + /// } + /// } + /// ``` + /// + /// If many such forwarders exist, and they all forward into a single (cloned) `BufferService`, + /// then any number of the forwarders may be waiting for `rx.next` at the same time. While they + /// do, they are effectively each reducing the buffer's capacity by 1. If enough of these + /// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot + /// for them through `poll_ready`, and the system will deadlock. + /// + /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that + /// you have to block. We can then fix the code above by writing: + /// + /// ```rust,ignore + /// let mut fut = None; + /// loop { + /// if let Some(ref mut fut) = fut { + /// let _ = ready!(fut.poll(cx)); + /// let _ = fut.take(); + /// } + /// ready!(buffer.poll_ready(cx)); + /// let item = rx.poll_next(cx); + /// if let Poll::Ready(Ok(_)) = item { + /// // we're going to send the item below, so don't disarm + /// } else { + /// // give up our slot, since we won't need it for a while + /// buffer.disarm(); + /// } + /// if let Some(item) = ready!(item) { + /// fut = Some(buffer.call(item)); + /// } else { + /// break; + /// } + /// } + /// ``` + /// + /// # Panics + /// + /// Implementations are permitted to panic if `disarm` is invoked without + /// obtaining `Poll::Ready(Ok(()))` from `poll_ready`. + fn disarm(&mut self); } impl<'a, S, Request> Service for &'a mut S @@ -251,6 +326,10 @@ where fn call(&mut self, request: Request) -> S::Future { (**self).call(request) } + + fn disarm(&mut self) { + (**self).disarm() + } } impl Service for Box @@ -268,4 +347,8 @@ where fn call(&mut self, request: Request) -> S::Future { (**self).call(request) } + + fn disarm(&mut self) { + (**self).disarm() + } } diff --git a/tower-test/src/mock/mod.rs b/tower-test/src/mock/mod.rs index 6720c994e..0f82e4ddd 100644 --- a/tower-test/src/mock/mod.rs +++ b/tower-test/src/mock/mod.rs @@ -185,6 +185,10 @@ impl Service for Mock { ResponseFuture::new(rx) } + + fn disarm(&mut self) { + self.can_send = false; + } } impl Clone for Mock { diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 2c9c9e5f6..6482ad0fc 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -55,7 +55,7 @@ hdrhistogram = { version = "6.0", optional = true } indexmap = { version = "1.0.2", optional = true } rand = { version = "0.7", features = ["small_rng"], optional = true } slab = { version = "0.4", optional = true } -tokio = { version = "0.2", optional = true } +tokio = { version = "0.2.15", optional = true } [dev-dependencies] futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] } diff --git a/tower/src/balance/p2c/make.rs b/tower/src/balance/p2c/make.rs index f8e8b9395..a79b020b8 100644 --- a/tower/src/balance/p2c/make.rs +++ b/tower/src/balance/p2c/make.rs @@ -67,6 +67,10 @@ where _marker: PhantomData, } } + + fn disarm(&mut self) { + self.inner.disarm() + } } impl Future for MakeFuture diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs index cc2058e61..4bb781df3 100644 --- a/tower/src/balance/p2c/service.rs +++ b/tower/src/balance/p2c/service.rs @@ -281,6 +281,14 @@ where .call_ready_index(index, request) .map_err(Into::into) } + + fn disarm(&mut self) { + assert_ne!( + self.services.disarm(), + 0, + "called disarm when poll_ready did not succeed" + ) + } } impl, Req> Future for UnreadyService { diff --git a/tower/src/balance/pool/mod.rs b/tower/src/balance/pool/mod.rs index b049d49bb..38bb7ce92 100644 --- a/tower/src/balance/pool/mod.rs +++ b/tower/src/balance/pool/mod.rs @@ -427,6 +427,10 @@ where fn call(&mut self, req: Req) -> Self::Future { self.balance.call(req) } + + fn disarm(&mut self) { + self.balance.disarm() + } } #[doc(hidden)] @@ -462,4 +466,8 @@ impl> Service for DropNotifyService fn call(&mut self, req: Request) -> Self::Future { self.svc.call(req) } + + fn disarm(&mut self) { + self.svc.disarm() + } } diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs index 63227c4ac..89f78c1ec 100644 --- a/tower/src/buffer/service.rs +++ b/tower/src/buffer/service.rs @@ -121,6 +121,13 @@ where Ok(_) => ResponseFuture::new(rx), } } + + fn disarm(&mut self) { + assert!( + self.tx.disarm(), + "called disarm when poll_ready did not succeed" + ); + } } impl Clone for Buffer diff --git a/tower/src/filter/mod.rs b/tower/src/filter/mod.rs index 23d668bdb..50ca6001f 100644 --- a/tower/src/filter/mod.rs +++ b/tower/src/filter/mod.rs @@ -52,4 +52,8 @@ where ResponseFuture::new(request, check, inner) } + + fn disarm(&mut self) { + self.inner.disarm() + } } diff --git a/tower/src/hedge/delay.rs b/tower/src/hedge/delay.rs index 25356d113..70bf86ab2 100644 --- a/tower/src/hedge/delay.rs +++ b/tower/src/hedge/delay.rs @@ -74,6 +74,10 @@ where state: State::Delaying(tokio::time::delay_until(deadline), Some(request)), } } + + fn disarm(&mut self) { + self.service.disarm() + } } impl Future for ResponseFuture diff --git a/tower/src/hedge/latency.rs b/tower/src/hedge/latency.rs index 9450716f4..f2ebda3f6 100644 --- a/tower/src/hedge/latency.rs +++ b/tower/src/hedge/latency.rs @@ -67,6 +67,10 @@ where inner: self.service.call(request), } } + + fn disarm(&mut self) { + self.service.disarm() + } } impl Future for ResponseFuture diff --git a/tower/src/hedge/mod.rs b/tower/src/hedge/mod.rs index 807ecd591..cb6a87f6f 100644 --- a/tower/src/hedge/mod.rs +++ b/tower/src/hedge/mod.rs @@ -185,6 +185,10 @@ where inner: self.0.call(request), } } + + fn disarm(&mut self) { + self.0.disarm() + } } impl std::future::Future for Future diff --git a/tower/src/hedge/select.rs b/tower/src/hedge/select.rs index a30c3d902..0eef6adeb 100644 --- a/tower/src/hedge/select.rs +++ b/tower/src/hedge/select.rs @@ -77,6 +77,12 @@ where b_fut, } } + + fn disarm(&mut self) { + // poll_ready only succeeds when _both_ services are ready, so disarming both is fine + self.a.disarm(); + self.b.disarm(); + } } impl Future for ResponseFuture diff --git a/tower/src/limit/concurrency/service.rs b/tower/src/limit/concurrency/service.rs index cf4ba4c5c..9e2fab1f5 100644 --- a/tower/src/limit/concurrency/service.rs +++ b/tower/src/limit/concurrency/service.rs @@ -84,6 +84,16 @@ where ResponseFuture::new(future, self.limit.semaphore.clone()) } + + fn disarm(&mut self) { + if self.limit.permit.is_acquired() { + // NOTE: even in this case there is a chance the user did not get Ready from poll_ready + // but we did what we could to check early! + self.inner.disarm() + } else { + panic!("poll_ready did not succeed, so cannot disarm"); + } + } } #[cfg(feature = "load")] diff --git a/tower/src/limit/rate/service.rs b/tower/src/limit/rate/service.rs index 7e55ec8ef..4f8f1fa34 100644 --- a/tower/src/limit/rate/service.rs +++ b/tower/src/limit/rate/service.rs @@ -107,6 +107,14 @@ where State::Limited(..) => panic!("service not ready; poll_ready must be called first"), } } + + fn disarm(&mut self) { + if let State::Ready { .. } = self.state { + self.inner.disarm() + } else { + panic!("poll_ready did not succeed, so cannot disarm"); + } + } } #[cfg(feature = "load")] diff --git a/tower/src/load/constant.rs b/tower/src/load/constant.rs index 4d78af703..d83a6d3d9 100644 --- a/tower/src/load/constant.rs +++ b/tower/src/load/constant.rs @@ -52,6 +52,10 @@ where fn call(&mut self, req: Request) -> Self::Future { self.inner.call(req) } + + fn disarm(&mut self) { + self.inner.disarm() + } } /// Proxies `Discover` such that all changes are wrapped with a constant load. diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index 6e16e8a99..db4e75270 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -182,6 +182,10 @@ where self.service.call(req), ) } + + fn disarm(&mut self) { + self.service.disarm() + } } impl Load for PeakEwma { @@ -328,6 +332,8 @@ mod tests { fn call(&mut self, (): ()) -> Self::Future { future::ok(()) } + + fn disarm(&mut self) {} } /// The default RTT estimate decays, so that new nodes are considered if the diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index a881fa38c..70b1fe75b 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -87,6 +87,10 @@ where self.service.call(req), ) } + + fn disarm(&mut self) { + self.service.disarm() + } } // ===== impl PendingRequestsDiscover ===== @@ -159,6 +163,8 @@ mod tests { fn call(&mut self, (): ()) -> Self::Future { future::ok(()) } + + fn disarm(&mut self) {} } #[test] diff --git a/tower/src/load_shed/mod.rs b/tower/src/load_shed/mod.rs index d1e405b1a..c9dd629d3 100644 --- a/tower/src/load_shed/mod.rs +++ b/tower/src/load_shed/mod.rs @@ -61,6 +61,15 @@ where ResponseFuture::overloaded() } } + + fn disarm(&mut self) { + if self.is_ready { + self.inner.disarm() + } else { + // we do not panic here since the user may still have called poll_ready + // and gotten a succeess. + } + } } impl Clone for LoadShed { diff --git a/tower/src/ready_cache/cache.rs b/tower/src/ready_cache/cache.rs index 3218423ae..966380040 100644 --- a/tower/src/ready_cache/cache.rs +++ b/tower/src/ready_cache/cache.rs @@ -367,6 +367,35 @@ where fut } + + /// Call `Service::disarm` on all ready services and return them to the not-ready set. + /// + /// Returns the number of services disarmed this way. + /// + /// Note that `ReadyCache` eagerly polls services (one call to `poll_ready` may ready several + /// services), so a call to disarm may disarm a large number of underlying services. + pub fn disarm(&mut self) -> usize { + let were_ready = self.ready.len(); + + // Take self.ready so we can call &mut self methods below + let mut ready = std::mem::take(&mut self.ready); + for (key, (mut svc, cancel)) in ready.drain(..) { + // If a new version of this service has been added to the + // unready set, don't overwrite it. + if !self.pending_contains(&key) { + // Disarm the once-ready service and mark it as pending + svc.disarm(); + self.push_pending(key, svc, cancel); + } + } + + // Restore the original IndexMap to preserve its allocation + std::mem::swap(&mut self.ready, &mut ready); + // Sanity check that the &mut self methods above didn't add stuff to the ready set + debug_assert!(ready.is_empty()); + + were_ready + } } // === Pending === diff --git a/tower/src/reconnect/mod.rs b/tower/src/reconnect/mod.rs index 2464ee4f3..ab22248a4 100644 --- a/tower/src/reconnect/mod.rs +++ b/tower/src/reconnect/mod.rs @@ -146,6 +146,15 @@ where let fut = service.call(request); ResponseFuture::new(fut) } + + fn disarm(&mut self) { + // we must be in State::Connected if poll_ready succeeded + if let State::Connected(ref mut service) = self.state { + service.disarm() + } else { + panic!("poll_ready did not succeed, so cannot disarm"); + } + } } impl fmt::Debug for Reconnect diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index e369dbbae..ce63d4a4a 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -55,4 +55,8 @@ where ResponseFuture::new(cloned, self.clone(), future) } + + fn disarm(&mut self) { + self.service.disarm() + } } diff --git a/tower/src/spawn_ready/make.rs b/tower/src/spawn_ready/make.rs index 0a413ef8a..d17c9fd98 100644 --- a/tower/src/spawn_ready/make.rs +++ b/tower/src/spawn_ready/make.rs @@ -46,6 +46,10 @@ where inner: self.inner.call(target), } } + + fn disarm(&mut self) { + self.inner.disarm() + } } impl Future for MakeFuture diff --git a/tower/src/spawn_ready/service.rs b/tower/src/spawn_ready/service.rs index 362069e85..b54027bd0 100644 --- a/tower/src/spawn_ready/service.rs +++ b/tower/src/spawn_ready/service.rs @@ -70,4 +70,12 @@ where _ => unreachable!("poll_ready must be called"), } } + + fn disarm(&mut self) { + if let Inner::Service(ref mut svc) = self.inner { + svc.as_mut().expect("illegal state").disarm() + } else { + panic!("poll_ready did not succeed, so cannot disarm"); + } + } } diff --git a/tower/src/steer/mod.rs b/tower/src/steer/mod.rs index 653d9d588..fc8418d4f 100644 --- a/tower/src/steer/mod.rs +++ b/tower/src/steer/mod.rs @@ -22,6 +22,8 @@ //! println!("{}: {}", self.0, req); //! ready(Ok(())) //! } +//! +//! fn disarm(&mut self) {} //! } //! //! #[tokio::main] @@ -134,4 +136,32 @@ where self.not_ready.push_back(idx); cl.call(req) } + + fn disarm(&mut self) { + // NOTE: we are not allowed to disarm a service that has not return success from + // `poll_ready`, since `disarm` implementations are allowed to panic in that case! + let not_ready: std::collections::HashSet<_> = self.not_ready.iter().cloned().collect(); + for (idx, service) in self.services.iter_mut().enumerate() { + if !not_ready.contains(&idx) { + // service is ready, so disarm it + service.disarm(); + self.not_ready.push_back(idx); + } + } + + // TODO: it'd be great to be cleverer here and avoid the allocation + // we could want to pick a different strategy depending on whether most services are ready. + // with https://github.com/rust-lang/rust/pull/69425 we could do this by sorting not_ready: + /* + let not_ready = self.not_ready.make_contiguous(); + not_ready.sort(); + let mut at = 0; + for &mut not_ready_idx in not_ready { + for ready_idx in at..not_ready_idx { + self.services[ready_idx].disarm(); + } + at = not_ready_idx + 1; + } + */ + } } diff --git a/tower/src/timeout/mod.rs b/tower/src/timeout/mod.rs index 463755dd7..ae8c5940b 100644 --- a/tower/src/timeout/mod.rs +++ b/tower/src/timeout/mod.rs @@ -52,4 +52,8 @@ where ResponseFuture::new(response, sleep) } + + fn disarm(&mut self) { + self.inner.disarm() + } } diff --git a/tower/src/util/boxed/sync.rs b/tower/src/util/boxed/sync.rs index b3a9f8948..c0cde57f0 100644 --- a/tower/src/util/boxed/sync.rs +++ b/tower/src/util/boxed/sync.rs @@ -53,6 +53,10 @@ impl Service for BoxService { fn call(&mut self, request: T) -> BoxFuture { self.inner.call(request) } + + fn disarm(&mut self) { + self.inner.disarm() + } } impl fmt::Debug for BoxService @@ -82,4 +86,8 @@ where fn call(&mut self, request: Request) -> Self::Future { Box::pin(self.inner.call(request)) } + + fn disarm(&mut self) { + self.inner.disarm() + } } diff --git a/tower/src/util/boxed/unsync.rs b/tower/src/util/boxed/unsync.rs index ef1568256..4e232d37f 100644 --- a/tower/src/util/boxed/unsync.rs +++ b/tower/src/util/boxed/unsync.rs @@ -47,6 +47,10 @@ impl Service for UnsyncBoxService { fn call(&mut self, request: T) -> UnsyncBoxFuture { self.inner.call(request) } + + fn disarm(&mut self) { + self.inner.disarm() + } } impl fmt::Debug for UnsyncBoxService @@ -76,4 +80,8 @@ where fn call(&mut self, request: Request) -> Self::Future { Box::pin(self.inner.call(request)) } + + fn disarm(&mut self) { + self.inner.disarm() + } } diff --git a/tower/src/util/call_all/ordered.rs b/tower/src/util/call_all/ordered.rs index f0e05bfd2..c56e48869 100644 --- a/tower/src/util/call_all/ordered.rs +++ b/tower/src/util/call_all/ordered.rs @@ -41,6 +41,8 @@ use tower_service::Service; /// fn call(&mut self, req: &'static str) -> Self::Future { /// ready(Ok(&req[..1])) /// } +/// +/// fn disarm(&mut self) {} /// } /// /// #[tokio::main] diff --git a/tower/src/util/either.rs b/tower/src/util/either.rs index 51f7e7d27..3f23c4b7a 100644 --- a/tower/src/util/either.rs +++ b/tower/src/util/either.rs @@ -55,6 +55,13 @@ where B(service) => B(service.call(request)), } } + + fn disarm(&mut self) { + match *self { + Either::A(ref mut s) => s.disarm(), + Either::B(ref mut s) => s.disarm(), + } + } } impl Future for Either diff --git a/tower/src/util/optional/mod.rs b/tower/src/util/optional/mod.rs index b498af5e3..5b19134bf 100644 --- a/tower/src/util/optional/mod.rs +++ b/tower/src/util/optional/mod.rs @@ -55,4 +55,12 @@ where let inner = self.inner.as_mut().map(|i| i.call(request)); ResponseFuture::new(inner) } + + fn disarm(&mut self) { + if let Some(ref mut i) = self.inner { + i.disarm() + } else { + // None service is always ready, so user may well have called poll_ready + } + } } diff --git a/tower/src/util/service_fn.rs b/tower/src/util/service_fn.rs index f109d6b77..bae1fb8d7 100644 --- a/tower/src/util/service_fn.rs +++ b/tower/src/util/service_fn.rs @@ -29,4 +29,6 @@ where fn call(&mut self, req: Request) -> Self::Future { (self.f)(req) } + + fn disarm(&mut self) {} } diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index d43ab9db0..4e57ee584 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -21,6 +21,9 @@ impl Service for Mock { fn call(&mut self, req: Req) -> Self::Future { self.0.call(req) } + fn disarm(&mut self) { + self.0.disarm() + } } impl tower::load::Load for Mock { diff --git a/tower/tests/steer/main.rs b/tower/tests/steer/main.rs index ab6482d93..f5af744cf 100644 --- a/tower/tests/steer/main.rs +++ b/tower/tests/steer/main.rs @@ -25,6 +25,10 @@ impl Service for MyService { fn call(&mut self, _req: String) -> Self::Future { ready(Ok(self.0)) } + + fn disarm(&mut self) { + assert!(self.1); + } } #[test] diff --git a/tower/tests/util/call_all.rs b/tower/tests/util/call_all.rs index 5964f9da0..33487b4ac 100644 --- a/tower/tests/util/call_all.rs +++ b/tower/tests/util/call_all.rs @@ -16,6 +16,7 @@ type Error = Box; struct Srv { admit: Rc>, count: Rc>, + ready: bool, } impl Service<&'static str> for Srv { type Response = &'static str; @@ -27,6 +28,7 @@ impl Service<&'static str> for Srv { return Poll::Pending; } + self.ready = true; self.admit.set(false); Poll::Ready(Ok(())) } @@ -35,6 +37,14 @@ impl Service<&'static str> for Srv { self.count.set(self.count.get() + 1); ready(Ok(req)) } + + fn disarm(&mut self) { + assert!(self.ready, "disarm called when poll_ready did not succeed"); + self.ready = false; + if !self.admit.get() { + self.admit.set(true); + } + } } #[test] @@ -46,6 +56,7 @@ fn ordered() { let srv = Srv { count: count.clone(), admit: admit.clone(), + ready: false, }; let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let ca = srv.call_all(rx); @@ -103,7 +114,8 @@ fn ordered() { ca.take_service(), Srv { count: count.clone(), - admit + ready: true, + admit, } ); } From 134e9278d39c40ce47fca4a80354f4402df3c1e7 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Fri, 3 Apr 2020 10:41:30 -0400 Subject: [PATCH 2/3] Avoid 'block' in disarm docs --- tower-service/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index 259f15a20..39151e79c 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -279,7 +279,8 @@ pub trait Service { /// for them through `poll_ready`, and the system will deadlock. /// /// `disarm` solves this problem by allowing you to give up the reserved slot if you find that - /// you have to block. We can then fix the code above by writing: + /// you will not be calling `call` for the foreseeable future. We can then fix the code above + /// by writing: /// /// ```rust,ignore /// let mut fut = None; @@ -304,6 +305,9 @@ pub trait Service { /// } /// ``` /// + /// Note that if we later decide that we _do_ want to call `Service::call`, then we first call + /// `Service::poll_ready` again, since the call to `disarm` made the service not ready. + /// /// # Panics /// /// Implementations are permitted to panic if `disarm` is invoked without From 6128d74729223dde664fb005a41b1016cc377f69 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Fri, 3 Apr 2020 13:06:02 -0400 Subject: [PATCH 3/3] Mention that disarm should prob. forward --- tower-service/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index 39151e79c..1bc39d5eb 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -248,6 +248,9 @@ pub trait Service { /// Returns `false` if capacity has not been reserved for this service (usually because /// `poll_ready` was not previously called, or did not succeed). /// + /// In general, when implementing this method for anything that wraps a service, you will + /// simply want to forward the call to `disarm` to the `disarm` method of the inner service. + /// /// # Motivation /// /// If `poll_ready` reserves part of a service's finite capacity, callers need to send an item