Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balance: Fail the discovery stream on queue backup #2486

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,7 @@ dependencies = [
"linkerd-io",
"linkerd-proxy-client-policy",
"parking_lot",
"pin-project",
"regex",
"thiserror",
"tokio",
Expand Down Expand Up @@ -1480,6 +1481,7 @@ dependencies = [
"linkerd-stack",
"pin-project",
"rand",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
Expand Down
6 changes: 5 additions & 1 deletion linkerd/app/outbound/src/metrics/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::http::IdentityRequired;
use linkerd_app_core::{
errors::{FailFastError, LoadShedError},
metrics::FmtLabels,
proxy::http::ResponseTimeoutError,
proxy::http::{balance::DiscoveryStreamOverflow, ResponseTimeoutError},
};
use std::fmt;

Expand All @@ -19,6 +19,7 @@ enum ErrorKind {
ResponseTimeout,
Unexpected,
LoadShed,
DiscoveryStreamOverflow,
}

// === impl ErrorKind ===
Expand All @@ -35,6 +36,8 @@ impl ErrorKind {
ErrorKind::ResponseTimeout
} else if err.is::<LoadShedError>() {
ErrorKind::LoadShed
} else if err.is::<DiscoveryStreamOverflow>() {
ErrorKind::DiscoveryStreamOverflow
} else if let Some(e) = err.source() {
Self::mk(e)
} else {
Expand All @@ -54,6 +57,7 @@ impl FmtLabels for ErrorKind {
ErrorKind::IdentityRequired => "identity required",
ErrorKind::Io => "i/o",
ErrorKind::ResponseTimeout => "response timeout",
ErrorKind::DiscoveryStreamOverflow => "discovery stream overflow",
ErrorKind::Unexpected => "unexpected",
}
)
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ linkerd-identity = { path = "../../identity" }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy", optional = true }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
parking_lot = "0.12"
pin-project = "1"
regex = "1"
tokio = { version = "1", features = ["io-util", "net", "rt", "sync"] }
tokio-test = "0.4"
Expand Down
22 changes: 18 additions & 4 deletions linkerd/app/test/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::{
};
use std::task::{Context, Poll};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;

#[cfg(feature = "client-policy")]
mod client_policy;
Expand Down Expand Up @@ -62,7 +61,8 @@ struct State<A, E> {
only: AtomicBool,
}

pub type DstReceiver<E> = UnboundedReceiverStream<Result<Update<E>, Error>>;
#[pin_project::pin_project]
pub struct DstReceiver<T>(#[pin] mpsc::UnboundedReceiver<Result<Update<T>, Error>>);

#[derive(Debug)]
pub struct SendFailed(());
Expand Down Expand Up @@ -106,7 +106,7 @@ impl<E> Dst<E> {
self.state
.endpoints
.lock()
.insert(addr.into(), UnboundedReceiverStream::new(rx));
.insert(addr.into(), DstReceiver(rx));
DstSender(tx)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ impl<T: Param<ConcreteAddr>, E> tower::Service<T> for Dst<E> {
self.state.only.store(false, Ordering::Release);
let (tx, rx) = mpsc::unbounded_channel();
let _ = tx.send(Ok(Update::DoesNotExist));
UnboundedReceiverStream::new(rx)
DstReceiver(rx)
});

future::ok(res)
Expand Down Expand Up @@ -304,3 +304,17 @@ impl<T: Param<profiles::LookupAddr>> tower::Service<T> for NoProfiles {
);
}
}

impl<T> futures::Stream for DstReceiver<T> {
type Item = Result<Update<T>, Error>;

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match futures::ready!(this.0.poll_recv(cx)) {
Some(item) => Poll::Ready(Some(item)),
// If the stream terminates, the balancer will error, so we simply
// stop updating when the sender is closed.
None => Poll::Pending,
}
}
}
1 change: 1 addition & 0 deletions linkerd/proxy/balance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
pin-project = "1"
rand = "0.8"
thiserror = "1"
tokio = { version = "1", features = ["rt", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = "0.7"
Expand Down
103 changes: 80 additions & 23 deletions linkerd/proxy/balance/src/discover.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,88 @@
use futures::prelude::*;
use linkerd_proxy_core::Resolve;
use futures::{ready, Stream};
use linkerd_error::Error;
use linkerd_stack::NewService;
use std::net::SocketAddr;
use pin_project::pin_project;
use std::{
fmt::Debug,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tower::discover::Change;
use tracing::{debug, info};

pub mod buffer;
pub mod from_resolve;
pub mod new;
pub(super) mod buffer;
mod from_resolve;

pub use self::{from_resolve::FromResolve, new::DiscoverNew};
pub(super) use self::from_resolve::FromResolve;

pub type Buffer<S> = buffer::Buffer<SocketAddr, S>;
/// Prepares a discovery update for the balancer by turning it into a Service.
#[pin_project]
pub struct NewServices<T, N> {
buffer: buffer::Buffer<SocketAddr, T>,
new: N,
}

#[derive(Debug, thiserror::Error)]
#[error("discovery stream lost")]
pub struct DiscoveryStreamOverflow(());

// === impl NewServices ===

impl<T, N> NewServices<T, N> {
pub(crate) fn new(buffer: buffer::Buffer<SocketAddr, T>, new: N) -> Self {
Self { buffer, new }
}
}

pub(crate) fn spawn_new_from_resolve<T, R, M, N>(
capacity: usize,
resolve: R,
new_service: M,
target: T,
) -> Buffer<N::Service>
impl<T, N> Stream for NewServices<T, N>
where
T: Clone,
R: Resolve<T>,
M: NewService<T, Service = N>,
N: NewService<(SocketAddr, R::Endpoint)> + Send + 'static,
N::Service: Send,
T: Clone + Debug,
N: NewService<(SocketAddr, T)>,
{
let new_endpoint = new_service.new_service(target.clone());
let resolution = resolve.resolve(target).try_flatten_stream();
let disco = DiscoverNew::new(FromResolve::new(resolution), new_endpoint);
buffer::spawn(capacity, disco)
type Item = Result<Change<SocketAddr, N::Service>, Error>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Change<SocketAddr, N::Service>, Error>>> {
let this = self.as_mut().project();

// If the buffer has lost the ability to process new discovery updates
// from its resolution, blow up the entire balancer without processing
// further updates from the channel.
if this
.buffer
.overflow
.load(std::sync::atomic::Ordering::Acquire)
{
return Poll::Ready(Some(Err(DiscoveryStreamOverflow(()).into())));
};

// Process any buffered updates.
//
// When an error is received from the buffer, that error is considered
// fatal, so this stream should not be polled again.
let change_tgt = match ready!(this.buffer.rx.poll_recv(cx)) {
Some(Ok(c)) => c,
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(Some(Err(DiscoveryStreamOverflow(()).into()))),
olix0r marked this conversation as resolved.
Show resolved Hide resolved
};

// Build a new service for the endpoint and log the change at INFO.
let change_svc = match change_tgt {
Change::Insert(addr, target) => {
info!(endpoint.addr = %addr, "Adding endpoint to service");
debug!(endpoint.target = ?target);
let svc = this.new.new_service((addr, target));
Change::Insert(addr, svc)
}
Change::Remove(addr) => {
info!(endpoint.addr = %addr, "Removing endpoint from service");
Change::Remove(addr)
}
};

Poll::Ready(Some(Ok(change_svc)))
}
}
39 changes: 21 additions & 18 deletions linkerd/proxy/balance/src/discover/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use futures_util::future::poll_fn;
use linkerd_error::Error;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;
use tower::discover;
use tracing::{debug, debug_span, instrument::Instrument, trace, warn};

pub type Result<K, S> = std::result::Result<discover::Change<K, S>, Error>;
pub type Buffer<K, S> = tokio_stream::wrappers::ReceiverStream<Result<K, S>>;

pub struct Buffer<K, S> {
pub(super) rx: mpsc::Receiver<Result<K, S>>,
pub(super) overflow: Arc<AtomicBool>,
}

pub fn spawn<D>(capacity: usize, inner: D) -> Buffer<D::Key, D::Service>
where
Expand Down Expand Up @@ -39,8 +44,11 @@ where
}
};

let overflow = Arc::new(AtomicBool::new(false));

debug!(%capacity, "Spawning discovery buffer");
tokio::spawn(
tokio::spawn({
let overflow = overflow.clone();
async move {
tokio::pin!(inner);

Expand All @@ -60,19 +68,14 @@ where
Some(Ok(change)) => {
trace!("Changed");
if !send(&tx, Ok(change)) {
// XXX(ver) We don't actually have a way to "blow
// up" the balancer in this situation. My
// understanding is that this will cause the
// balancer to get cut off from further updates,
// should it ever become available again. That needs
// to be fixed.
//
// One option would be to drop the discovery stream
// and rebuild it if the balancer ever becomes
// unblocked.
//
// Ultimately we need to track down how we're
// getting into this blocked/idle state
// Tell the outer discovery stream that we've
// stopped processing updates. We assume that the
// consumer is stuck in a _ready_ state (i.e.
// waiting for requests), otherwise it would be
// consuming from the channel. We flip the overflow
// state so that the consumer can check it and
// return an error if it ever tries to poll again.
overflow.store(true, std::sync::atomic::Ordering::Release);
return;
}
}
Expand All @@ -90,8 +93,8 @@ where
}
}
.in_current_span()
.instrument(debug_span!("discover")),
);
.instrument(debug_span!("discover"))
});

Buffer::new(rx)
Buffer { rx, overflow }
}
2 changes: 1 addition & 1 deletion linkerd/proxy/balance/src/discover/from_resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct FromResolve<T, R: TryStream> {
// === impl FromResolve ===

impl<T, R: TryStream> FromResolve<T, R> {
pub(super) fn new(resolution: R) -> Self {
pub(crate) fn new(resolution: R) -> Self {
Self {
resolution,
active: IndexMap::default(),
Expand Down
54 changes: 0 additions & 54 deletions linkerd/proxy/balance/src/discover/new.rs

This file was deleted.

Loading