Skip to content

Commit

Permalink
load: Add "weight" load variant
Browse files Browse the repository at this point in the history
Adds a `weight` load variant, which weights an inner load. This is
useful in circumstances where it is desireable to artificially inflate
or deflate load. One such example is canary deployments, where it might
be preferable for a canary to accept less load than its non-canary
counterparts.

This change is adapted from the weight implementation that used to
exist within tower but was removed (see
a496fbf) and an
associated unmerged PR (#282).
  • Loading branch information
samvrlewis committed Oct 4, 2022
1 parent c5632a2 commit f857465
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 4 deletions.
44 changes: 41 additions & 3 deletions tower/examples/tower-balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tower::balance as lb;
use tower::discover::{Change, Discover};
use tower::limit::concurrency::ConcurrencyLimit;
use tower::load;
use tower::load::weight::{HasWeight, Weight};
use tower::util::ServiceExt;
use tower_service::Service;

Expand All @@ -35,6 +36,7 @@ static MAX_ENDPOINT_LATENCIES: [Duration; 10] = [
Duration::from_millis(500),
Duration::from_millis(1000),
];
static ENDPOINT_WEIGHTS: [f64; 10] = [1.0, 1.0, 0.0, 0.01, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];

struct Summary {
latencies: Histogram<u64>,
Expand All @@ -55,6 +57,11 @@ async fn main() {
print!("{}ms, ", l);
}
println!("]");
print!("ENDPOINT_WEIGHTS=[");
for weight in &ENDPOINT_WEIGHTS {
print!("{}, ", weight);
}
println!("]");

let decay = Duration::from_secs(10);
let d = gen_disco();
Expand All @@ -66,17 +73,42 @@ async fn main() {
));
run("P2C+PeakEWMA...", pe).await;

let d = gen_disco();
let pe = lb::p2c::Balance::new(load::WeightedDiscover::new(load::PeakEwmaDiscover::new(
d,
DEFAULT_RTT,
decay,
load::CompleteOnResponse::default(),
)));
run("P2C+PeakEWMA+Weighted...", pe).await;

let d = gen_disco();
let ll = lb::p2c::Balance::new(load::PendingRequestsDiscover::new(
d,
load::CompleteOnResponse::default(),
));
run("P2C+LeastLoaded...", ll).await;

let d = gen_disco();
let ll = lb::p2c::Balance::new(load::WeightedDiscover::new(
load::PendingRequestsDiscover::new(d, load::CompleteOnResponse::default()),
));
run("P2C+LeastLoaded+Weighted...", ll).await;
}

type Error = Box<dyn std::error::Error + Send + Sync>;

type Key = usize;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key {
instance: usize,
weight: Weight,
}

impl HasWeight for Key {
fn weight(&self) -> Weight {
self.weight
}
}

pin_project! {
struct Disco<S> {
Expand Down Expand Up @@ -117,8 +149,9 @@ fn gen_disco() -> impl Discover<
Disco::new(
MAX_ENDPOINT_LATENCIES
.iter()
.zip(&ENDPOINT_WEIGHTS)
.enumerate()
.map(|(instance, latency)| {
.map(|(instance, (latency, weight))| {
let svc = tower::service_fn(move |_| {
let start = Instant::now();

Expand All @@ -133,7 +166,12 @@ fn gen_disco() -> impl Discover<
}
});

(instance, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
let key = Key {
instance,
weight: Weight::from(*weight),
};

(key, ConcurrencyLimit::new(svc, ENDPOINT_CAPACITY))
})
.collect(),
)
Expand Down
8 changes: 7 additions & 1 deletion tower/src/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! - [`Constant`] — Always returns the same constant load value for a service.
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
//! - [`Weight`] - Adds a weighting to an inner Load.
//!
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
//! balance services depending on their load. Which load metric to use depends on your exact
Expand Down Expand Up @@ -63,16 +64,21 @@ pub mod completion;
mod constant;
pub mod peak_ewma;
pub mod pending_requests;
pub mod weight;

pub use self::{
completion::{CompleteOnResponse, TrackCompletion},
constant::Constant,
peak_ewma::PeakEwma,
pending_requests::PendingRequests,
weight::Weight,
};

#[cfg(feature = "discover")]
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
pub use self::{
peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover,
weight::WeightedDiscover,
};

/// Types that implement this trait can give an estimate of how loaded they are.
///
Expand Down
10 changes: 10 additions & 0 deletions tower/src/load/peak_ewma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use pin_project_lite::pin_project;
use std::pin::Pin;

use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::weight::Weight;
use super::Load;
use std::task::{Context, Poll};
use std::{
ops,
sync::{Arc, Mutex},
time::Duration,
};
Expand Down Expand Up @@ -69,6 +71,14 @@ pin_project! {
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);

impl ops::Div<Weight> for Cost {
type Output = f64;

fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}

/// Tracks an in-flight request and updates the RTT-estimate on Drop.
#[derive(Debug)]
pub struct Handle {
Expand Down
10 changes: 10 additions & 0 deletions tower/src/load/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use crate::discover::{Change, Discover};
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
use std::ops;
#[cfg(feature = "discover")]
use std::pin::Pin;

use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::weight::Weight;
use super::Load;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -43,6 +45,14 @@ pin_project! {
#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)]
pub struct Count(usize);

impl ops::Div<Weight> for Count {
type Output = f64;

fn div(self, weight: Weight) -> f64 {
self.0 / weight
}
}

/// Tracks an in-flight request by reference count.
#[derive(Debug)]
pub struct Handle(RefCount);
Expand Down
198 changes: 198 additions & 0 deletions tower/src/load/weight.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
//! A [`Load`] implementation which implements weighting on top of an inner [`Load`].
//!
//! This can be useful in such cases as canary deployments, where it is desirable for a
//! particular service to receive less than its fair share of load than other services.

#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::ready;
#[cfg(feature = "discover")]
use futures_core::Stream;
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;

use std::ops;
use std::task::{Context, Poll};
use tower_service::Service;

use super::Load;

/// A weight on [0.0, ∞].
///
/// Lesser-weighted nodes receive less traffic than heavier-weighted nodes.
///
/// This is represented internally as an integer, rather than a float, so that it can implement
/// `Hash` and `Eq`.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash)]
pub struct Weight(u32);

impl Weight {
/// Minimum Weight
pub const MIN: Weight = Weight(0);
/// Unit of Weight - what 1.0_f64 corresponds to
pub const UNIT: Weight = Weight(10_000);
/// Maximum Weight
pub const MAX: Weight = Weight(u32::MAX);
}

impl Default for Weight {
fn default() -> Self {
Weight::UNIT
}
}

impl From<f64> for Weight {
fn from(w: f64) -> Self {
if w < 0.0 || w == f64::NAN {
Self::MIN
} else if w == f64::INFINITY {
Self::MAX
} else {
Weight((w * (Weight::UNIT.0 as f64)).round() as u32)
}
}
}

impl Into<f64> for Weight {
fn into(self) -> f64 {
(self.0 as f64) / (Weight::UNIT.0 as f64)
}
}

impl ops::Div<Weight> for f64 {
type Output = f64;

fn div(self, w: Weight) -> f64 {
if w == Weight::MIN {
f64::INFINITY
} else {
let w: f64 = w.into();
self / w
}
}
}

impl ops::Div<Weight> for usize {
type Output = f64;

fn div(self, w: Weight) -> f64 {
(self as f64) / w
}
}

/// Measures the load of the underlying service by weighting that service's load by a constant
/// weighting factor.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Weighted<S> {
inner: S,
weight: Weight,
}

impl<S> Weighted<S> {
/// Wraps an `S`-typed service so that its load is weighted by the given weight.
pub fn new<W: Into<Weight>>(inner: S, w: W) -> Self {
let weight = w.into();
Self { inner, weight }
}
}

impl<S> Load for Weighted<S>
where
S: Load,
S::Metric: ops::Div<Weight>,
<S::Metric as ops::Div<Weight>>::Output: PartialOrd,
{
type Metric = <S::Metric as ops::Div<Weight>>::Output;

fn load(&self) -> Self::Metric {
self.inner.load() / self.weight
}
}

impl<R, S: Service<R>> Service<R> for Weighted<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: R) -> Self::Future {
self.inner.call(req)
}
}

#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with [`Weighted`].
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct WeightedDiscover<D>{
#[pin]
discover: D,
}
}

#[cfg(feature = "discover")]
impl<D> WeightedDiscover<D> {
/// Wraps a [`Discover`], wrapping all of its services with [`Weighted`].
pub fn new(discover: D) -> Self {
Self { discover }
}
}

/// Allows [`Discover::Key`] to expose a weight, so that they can be included in a discover
/// stream
pub trait HasWeight {
/// Returns the [`Weight`]
fn weight(&self) -> Weight;
}

impl<T: HasWeight> From<T> for Weighted<T> {
fn from(inner: T) -> Self {
let weight = inner.weight();
Self { inner, weight }
}
}

impl<T> HasWeight for Weighted<T> {
fn weight(&self) -> Weight {
self.weight
}
}

#[cfg(feature = "discover")]
impl<D> Stream for WeightedDiscover<D>
where
D: Discover,
D::Key: HasWeight,
{
type Item = Result<Change<D::Key, Weighted<D::Service>>, D::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;

let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => {
let w = k.weight();
Insert(k, Weighted::new(svc, w))
}
Some(Remove(k)) => Remove(k),
};

Poll::Ready(Some(Ok(change)))
}
}

#[test]
fn div_min() {
assert_eq!(10.0 / Weight::MIN, f64::INFINITY);
assert_eq!(10 / Weight::MIN, f64::INFINITY);
assert_eq!(0 / Weight::MIN, f64::INFINITY);
}

0 comments on commit f857465

Please sign in to comment.