diff --git a/Cargo.lock b/Cargo.lock index c36e2f0285..3401c25213 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1564,7 +1564,6 @@ name = "linkerd-distribute" version = "0.1.0" dependencies = [ "ahash", - "indexmap 2.6.0", "linkerd-stack", "parking_lot", "rand", diff --git a/linkerd/distribute/Cargo.toml b/linkerd/distribute/Cargo.toml index 9911b4568a..d99437bc21 100644 --- a/linkerd/distribute/Cargo.toml +++ b/linkerd/distribute/Cargo.toml @@ -7,7 +7,6 @@ publish = false [dependencies] ahash = "0.8" -indexmap = "2" linkerd-stack = { path = "../stack" } parking_lot = "0.12" rand = { version = "0.8", features = ["small_rng"] } diff --git a/linkerd/distribute/src/keys.rs b/linkerd/distribute/src/keys.rs new file mode 100644 index 0000000000..3ad75275ee --- /dev/null +++ b/linkerd/distribute/src/keys.rs @@ -0,0 +1,163 @@ +use ahash::{HashMap, HashMapExt}; +use rand::{ + distributions::{WeightedError, WeightedIndex}, + prelude::Distribution as _, + Rng, +}; +use std::hash::Hash; + +/// Uniquely identifies a key/backend pair for a distribution. This allows +/// backends to have the same key and still participate in request distribution. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct KeyId { + idx: usize, +} + +#[derive(Debug)] +pub struct ServiceKeys { + ids: Vec, + keys: HashMap, +} + +pub type WeightedServiceKeys = ServiceKeys>; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct WeightedKey { + pub key: K, + pub weight: u32, +} + +pub(crate) struct WeightedKeySelector<'a, K> { + keys: &'a WeightedServiceKeys, + index: WeightedIndex, +} + +// === impl KeyId === + +impl KeyId { + pub(crate) fn new(idx: usize) -> Self { + Self { idx } + } +} + +// === impl UnweightedKeys === + +// PartialEq, Eq, and Hash are all valid to implement for UnweightedKeys since +// there is a defined iteration order for the keys, but it cannot be automatically +// derived for HashMap fields. +impl PartialEq for ServiceKeys { + fn eq(&self, other: &Self) -> bool { + if self.ids != other.ids { + return false; + } + + for id in &self.ids { + if self.keys.get(id) != other.keys.get(id) { + return false; + } + } + + true + } +} + +impl Eq for ServiceKeys {} + +impl Hash for ServiceKeys { + fn hash(&self, state: &mut H) { + self.ids.hash(state); + // Normally we would also hash the length, but self.ids and + // self.keys have the same length + for id in &self.ids { + self.keys.get(id).hash(state); + } + } +} + +impl ServiceKeys { + pub(crate) fn new(iter: impl Iterator) -> Self { + let mut ids = Vec::new(); + let mut keys = HashMap::new(); + for (idx, key) in iter.enumerate() { + let id = KeyId::new(idx); + ids.push(id); + keys.insert(id, key); + } + + Self { ids, keys } + } + + pub(crate) fn is_empty(&self) -> bool { + self.ids.is_empty() + } + + pub(crate) fn len(&self) -> usize { + self.ids.len() + } + + /// Returns the key `K` associated with the given [`KeyId`]. + /// + /// The output of using a [`KeyId`] not produced by the same instance of + /// [`ServiceKeys`] is unspecified, and it is likely to panic. + /// + /// # Panics + /// + /// This will panic if no entry is associated with the given lookup key. + pub(crate) fn get(&self, id: KeyId) -> &K { + self.keys + .get(&id) + .expect("distribution lookup keys must be valid") + } + + fn try_get_id(&self, idx: usize) -> Option { + self.ids.get(idx).copied() + } + + pub(crate) fn iter(&self) -> impl Iterator { + self.ids.iter() + } +} + +// === impl WeightedKeys === + +impl WeightedServiceKeys { + pub(crate) fn into_unweighted(self) -> ServiceKeys { + ServiceKeys { + ids: self.ids, + keys: self + .keys + .into_iter() + .map(|(id, key)| (id, key.key)) + .collect(), + } + } + + pub(crate) fn weighted_index(&self) -> Result, WeightedError> { + WeightedIndex::new(self.ids.iter().map(|&id| self.get(id).weight)) + } + + pub(crate) fn validate_weights(&self) -> Result<(), WeightedError> { + self.weighted_index()?; + Ok(()) + } + + pub(crate) fn selector(&self) -> WeightedKeySelector<'_, K> { + let index = self.weighted_index().expect("distribution must be valid"); + WeightedKeySelector { keys: self, index } + } +} + +// === impl WeightedKeySelector === + +impl WeightedKeySelector<'_, K> { + pub(crate) fn select_weighted(&self, rng: &mut R) -> KeyId { + let idx = self.index.sample(rng); + self.keys + .try_get_id(idx) + .expect("distrubtion must select a valid backend") + } + + pub(crate) fn disable_backend(&mut self, id: KeyId) -> Result<(), WeightedError> { + self.index.update_weights(&[(id.idx, &0)]) + } +} diff --git a/linkerd/distribute/src/lib.rs b/linkerd/distribute/src/lib.rs index 85528bc396..4d22809825 100644 --- a/linkerd/distribute/src/lib.rs +++ b/linkerd/distribute/src/lib.rs @@ -4,13 +4,15 @@ #![forbid(unsafe_code)] mod cache; +mod keys; mod params; mod service; mod stack; pub use self::{ cache::{BackendCache, NewBackendCache}, - params::{Backends, Distribution, WeightedKeys}, + keys::WeightedServiceKeys, + params::{Backends, Distribution}, service::Distribute, stack::NewDistribute, }; diff --git a/linkerd/distribute/src/params.rs b/linkerd/distribute/src/params.rs index d7628e2980..b27728983f 100644 --- a/linkerd/distribute/src/params.rs +++ b/linkerd/distribute/src/params.rs @@ -1,5 +1,9 @@ +use crate::{ + keys::{ServiceKeys, WeightedKey}, + WeightedServiceKeys, +}; use ahash::AHashSet; -use rand::distributions::{WeightedError, WeightedIndex}; +use rand::distributions::WeightedError; use std::{fmt::Debug, hash::Hash, sync::Arc}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -16,17 +20,11 @@ pub enum Distribution { Empty, /// A distribution that uses the first available backend in an ordered list. - FirstAvailable(Arc<[K]>), + FirstAvailable(Arc>), /// A distribution that uses the first available backend when randomly /// selecting over a weighted distribution of backends. - RandomAvailable(Arc>), -} - -#[derive(Debug, PartialEq, Eq, Hash)] -pub struct WeightedKeys { - keys: Vec, - weights: Vec, + RandomAvailable(Arc>), } // === impl Backends === @@ -64,46 +62,29 @@ impl Default for Distribution { } impl Distribution { - pub fn first_available(keys: impl IntoIterator) -> Self { - let keys: Arc<[K]> = keys.into_iter().collect(); + pub fn first_available(iter: impl IntoIterator) -> Self { + let keys = ServiceKeys::new(iter.into_iter()); if keys.is_empty() { return Self::Empty; } - Self::FirstAvailable(keys) + + Self::FirstAvailable(Arc::new(keys)) } pub fn random_available>( iter: T, ) -> Result { - let (keys, weights): (Vec<_>, Vec<_>) = iter.into_iter().filter(|(_, w)| *w > 0).unzip(); - if keys.len() < 2 { - return Ok(Self::first_available(keys)); - } - // Error if the distribution is invalid. - let _index = WeightedIndex::new(weights.iter().copied())?; - Ok(Self::RandomAvailable(Arc::new(WeightedKeys { - keys, - weights, - }))) - } - - pub(crate) fn keys(&self) -> &[K] { - match self { - Self::Empty => &[], - Self::FirstAvailable(keys) => keys, - Self::RandomAvailable(keys) => keys.keys(), + let weighted_keys = WeightedServiceKeys::new( + iter.into_iter() + .map(|(key, weight)| WeightedKey { key, weight }), + ); + if weighted_keys.len() < 2 { + return Ok(Self::FirstAvailable(Arc::new( + weighted_keys.into_unweighted(), + ))); } - } -} - -// === impl WeightedKeys === - -impl WeightedKeys { - pub(crate) fn keys(&self) -> &[K] { - &self.keys - } - pub(crate) fn index(&self) -> WeightedIndex { - WeightedIndex::new(self.weights.iter().copied()).expect("distribution must be valid") + weighted_keys.validate_weights()?; + Ok(Self::RandomAvailable(Arc::new(weighted_keys))) } } diff --git a/linkerd/distribute/src/service.rs b/linkerd/distribute/src/service.rs index 30e9246ff3..dbd8f633ff 100644 --- a/linkerd/distribute/src/service.rs +++ b/linkerd/distribute/src/service.rs @@ -1,48 +1,54 @@ -use super::{Distribution, WeightedKeys}; -use indexmap::IndexMap; -use linkerd_stack::Service; -use rand::{ - distributions::{Distribution as _, WeightedError}, - rngs::SmallRng, - SeedableRng, -}; +use self::{first::FirstAvailableSelection, random::RandomAvailableSelection}; +use super::Distribution; +use linkerd_stack::{NewService, Service}; use std::{ hash::Hash, - sync::Arc, task::{Context, Poll}, }; +mod first; +mod random; + /// A service that distributes requests over a set of backends. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Distribute { - backends: IndexMap, - selection: Selection, - - /// Stores the index of the backend that has been polled to ready. The - /// service at this index will be used on the next invocation of - /// `Service::call`. - ready_idx: Option, + selection: Selection, } /// Holds per-distribution state for a [`Distribute`] service. #[derive(Debug)] -enum Selection { +enum Selection { Empty, - FirstAvailable, - RandomAvailable { - keys: Arc>, - rng: SmallRng, - }, + FirstAvailable(FirstAvailableSelection), + RandomAvailable(RandomAvailableSelection), } // === impl Distribute === impl Distribute { - pub(crate) fn new(backends: IndexMap, dist: Distribution) -> Self { + pub(crate) fn new(dist: Distribution, make_svc: N) -> Self + where + N: for<'a> NewService<&'a K, Service = S>, + { Self { - backends, - selection: dist.into(), - ready_idx: None, + selection: Self::make_selection(&dist, make_svc), + } + } + + fn make_selection(dist: &Distribution, make_svc: N) -> Selection + where + N: for<'a> NewService<&'a K, Service = S>, + { + // Build the backends needed for this distribution, in the required + // order (so that weighted indices align). + match dist { + Distribution::Empty => Selection::Empty, + Distribution::FirstAvailable(keys) => { + Selection::FirstAvailable(FirstAvailableSelection::new(keys, make_svc)) + } + Distribution::RandomAvailable(keys) => { + Selection::RandomAvailable(RandomAvailableSelection::new(keys, make_svc)) + } } } } @@ -62,147 +68,96 @@ where /// readiness. We expect that these inner services should be buffered or /// otherwise drive themselves to readiness (i.e. via SpawnReady). fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // If we've already chosen a ready index, then skip polling. - if self.ready_idx.is_some() { - return Poll::Ready(Ok(())); - } - - match self.selection { + match &mut self.selection { Selection::Empty => { tracing::debug!("empty distribution will never become ready"); + Poll::Pending } - - Selection::FirstAvailable => { - for (idx, svc) in self.backends.values_mut().enumerate() { - if svc.poll_ready(cx)?.is_ready() { - self.ready_idx = Some(idx); - return Poll::Ready(Ok(())); - } - } - } - - // Choose a random index (via the weighted distribution) to try to - // poll the backend. Continue selecting endpoints until we find one - // that is ready or we've tried all backends in the distribution. - Selection::RandomAvailable { - ref keys, - ref mut rng, - } => { - // Clone the weighted index so that we can zero out the weights - // as pending services are polled. - let mut index = keys.index(); - loop { - // Sample the weighted index to find a backend to try. - let idx = index.sample(rng); - let (_, svc) = self - .backends - .get_index_mut(idx) - .expect("distributions must not reference unknown backends"); - - if svc.poll_ready(cx)?.is_ready() { - self.ready_idx = Some(idx); - return Poll::Ready(Ok(())); - } - - // Zero out the weight of the backend we just tried so that - // it's not selected again. - match index.update_weights(&[(idx, &0)]) { - Ok(()) => {} - Err(WeightedError::AllWeightsZero) => { - // There are no backends remaining. - break; - } - Err(error) => { - tracing::error!(%error, "unexpected error updating weights; giving up"); - break; - } - } - } - } + Selection::FirstAvailable(s) => s.poll_ready(cx), + Selection::RandomAvailable(s) => s.poll_ready(cx), } - - debug_assert!(self.ready_idx.is_none()); - tracing::trace!("no ready services in distribution"); - Poll::Pending } fn call(&mut self, req: Req) -> Self::Future { - let idx = self - .ready_idx - .take() - .expect("poll_ready must be called first"); - - let (_, svc) = self.backends.get_index_mut(idx).expect("index must exist"); - - svc.call(req) - } -} - -impl Clone for Distribute { - fn clone(&self) -> Self { - Self { - backends: self.backends.clone(), - selection: self.selection.clone(), - // Clear the ready index so that the new clone must become ready - // independently. - ready_idx: None, + match &mut self.selection { + Selection::Empty => unreachable!("Empty selection is never ready"), + Selection::FirstAvailable(s) => s.call(req), + Selection::RandomAvailable(s) => s.call(req), } } } -impl Default for Distribute { +// === impl Selection === + +impl Default for Selection { /// Returns an empty distribution. This distribution will never become /// ready. fn default() -> Self { - Self { - backends: Default::default(), - selection: Selection::Empty, - ready_idx: None, - } + Self::Empty } } -// === impl Selection === - -impl From> for Selection { - fn from(dist: Distribution) -> Self { - match dist { - Distribution::Empty => Self::Empty, - Distribution::FirstAvailable(_) => Self::FirstAvailable, - Distribution::RandomAvailable(keys) => Self::RandomAvailable { - keys, - rng: SmallRng::from_rng(rand::thread_rng()).expect("RNG must initialize"), - }, +impl Clone for Selection { + fn clone(&self) -> Self { + match self { + Self::Empty => Self::Empty, + Self::FirstAvailable(s) => Self::FirstAvailable(s.clone()), + Self::RandomAvailable(s) => Self::RandomAvailable(s.clone()), } } } -impl Clone for Selection { - fn clone(&self) -> Self { - match self { - Self::Empty => Selection::Empty, - Self::FirstAvailable => Self::FirstAvailable, - Self::RandomAvailable { keys, .. } => Self::RandomAvailable { - keys: keys.clone(), - rng: SmallRng::from_rng(rand::thread_rng()).expect("RNG must initialize"), - }, +impl Default for Distribute { + fn default() -> Self { + Self { + selection: Selection::default(), } } } #[cfg(test)] mod tests { + use std::cell::RefCell; + + use crate::keys::KeyId; + use super::*; use tokio_test::*; use tower_test::mock; + fn mock_first_available( + svcs: Vec<(K, S)>, + ) -> Distribute { + // Wrap in RefCell because NewService is only blanked impl'd for Fn, not FnMut + let svcs = RefCell::new(svcs); + let dist = Distribution::first_available(svcs.borrow().iter().map(|(k, _)| k.clone())); + let dist = Distribute::new(dist, |_: &K| svcs.borrow_mut().remove(0).1); + assert!(svcs.borrow().is_empty()); + dist + } + + fn mock_random_available( + svcs: Vec<(K, S, u32)>, + ) -> Distribute { + let svcs = RefCell::new(svcs); + let dist = Distribution::random_available( + svcs.borrow() + .iter() + .map(|(k, _, weight)| (k.clone(), *weight)), + ) + .unwrap(); + let dist = Distribute::new(dist, |_: &K| svcs.borrow_mut().remove(0).1); + assert!(svcs.borrow().is_empty()); + dist + } + #[test] fn empty_pending() { let mut dist_svc = mock::Spawn::new(Distribute::<&'static str, mock::Mock<(), ()>>::new( Default::default(), - Default::default(), + |_: &&str| panic!("Empty service should never call make_svc"), )); - assert_eq!(dist_svc.get_ref().backends.len(), 0); + assert!(matches!(dist_svc.get_ref().selection, Selection::Empty)); assert_pending!(dist_svc.poll_ready()); } @@ -210,12 +165,10 @@ mod tests { fn first_available_woken() { let (mulder, mut mulder_ctl) = mock::pair::<(), ()>(); let (scully, mut scully_ctl) = mock::pair::<(), ()>(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully)] - .into_iter() - .collect(), - Distribution::FirstAvailable(Arc::new(["mulder", "scully"])), - )); + let mut dist_svc = mock::Spawn::new(mock_first_available(vec![ + ("mulder", mulder), + ("scully", scully), + ])); mulder_ctl.allow(0); scully_ctl.allow(0); @@ -228,17 +181,18 @@ mod tests { fn first_available_prefers_first() { let (mulder, mut mulder_ctl) = mock::pair(); let (scully, mut scully_ctl) = mock::pair(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully)] - .into_iter() - .collect(), - Distribution::FirstAvailable(Arc::new(["mulder", "scully"])), - )); + let mut dist_svc = mock::Spawn::new(mock_first_available(vec![ + ("mulder", mulder), + ("scully", scully), + ])); scully_ctl.allow(1); mulder_ctl.allow(1); assert_ready_ok!(dist_svc.poll_ready()); - assert_eq!(dist_svc.get_ref().ready_idx, Some(0)); + let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(0)); let mut call = task::spawn(dist_svc.call(())); match assert_ready!(mulder_ctl.poll_request()) { Some(((), rsp)) => rsp.send_response(()), @@ -251,17 +205,18 @@ mod tests { fn first_available_uses_second() { let (mulder, mut mulder_ctl) = mock::pair(); let (scully, mut scully_ctl) = mock::pair(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully)] - .into_iter() - .collect(), - Distribution::FirstAvailable(Arc::new(["mulder", "scully"])), - )); + let mut dist_svc = mock::Spawn::new(mock_first_available(vec![ + ("mulder", mulder), + ("scully", scully), + ])); mulder_ctl.allow(0); scully_ctl.allow(1); assert_ready_ok!(dist_svc.poll_ready()); - assert_eq!(dist_svc.get_ref().ready_idx, Some(1)); + let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(1)); let mut call = task::spawn(dist_svc.call(())); match assert_ready!(scully_ctl.poll_request()) { Some(((), rsp)) => rsp.send_response(()), @@ -270,18 +225,40 @@ mod tests { assert_ready_ok!(call.poll()); } + #[test] + fn first_available_duplicate_keys() { + let (mulder_1, mut mulder_1_ctl) = mock::pair(); + let (mulder_2, mut mulder_2_ctl) = mock::pair(); + let mut dist_svc = mock::Spawn::new(mock_first_available(vec![ + ("mulder", mulder_1), + ("mulder", mulder_2), + ])); + + mulder_2_ctl.allow(1); + mulder_1_ctl.allow(1); + assert_ready_ok!(dist_svc.poll_ready()); + let Selection::FirstAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(0)); + let mut call = task::spawn(dist_svc.call(())); + match assert_ready!(mulder_1_ctl.poll_request()) { + Some(((), rsp)) => rsp.send_response(()), + _ => panic!("expected request"), + } + assert_ready_ok!(call.poll()); + } + #[test] fn random_available_woken() { let (mulder, mut mulder_ctl) = mock::pair::<(), ()>(); let (scully, mut scully_ctl) = mock::pair::<(), ()>(); let (skinner, mut skinner_ctl) = mock::pair::<(), ()>(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully), ("skinner", skinner)] - .into_iter() - .collect(), - Distribution::random_available([("mulder", 1), ("scully", 99998), ("skinner", 1)]) - .unwrap(), - )); + let mut dist_svc = mock::Spawn::new(mock_random_available(vec![ + ("mulder", mulder, 1), + ("scully", scully, 99998), + ("skinner", skinner, 1), + ])); mulder_ctl.allow(0); scully_ctl.allow(0); @@ -296,19 +273,20 @@ mod tests { let (mulder, mut mulder_ctl) = mock::pair(); let (scully, mut scully_ctl) = mock::pair(); let (skinner, mut skinner_ctl) = mock::pair(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully), ("skinner", skinner)] - .into_iter() - .collect(), - Distribution::random_available([("mulder", 1), ("scully", 99998), ("skinner", 1)]) - .unwrap(), - )); + let mut dist_svc = mock::Spawn::new(mock_random_available(vec![ + ("mulder", mulder, 1), + ("scully", scully, 99998), + ("skinner", skinner, 1), + ])); mulder_ctl.allow(1); scully_ctl.allow(1); skinner_ctl.allow(1); assert_ready_ok!(dist_svc.poll_ready()); - assert_eq!(dist_svc.get_ref().ready_idx, Some(1)); + let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(KeyId::new(1))); let mut call = task::spawn(dist_svc.call(())); match assert_ready!(scully_ctl.poll_request()) { Some(((), rsp)) => rsp.send_response(()), @@ -322,19 +300,20 @@ mod tests { let (mulder, mut mulder_ctl) = mock::pair(); let (scully, mut scully_ctl) = mock::pair(); let (skinner, mut skinner_ctl) = mock::pair(); - let mut dist_svc = mock::Spawn::new(Distribute::new( - vec![("mulder", mulder), ("scully", scully), ("skinner", skinner)] - .into_iter() - .collect(), - Distribution::random_available([("mulder", 1), ("scully", 99998), ("skinner", 1)]) - .unwrap(), - )); + let mut dist_svc = mock::Spawn::new(mock_random_available(vec![ + ("mulder", mulder, 1), + ("scully", scully, 99998), + ("skinner", skinner, 1), + ])); mulder_ctl.allow(1); scully_ctl.allow(0); skinner_ctl.allow(0); assert_ready_ok!(dist_svc.poll_ready()); - assert_eq!(dist_svc.get_ref().ready_idx, Some(0)); + let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(KeyId::new(0))); let mut call = task::spawn(dist_svc.call(())); match assert_ready!(mulder_ctl.poll_request()) { Some(((), rsp)) => rsp.send_response(()), @@ -342,4 +321,31 @@ mod tests { } assert_ready_ok!(call.poll()); } + + #[test] + fn random_available_duplicate_keys_allowed() { + let (mulder_1, mut mulder_1_ctl) = mock::pair(); + let (mulder_2, mut mulder_2_ctl) = mock::pair(); + let (mulder_3, mut mulder_3_ctl) = mock::pair(); + let mut dist_svc = mock::Spawn::new(mock_random_available(vec![ + ("mulder", mulder_1, 1), + ("mulder", mulder_2, 99998), + ("mulder", mulder_3, 1), + ])); + + mulder_1_ctl.allow(1); + mulder_2_ctl.allow(1); + mulder_3_ctl.allow(1); + assert_ready_ok!(dist_svc.poll_ready()); + let Selection::RandomAvailable(selection) = &dist_svc.get_ref().selection else { + panic!() + }; + assert_eq!(selection.get_ready_idx(), Some(KeyId::new(1))); + let mut call = task::spawn(dist_svc.call(())); + match assert_ready!(mulder_2_ctl.poll_request()) { + Some(((), rsp)) => rsp.send_response(()), + _ => panic!("expected request"), + } + assert_ready_ok!(call.poll()); + } } diff --git a/linkerd/distribute/src/service/first.rs b/linkerd/distribute/src/service/first.rs new file mode 100644 index 0000000000..696b91d227 --- /dev/null +++ b/linkerd/distribute/src/service/first.rs @@ -0,0 +1,80 @@ +use crate::keys::ServiceKeys; +use linkerd_stack::{NewService, Service}; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub(crate) struct FirstAvailableSelection { + backends: Vec, + + /// Stores the index of the backend that has been polled to ready. The + /// service at this index will be used on the next invocation of + /// `Service::call`. + ready_idx: Option, +} + +impl FirstAvailableSelection { + pub fn new(keys: &ServiceKeys, make_svc: N) -> Self + where + N: for<'a> NewService<&'a K, Service = S>, + { + Self { + backends: keys + .iter() + .map(|&id| make_svc.new_service(keys.get(id))) + .collect(), + ready_idx: None, + } + } + + #[cfg(test)] + pub fn get_ready_idx(&self) -> Option { + self.ready_idx + } +} + +impl Clone for FirstAvailableSelection { + fn clone(&self) -> Self { + Self { + backends: self.backends.clone(), + // Clear the ready index so that the new clone must become ready + // independently. + ready_idx: None, + } + } +} + +impl Service for FirstAvailableSelection +where + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If we've already chosen a ready index, then skip polling. + if self.ready_idx.is_some() { + return Poll::Ready(Ok(())); + } + + for (idx, svc) in self.backends.iter_mut().enumerate() { + if svc.poll_ready(cx)?.is_ready() { + self.ready_idx = Some(idx); + return Poll::Ready(Ok(())); + } + } + debug_assert!(self.ready_idx.is_none()); + Poll::Pending + } + + fn call(&mut self, req: Req) -> Self::Future { + let idx = self + .ready_idx + .take() + .expect("poll_ready must be called first"); + + let svc = self.backends.get_mut(idx).expect("index must exist"); + + svc.call(req) + } +} diff --git a/linkerd/distribute/src/service/random.rs b/linkerd/distribute/src/service/random.rs new file mode 100644 index 0000000000..028d0a5a0d --- /dev/null +++ b/linkerd/distribute/src/service/random.rs @@ -0,0 +1,120 @@ +use crate::{keys::KeyId, WeightedServiceKeys}; +use ahash::HashMap; +use linkerd_stack::{NewService, Service}; +use rand::{distributions::WeightedError, rngs::SmallRng, SeedableRng}; +use std::{ + hash::Hash, + sync::Arc, + task::{Context, Poll}, +}; + +#[derive(Debug)] +pub(crate) struct RandomAvailableSelection { + keys: Arc>, + backends: HashMap, + rng: SmallRng, + + /// Stores the index of the backend that has been polled to ready. The + /// service at this index will be used on the next invocation of + /// `Service::call`. + ready_idx: Option, +} + +fn new_rng() -> SmallRng { + SmallRng::from_rng(rand::thread_rng()).expect("RNG must initialize") +} + +impl RandomAvailableSelection { + pub fn new(keys: &Arc>, make_svc: N) -> Self + where + N: for<'a> NewService<&'a K, Service = S>, + { + Self { + keys: keys.clone(), + backends: keys + .iter() + .map(|&id| (id, make_svc.new_service(&keys.get(id).key))) + .collect(), + ready_idx: None, + rng: new_rng(), + } + } + + #[cfg(test)] + pub fn get_ready_idx(&self) -> Option { + self.ready_idx + } +} + +impl Clone for RandomAvailableSelection { + fn clone(&self) -> Self { + Self { + keys: self.keys.clone(), + backends: self.backends.clone(), + rng: new_rng(), + // Clear the ready index so that the new clone must become ready + // independently. + ready_idx: None, + } + } +} + +impl Service for RandomAvailableSelection +where + K: Hash + Eq, + S: Service, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // If we've already chosen a ready index, then skip polling. + if self.ready_idx.is_some() { + return Poll::Ready(Ok(())); + } + + let mut selector = self.keys.selector(); + loop { + let id = selector.select_weighted(&mut self.rng); + let svc = self + .backends + .get_mut(&id) + .expect("distributions must not reference unknown backends"); + + if svc.poll_ready(cx)?.is_ready() { + self.ready_idx = Some(id); + return Poll::Ready(Ok(())); + } + + // Since the backend we just tried isn't ready, zero out the weight + // so that it's not tried again in this round, i.e. subsequent calls + // to `poll_ready` can try this backend again. + match selector.disable_backend(id) { + Ok(()) => {} + Err(WeightedError::AllWeightsZero) => { + // There are no backends remaining. + break; + } + Err(error) => { + tracing::error!(%error, "unexpected error updating weights; giving up"); + break; + } + } + } + + debug_assert!(self.ready_idx.is_none()); + Poll::Pending + } + + fn call(&mut self, req: Req) -> Self::Future { + let id = self + .ready_idx + .take() + .expect("poll_ready must be called first"); + + let svc = self.backends.get_mut(&id).expect("index must exist"); + + svc.call(req) + } +} diff --git a/linkerd/distribute/src/stack.rs b/linkerd/distribute/src/stack.rs index e027630d7b..731481fe07 100644 --- a/linkerd/distribute/src/stack.rs +++ b/linkerd/distribute/src/stack.rs @@ -55,16 +55,9 @@ where /// Referencing other keys causes a panic. fn new_service(&self, target: T) -> Self::Service { let dist = self.extract.extract_param(&target); - tracing::debug!(backends = ?dist.keys(), "New distribution"); + tracing::debug!(backends = ?dist, "New distribution"); - // Build the backends needed for this distribution, in the required - // order (so that weighted indices align). let newk = self.inner.new_service(target); - let backends = dist - .keys() - .iter() - .map(|k| (k.clone(), newk.new_service(k.clone()))) - .collect(); - Distribute::new(backends, dist) + Distribute::new(dist, |k: &K| newk.new_service(k.clone())) } }