From 8118dea17b1ddb9c77a42b4ac3ca67b116cfe920 Mon Sep 17 00:00:00 2001 From: Sam Lewis Date: Fri, 30 Sep 2022 10:52:54 +1000 Subject: [PATCH] load: Add "weight" load variant Adds a `weight` load variant, which weights an inner load. This is useful in circumstances where it is desireable to artificially inflate or deflate load. One such example is canary deployments, where it might be preferable for a canary to accept less load than its non-canary counterparts. This change is adapted from the weight implementation that used to exist within tower but was removed (see tower-rs/tower@a496fbf72c335052de772d1c8c65033f1bc733de) and an associated unmerged PR (https://github.com/tower-rs/tower/pull/282). --- tower/examples/tower-balance.rs | 44 ++++++- tower/src/load/mod.rs | 7 +- tower/src/load/peak_ewma.rs | 10 ++ tower/src/load/pending_requests.rs | 10 ++ tower/src/load/weight.rs | 193 +++++++++++++++++++++++++++++ 5 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 tower/src/load/weight.rs diff --git a/tower/examples/tower-balance.rs b/tower/examples/tower-balance.rs index 998bb7c51..397596941 100644 --- a/tower/examples/tower-balance.rs +++ b/tower/examples/tower-balance.rs @@ -16,6 +16,7 @@ use tower::balance as lb; use tower::discover::{Change, Discover}; use tower::limit::concurrency::ConcurrencyLimit; use tower::load; +use tower::load::weight::{HasWeight, Weight}; use tower::util::ServiceExt; use tower_service::Service; @@ -35,6 +36,7 @@ static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [ Duration::from_millis(500), Duration::from_millis(1000), ]; +static ENDPOINT_WEIGHTS: [f64; 10] = [1.0, 1.0, 0.0, 0.01, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]; struct Summary { latencies: Histogram, @@ -55,6 +57,11 @@ async fn main() { print!("{}ms, ", l); } println!("]"); + print!("ENDPOINT_WEIGHTS=["); + for weight in &ENDPOINT_WEIGHTS { + print!("{weight}, ") + } + println!("]"); let decay = Duration::from_secs(10); let d = gen_disco(); @@ -66,17 +73,42 @@ async fn main() { )); run("P2C+PeakEWMA...", pe).await; + let d = gen_disco(); + let pe = lb::p2c::Balance::new(load::WeightedDiscover::new(load::PeakEwmaDiscover::new( + d, + DEFAULT_RTT, + decay, + load::CompleteOnResponse::default(), + ))); + run("P2C+PeakEWMA+Weighted...", pe).await; + let d = gen_disco(); let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new( d, load::CompleteOnResponse::default(), )); run("P2C+LeastLoaded...", ll).await; + + let d = gen_disco(); + let ll = lb::p2c::Balance::new(load::WeightedDiscover::new( + load::PendingRequestsDiscover::new(d, load::CompleteOnResponse::default()), + )); + run("P2C+LeastLoaded+Weighted...", ll).await; } type Error = Box; -type Key = usize; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct Key { + instance: usize, + weight: Weight, +} + +impl HasWeight for Key { + fn weight(&self) -> Weight { + self.weight + } +} pin_project! { struct Disco { @@ -117,8 +149,9 @@ fn gen_disco() -> impl Discover< Disco::new( MAX_ENDPOINT_LATENCIES .iter() + .zip(ENDPOINT_WEIGHTS) .enumerate() - .map(|(instance, latency)| { + .map(|(instance, (latency, weight))| { let svc = tower::service_fn(move |_| { let start = Instant::now(); @@ -133,7 +166,12 @@ fn gen_disco() -> impl Discover< } }); - (instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY)) + let key = Key { + instance, + weight: Weight::from(weight), + }; + + (key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY)) }) .collect(), ) diff --git a/tower/src/load/mod.rs b/tower/src/load/mod.rs index 2f9a37371..2000dd360 100644 --- a/tower/src/load/mod.rs +++ b/tower/src/load/mod.rs @@ -6,6 +6,7 @@ //! - [`Constant`] — Always returns the same constant load value for a service. //! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests. //! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service. +//! - [`Weight`] - Adds a weighting to an inner Load. //! //! In general, you will want to use one of these when using the types in [`tower::balance`] which //! balance services depending on their load. Which load metric to use depends on your exact @@ -63,6 +64,7 @@ pub mod completion; mod constant; pub mod peak_ewma; pub mod pending_requests; +pub mod weight; pub use self::{ completion::{CompleteOnResponse, TrackCompletion}, @@ -72,7 +74,10 @@ pub use self::{ }; #[cfg(feature = "discover")] -pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover}; +pub use self::{ + peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover, + weight::WeightedDiscover, +}; /// Types that implement this trait can give an estimate of how loaded they are. /// diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index 61ac2011f..7b3d14c6f 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -10,9 +10,11 @@ use pin_project_lite::pin_project; use std::pin::Pin; use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; +use super::weight::Weight; use super::Load; use std::task::{Context, Poll}; use std::{ + ops, sync::{Arc, Mutex}, time::Duration, }; @@ -69,6 +71,14 @@ pin_project! { #[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] pub struct Cost(f64); +impl ops::Div for Cost { + type Output = f64; + + fn div(self, weight: Weight) -> f64 { + self.0 / weight + } +} + /// Tracks an in-flight request and updates the RTT-estimate on Drop. #[derive(Debug)] pub struct Handle { diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index 3d8689bbe..4a153a30c 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -6,10 +6,12 @@ use crate::discover::{Change, Discover}; use futures_core::{ready, Stream}; #[cfg(feature = "discover")] use pin_project_lite::pin_project; +use std::ops; #[cfg(feature = "discover")] use std::pin::Pin; use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; +use super::weight::Weight; use super::Load; use std::sync::Arc; use std::task::{Context, Poll}; @@ -43,6 +45,14 @@ pin_project! { #[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)] pub struct Count(usize); +impl ops::Div for Count { + type Output = f64; + + fn div(self, weight: Weight) -> f64 { + self.0 / weight + } +} + /// Tracks an in-flight request by reference count. #[derive(Debug)] pub struct Handle(RefCount); diff --git a/tower/src/load/weight.rs b/tower/src/load/weight.rs new file mode 100644 index 000000000..594bf22a7 --- /dev/null +++ b/tower/src/load/weight.rs @@ -0,0 +1,193 @@ +//! A [`Load`] implementation which implements weighting on top of an inner [`Load`]. +//! +//! This can be useful in such cases as canary deployments, where it is desirable for a +//! particular service to receive less than its fair share of load than other services. + +#[cfg(feature = "discover")] +use crate::discover::{Change, Discover}; +#[cfg(feature = "discover")] +use futures_core::Stream; +#[cfg(feature = "discover")] +use pin_project_lite::pin_project; +#[cfg(feature = "discover")] +use std::pin::Pin; + +use futures_core::ready; +use std::ops; +use std::task::{Context, Poll}; +use tower_service::Service; + +use super::Load; + +/// A weight on [0.0, ∞]. +/// +/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes. +#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)] +pub struct Weight(u32); + +impl Weight { + /// Minimum Weight + pub const MIN: Weight = Weight(0); + /// Unit of Weight + pub const UNIT: Weight = Weight(10_000); + /// Maximum Weight + pub const MAX: Weight = Weight(u32::MAX); +} + +impl Default for Weight { + fn default() -> Self { + Weight::UNIT + } +} + +impl From for Weight { + fn from(w: f64) -> Self { + if w < 0.0 || w == f64::NAN { + Self::MIN + } else if w == f64::INFINITY { + Self::MAX + } else { + Weight((w * (Weight::UNIT.0 as f64)).round() as u32) + } + } +} + +impl Into for Weight { + fn into(self) -> f64 { + (self.0 as f64) / (Weight::UNIT.0 as f64) + } +} + +impl ops::Div for f64 { + type Output = f64; + + fn div(self, w: Weight) -> f64 { + if w == Weight::MIN { + f64::INFINITY + } else { + let w: f64 = w.into(); + self / w + } + } +} + +impl ops::Div for usize { + type Output = f64; + + fn div(self, w: Weight) -> f64 { + (self as f64) / w + } +} + +/// Measures the load of the underlying service by weighting that service's load by a constant +/// weighting factor. +#[derive(Clone, Debug, PartialEq, PartialOrd)] +pub struct Weighted { + inner: S, + weight: Weight, +} + +impl Weighted { + /// Wraps an `S`-typed service so that its load is weighted by the given weight. + pub fn new>(inner: S, w: W) -> Self { + let weight = w.into(); + Self { inner, weight } + } +} + +impl Load for Weighted +where + S: Load, + S::Metric: ops::Div, + >::Output: PartialOrd, +{ + type Metric = >::Output; + + fn load(&self) -> Self::Metric { + self.inner.load() / self.weight + } +} + +impl> Service for Weighted { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: R) -> Self::Future { + self.inner.call(req) + } +} + +#[cfg(feature = "discover")] +pin_project! { + /// Wraps a `D`-typed stream of discovered services with [`Weighted`]. + #[cfg_attr(docsrs, doc(cfg(feature = "discover")))] + #[derive(Debug)] + pub struct WeightedDiscover{ + #[pin] + discover: D, + } +} + +impl WeightedDiscover { + /// Wraps a [`Discover`], wrapping all of its services with [`Weighted`]. + pub fn new(discover: D) -> Self { + Self { discover } + } +} + +/// Allows [`tower::Discover::Key`] to expose a weight, so that they can be included in a discover +/// stream +pub trait HasWeight { + /// Returns the [`Weight`] + fn weight(&self) -> Weight; +} + +impl From for Weighted { + fn from(inner: T) -> Self { + let weight = inner.weight(); + Self { inner, weight } + } +} + +impl HasWeight for Weighted { + fn weight(&self) -> Weight { + self.weight + } +} + +#[cfg(feature = "discover")] +impl Stream for WeightedDiscover +where + D: Discover, + D::Key: HasWeight, +{ + type Item = Result>, D::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use self::Change::*; + + let this = self.project(); + let change = match ready!(this.discover.poll_discover(cx)).transpose()? { + None => return Poll::Ready(None), + Some(Insert(k, svc)) => { + let w = k.weight(); + Insert(k, Weighted::new(svc, w)) + } + Some(Remove(k)) => Remove(k), + }; + + Poll::Ready(Some(Ok(change))) + } +} + +#[test] +fn div_min() { + assert_eq!(10.0 / Weight::MIN, f64::INFINITY); + assert_eq!(10 / Weight::MIN, f64::INFINITY); + assert_eq!(0 / Weight::MIN, f64::INFINITY); +}