Skip to content

Commit

Permalink
feat(outbound): Add per-route stream timeout policies
Browse files Browse the repository at this point in the history
This change adds a new timeout configuration to the outbound HTTP and gRPC
policies that sets request extensions to configure timeout behavior in the
endpoint stack. This ensures that timeouts manifest as endpoint-level errors
(i.e. visible to endpoint-level circuit breakers) instead of as a route-level
cancellation.

Several timeouts are implemented:

1. A request timeout, which is the maximum time a full request-response stream
   can remain in the proxy.
2. A response headers timeout that limits the amount of time after the request
   has been fully sent in which response headers must be fully received. This
   will be used to implement retry timeouts.
3. A response timeout, which limits the amount of time after the request has
   been fully sent in which a response may be in flight.
4. An idle timeout that limits the time a request-response stream may be sit in
   the proxy without any DATA frame activity.

NOTE: As this timeout is enforced in the endpoint stack, it is currently
possible for requests to remain in the balancer queue until a failfast timeout
is tripped. The configured timeouts only apply to requests that have been
dequeued and dispatched to an endpoint.

NOTE: A followup API change is required to support configuring all timeouts via
the API.
  • Loading branch information
olix0r committed Jul 24, 2024
1 parent f175c06 commit a1f68b3
Show file tree
Hide file tree
Showing 30 changed files with 1,312 additions and 396 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,7 @@ dependencies = [
"linkerd-proxy-balance",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"rand",
"thiserror",
Expand Down
10 changes: 10 additions & 0 deletions linkerd/app/core/src/errors/respond.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ impl SyntheticHttpResponse {
}
}

pub fn gateway_timeout_nonfatal(msg: impl ToString) -> Self {
Self {
close_connection: false,
http_status: http::StatusCode::GATEWAY_TIMEOUT,
grpc_status: tonic::Code::Unavailable,
message: Cow::Owned(msg.to_string()),
location: None,
}
}

pub fn unavailable(msg: impl ToString) -> Self {
Self {
close_connection: true,
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/integration/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ pub fn outbound_default(dst: impl ToString) -> outbound::OutboundPolicy {
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
http1: Some(proxy_protocol::Http1 {
routes: vec![route.clone()],
failure_accrual: None,
..Default::default()
}),
http2: Some(proxy_protocol::Http2 {
routes: vec![route],
failure_accrual: None,
..Default::default()
}),
opaque: Some(proxy_protocol::Opaque {
routes: vec![outbound_default_opaque_route(dst)],
Expand Down Expand Up @@ -150,7 +150,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute {
}],
filters: Vec::new(),
backends: Some(http_first_available(std::iter::once(backend(dst)))),
request_timeout: None,
..Default::default()
}],
}
}
Expand Down Expand Up @@ -214,7 +214,7 @@ pub fn http_first_available(
.map(|backend| http_route::RouteBackend {
backend: Some(backend),
filters: Vec::new(),
request_timeout: None,
..Default::default()
})
.collect(),
},
Expand Down
25 changes: 12 additions & 13 deletions linkerd/app/integration/src/tests/client_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ async fn empty_http1_route() {
hosts: Vec::new(),
rules: Vec::new(),
}],
failure_accrual: None,
..Default::default()
}),
http2: Some(proxy_protocol::Http2 {
routes: vec![policy::outbound_default_http_route(&dst)],
failure_accrual: None,
..Default::default()
}),
opaque: Some(proxy_protocol::Opaque {
routes: vec![policy::outbound_default_opaque_route(&dst)],
Expand Down Expand Up @@ -148,15 +148,15 @@ async fn empty_http2_route() {
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
http1: Some(proxy_protocol::Http1 {
routes: vec![policy::outbound_default_http_route(&dst)],
failure_accrual: None,
..Default::default()
}),
http2: Some(proxy_protocol::Http2 {
routes: vec![outbound::HttpRoute {
metadata: Some(httproute_meta("empty")),
hosts: Vec::new(),
rules: Vec::new(),
}],
failure_accrual: None,
..Default::default()
}),
opaque: Some(proxy_protocol::Opaque {
routes: vec![policy::outbound_default_opaque_route(&dst)],
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),
request_timeout: None,
..Default::default()
};

let route = outbound::HttpRoute {
Expand All @@ -237,7 +237,7 @@ async fn header_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
..Default::default()
},
// x-hello-city: sf | x-hello-city: san francisco
mk_header_rule(
Expand Down Expand Up @@ -266,11 +266,11 @@ async fn header_based_routing() {
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
http1: Some(proxy_protocol::Http1 {
routes: vec![route.clone()],
failure_accrual: None,
..Default::default()
}),
http2: Some(proxy_protocol::Http2 {
routes: vec![route],
failure_accrual: None,
..Default::default()
}),
opaque: Some(proxy_protocol::Opaque {
routes: vec![policy::outbound_default_opaque_route(&dst_world)],
Expand Down Expand Up @@ -400,8 +400,7 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(dst),
))),

request_timeout: None,
..Default::default()
};

let route = outbound::HttpRoute {
Expand All @@ -415,7 +414,7 @@ async fn path_based_routing() {
backends: Some(policy::http_first_available(std::iter::once(
policy::backend(&dst_world),
))),
request_timeout: None,
..Default::default()
},
// /goodbye/*
mk_path_rule(
Expand Down Expand Up @@ -449,11 +448,11 @@ async fn path_based_routing() {
timeout: Some(Duration::from_secs(10).try_into().unwrap()),
http1: Some(proxy_protocol::Http1 {
routes: vec![route.clone()],
failure_accrual: None,
..Default::default()
}),
http2: Some(proxy_protocol::Http2 {
routes: vec![route],
failure_accrual: None,
..Default::default()
}),
opaque: Some(proxy_protocol::Opaque {
routes: vec![policy::outbound_default_opaque_route(&dst_world)],
Expand Down
10 changes: 5 additions & 5 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.13", features = ["outbound"] }
once_cell = "1"
parking_lot = "0.12"
pin-project = "1"
prometheus-client = "0.22"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tonic = { version = "0.10", default-features = false }
tower = { version = "0.4", features = ["util"] }
tracing = "0.1"
pin-project = "1"

linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
Expand All @@ -49,6 +49,10 @@ linkerd-tonic-watch = { path = "../../tonic-watch" }

[dev-dependencies]
hyper = { version = "0.14", features = ["http1", "http2"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-test = "0.4"
tower-test = "0.4"

linkerd-app-test = { path = "../test", features = ["client-policy"] }
linkerd-http-prom = { path = "../../http/prom", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
Expand All @@ -58,7 +62,3 @@ linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [
] }
linkerd-stack = { path = "../../stack", features = ["test-util"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
parking_lot = "0.12"
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-test = "0.4"
tower-test = "0.4"
8 changes: 2 additions & 6 deletions linkerd/app/outbound/src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,11 @@ fn policy_for_backend(
policy: Some(policy::opaq::Policy {
meta: meta.clone(),
filters: NO_OPAQ_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
params: Default::default(),
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_OPAQ_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
}),
Expand All @@ -256,13 +254,11 @@ fn policy_for_backend(
policy: policy::http::Policy {
meta: meta.clone(),
filters: NO_HTTP_FILTERS.clone(),
failure_policy: Default::default(),
request_timeout: None,
params: Default::default(),
distribution: policy::RouteDistribution::FirstAvailable(Arc::new([
policy::RouteBackend {
filters: NO_HTTP_FILTERS.clone(),
backend: backend.clone(),
request_timeout: None,
},
])),
},
Expand Down
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ impl<T> Outbound<svc::ArcNewHttp<T, http::BoxBody>> {
// is only done when the `Closable` parameter is set to true.
// This module always strips error headers from responses.
.push(NewHandleProxyErrorHeaders::layer())
.push_on_service(http::BoxRequest::layer())
.push_on_service(http::EnforceTimeouts::layer())
// Handle connection-level errors eagerly so that we can report 5XX failures in tap
// and metrics. HTTP error metrics are not incremented here so that errors are not
// double-counted--i.e., endpoint metrics track these responses and error metrics
// track proxy errors that occur higher in the stack.
.push(ClientRescue::layer(config.emit_headers))
.push_on_service(http::BoxRequest::layer())
.push(tap::NewTapHttp::layer(rt.tap.clone()))
.push(
rt.metrics
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod route;
mod router;
#[cfg(test)]
mod tests;
#[cfg(test)]
pub use self::route::metrics::test_util::MockBody;

pub use self::{
route::{errors, GrpcRouteMetrics, HttpRouteMetrics},
Expand Down
37 changes: 29 additions & 8 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};

pub(crate) mod backend;
pub(crate) mod extensions;
pub(crate) mod filters;
pub(crate) mod metrics;

Expand All @@ -27,28 +28,28 @@ pub(crate) struct Matched<M, P> {
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Route<T, F, E> {
pub(crate) struct Route<T, F, P> {
pub(super) parent: T,
pub(super) addr: Addr,
pub(super) parent_ref: ParentRef,
pub(super) route_ref: RouteRef,
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
pub(super) params: P,
}

pub(crate) type MatchedRoute<T, M, F, E> = Matched<M, Route<T, F, E>>;
pub(crate) type MatchedRoute<T, M, F, P> = Matched<M, Route<T, F, P>>;
pub(crate) type Http<T> = MatchedRoute<
T,
http_route::http::r#match::RequestMatch,
policy::http::Filter,
policy::http::StatusRanges,
policy::http::RouteParams,
>;
pub(crate) type Grpc<T> = MatchedRoute<
T,
http_route::grpc::r#match::RouteMatch,
policy::grpc::Filter,
policy::grpc::Codes,
policy::grpc::RouteParams,
>;

pub(crate) type BackendDistribution<T, F> = distribute::Distribution<Backend<T, F>>;
Expand All @@ -70,7 +71,7 @@ struct RouteError {

// === impl MatchedRoute ===

impl<T, M, F, E> MatchedRoute<T, M, F, E>
impl<T, M, F, P> MatchedRoute<T, M, F, P>
where
// Parent target.
T: Debug + Eq + Hash,
Expand All @@ -80,11 +81,12 @@ where
// Request filter.
F: Debug + Eq + Hash,
F: Clone + Send + Sync + 'static,
// Failure policy.
E: Clone + Send + Sync + 'static,
// Route params.
P: Clone + Send + Sync + 'static,
// Assert that filters can be applied.
Self: filters::Apply,
Self: svc::Param<classify::Request>,
Self: svc::Param<extensions::Params>,
Self: metrics::MkStreamLabel,
MatchedBackend<T, M, F>: filters::Apply,
MatchedBackend<T, M, F>: metrics::MkStreamLabel,
Expand Down Expand Up @@ -119,6 +121,9 @@ where
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
.push(filters::NewApplyFilters::<Self, _, _>::layer())
// Set request extensions based on the route configuration
// AND/OR headers
.push(extensions::NewSetExtensions::layer())
.push(metrics::layer(&metrics.requests))
// Configure a classifier to use in the endpoint stack.
// FIXME(ver) move this into NewSetExtensions
Expand Down Expand Up @@ -179,6 +184,14 @@ impl<T> metrics::MkStreamLabel for Http<T> {
}
}

impl<T> svc::Param<extensions::Params> for Http<T> {
fn param(&self) -> extensions::Params {
extensions::Params {
timeouts: self.params.params.timeouts.clone(),
}
}
}

impl<T> svc::Param<classify::Request> for Http<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(classify::ClientPolicy::Http(
Expand Down Expand Up @@ -215,6 +228,14 @@ impl<T> metrics::MkStreamLabel for Grpc<T> {
}
}

impl<T> svc::Param<extensions::Params> for Grpc<T> {
fn param(&self) -> extensions::Params {
extensions::Params {
timeouts: self.params.params.timeouts.clone(),
}
}
}

impl<T> svc::Param<classify::Request> for Grpc<T> {
fn param(&self) -> classify::Request {
classify::Request::ClientPolicy(
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ impl<T: Clone, F> Clone for Backend<T, F> {

// === impl Matched ===

impl<M, T, F, E> From<(Backend<T, F>, super::MatchedRoute<T, M, F, E>)>
impl<M, T, F, P> From<(Backend<T, F>, super::MatchedRoute<T, M, F, P>)>
for MatchedBackend<T, M, F>
{
fn from((params, route): (Backend<T, F>, super::MatchedRoute<T, M, F, E>)) -> Self {
fn from((params, route): (Backend<T, F>, super::MatchedRoute<T, M, F, P>)) -> Self {
MatchedBackend {
r#match: route.r#match,
params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ fn mock_http_route_backend_metrics(
policy: policy::http::Policy {
meta: route_ref.0.clone(),
filters: [].into(),
request_timeout: None,
failure_policy: Default::default(),
distribution: policy::RouteDistribution::Empty,
params: Default::default(),
},
}],
}],
Expand Down Expand Up @@ -353,9 +352,8 @@ fn mock_grpc_route_backend_metrics(
policy: policy::grpc::Policy {
meta: route_ref.0.clone(),
filters: [].into(),
request_timeout: None,
failure_policy: Default::default(),
distribution: policy::RouteDistribution::Empty,
params: Default::default(),
},
}],
}],
Expand Down
Loading

0 comments on commit a1f68b3

Please sign in to comment.