Skip to content

Commit

Permalink
feat(outbound)!: Remove HTTP route timeouts (#3087)
Browse files Browse the repository at this point in the history
In prepration to instrument timeouts in the endpoint stack, this change removes
the existing timeout enforcement in the HTTP router. This parameter could only
be configured with the older policy.linkerd.io resources (and not the
gateway.networking.k8s.io resources).

This timeout that is removed has a few problems:

1. It does not implement the proper 'request timeout' semantics, as described by
   the Gateway API spec. Instead, it enforces a timeout on on the response
   headers. There is no bounds on the lifetime of a response stream, for
   instance.
2. These errors are difficult to observe because they manifest as cancelations.

A follow-up change will replacae this with route configuration that enables
endpoint-level HTTP errors.
  • Loading branch information
olix0r authored Jul 24, 2024
1 parent 3343c08 commit d591271
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 106 deletions.
8 changes: 0 additions & 8 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub(crate) struct Route<T, F, E> {
pub(super) filters: Arc<[F]>,
pub(super) distribution: BackendDistribution<T, F>,
pub(super) failure_policy: E,
pub(super) request_timeout: Option<std::time::Duration>,
}

pub(crate) type MatchedRoute<T, M, F, E> = Matched<M, Route<T, F, E>>;
Expand Down Expand Up @@ -120,7 +119,6 @@ where
// leaking tasks onto the runtime.
.push_on_service(svc::LoadShed::layer())
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(http::NewTimeout::layer())
.push(metrics::layer(&metrics.requests))
// Configure a classifier to use in the endpoint stack.
// FIXME(ver) move this into NewSetExtensions
Expand Down Expand Up @@ -153,12 +151,6 @@ impl<T: Clone, M, F, P> svc::Param<RouteLabels> for MatchedRoute<T, M, F, P> {
}
}

impl<T, M, F, P> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T, M, F, P> {
fn param(&self) -> http::timeout::ResponseTimeout {
http::timeout::ResponseTimeout(self.params.request_timeout)
}
}

// === impl Http ===

impl<T> filters::Apply for Http<T> {
Expand Down
9 changes: 0 additions & 9 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub(crate) struct Backend<T, F> {
pub(crate) route_ref: RouteRef,
pub(crate) concrete: Concrete<T>,
pub(crate) filters: Arc<[F]>,
pub(crate) request_timeout: Option<std::time::Duration>,
}

pub(crate) type MatchedBackend<T, M, F> = super::Matched<M, Backend<T, F>>;
Expand Down Expand Up @@ -41,7 +40,6 @@ impl<T: Clone, F> Clone for Backend<T, F> {
route_ref: self.route_ref.clone(),
filters: self.filters.clone(),
concrete: self.concrete.clone(),
request_timeout: self.request_timeout,
}
}
}
Expand Down Expand Up @@ -101,7 +99,6 @@ where
}| concrete,
)
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.push(http::NewTimeout::layer())
.push(metrics::layer(&metrics))
.push(svc::NewMapErr::layer_with(|t: &Self| {
let backend = t.params.concrete.backend_ref.clone();
Expand All @@ -118,12 +115,6 @@ where
}
}

impl<T, M, F> svc::Param<http::ResponseTimeout> for MatchedBackend<T, M, F> {
fn param(&self) -> http::ResponseTimeout {
http::ResponseTimeout(self.params.request_timeout)
}
}

impl<T, M, F> svc::Param<ParentRef> for MatchedBackend<T, M, F> {
fn param(&self) -> ParentRef {
self.params.concrete.parent_ref.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ fn mock_http_route_backend_metrics(
r#match,
params: Backend {
route_ref: route_ref.clone(),
request_timeout: None,
filters: [].into(),
concrete: Concrete {
target: concrete::Dispatch::Forward(
Expand Down Expand Up @@ -371,7 +370,6 @@ fn mock_grpc_route_backend_metrics(
r#match,
params: Backend {
route_ref: route_ref.clone(),
request_timeout: None,
filters: [].into(),
concrete: Concrete {
target: concrete::Dispatch::Forward(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ pub fn mock_http_route_metrics(
parent_ref: parent_ref.clone(),
route_ref: route_ref.clone(),
failure_policy: Default::default(),
request_timeout: None,
filters: [].into(),
distribution: Default::default(),
},
Expand Down Expand Up @@ -335,7 +334,6 @@ pub fn mock_grpc_route_metrics(
parent_ref: parent_ref.clone(),
route_ref: route_ref.clone(),
failure_policy: Default::default(),
request_timeout: None,
filters: [].into(),
distribution: Default::default(),
},
Expand Down
4 changes: 1 addition & 3 deletions linkerd/app/outbound/src/http/logical/policy/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ where
route_ref: route_ref.clone(),
filters,
concrete,
request_timeout: rb.request_timeout,
}
};

Expand Down Expand Up @@ -201,7 +200,7 @@ where
filters,
distribution,
failure_policy,
request_timeout,
request_timeout: _,
}| {
let route_ref = RouteRef(meta);
let distribution = mk_distribution(&route_ref, &distribution);
Expand All @@ -213,7 +212,6 @@ where
filters,
distribution,
failure_policy,
request_timeout,
}
}
};
Expand Down
84 changes: 2 additions & 82 deletions linkerd/app/outbound/src/http/logical/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ async fn balancer_doesnt_select_tripped_breakers() {
}
}

// XXX(ver): Route request timeouts will be reintroduced.
#[tokio::test(flavor = "current_thread")]
#[ignore]
async fn route_request_timeout() {
tokio::time::pause();
let _trace = trace::test::trace_init();
Expand Down Expand Up @@ -368,88 +370,6 @@ async fn route_request_timeout() {
));
}

#[tokio::test(flavor = "current_thread")]
async fn backend_request_timeout() {
tokio::time::pause();
let _trace = trace::test::trace_init();
// must be less than the `default_config` failfast timeout, or we'll hit
// that instead.
const ROUTE_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2);
const BACKEND_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(1);

let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT);
let dest: NameAddr = format!("{AUTHORITY}:{PORT}")
.parse::<NameAddr>()
.expect("dest addr is valid");
let (svc, mut handle) = tower_test::mock::pair();
let connect = HttpConnect::default().service(addr, svc);
let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default());
let (rt, _shutdown) = runtime();
let stack = Outbound::new(default_config(), rt, &mut Default::default())
.with_stack(svc::ArcNewService::new(connect))
.push_http_cached(resolve)
.into_inner();

let (_route_tx, routes) = {
let backend = default_backend(&dest);
// Set both a route request timeout and a backend request timeout.
let route = timeout_route(
backend.clone(),
Some(ROUTE_REQUEST_TIMEOUT),
Some(BACKEND_REQUEST_TIMEOUT),
);
watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams {
addr: dest.into(),
meta: ParentRef(client_policy::Meta::new_default("parent")),
backends: Arc::new([backend]),
routes: Arc::new([route]),
failure_accrual: client_policy::FailureAccrual::None,
})))
};
let target = Target {
num: 1,
version: http::Version::H2,
routes,
};
let svc = stack.new_service(target);

handle.allow(1);
let rsp = send_req(svc.clone(), http::Request::get("/"));
serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await;
assert_eq!(
rsp.await.expect("request must succeed").status(),
http::StatusCode::OK
);

// Now, time out...
let rsp = send_req(svc.clone(), http::Request::get("/"));
// Wait until we actually get the request --- this timeout only starts once
// the service has been acquired.
handle.allow(1);
let (_, send_rsp) = handle
.next_request()
.await
.expect("service must receive request");
tokio::time::sleep(BACKEND_REQUEST_TIMEOUT + Duration::from_millis(1)).await;
// Still send a response, so that if we didn't hit the backend timeout
// timeout, we don't hit the route timeout and succeed incorrectly.
send_rsp.send_response(mk_rsp(StatusCode::OK, "good"));
let error = rsp.await.expect_err("request must fail with a timeout");
assert!(errors::is_caused_by::<http::timeout::ResponseTimeoutError>(
error.as_ref()
));

// The route request timeout should still apply to time spent before
// the backend is acquired.
let rsp = send_req(svc.clone(), http::Request::get("/"));
tokio::time::sleep(ROUTE_REQUEST_TIMEOUT + Duration::from_millis(1)).await;
handle.allow(1);
let error = rsp.await.expect_err("request must fail with a timeout");
assert!(errors::is_caused_by::<http::timeout::ResponseTimeoutError>(
error.as_ref()
));
}

#[derive(Clone, Debug)]
struct Target {
num: usize,
Expand Down

0 comments on commit d591271

Please sign in to comment.