From 22f230255d32cddf6e08295880a1abd69864a48d Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 5 Feb 2024 22:33:33 +0000 Subject: [PATCH] http: Parameterize NewServeHttp We plan to add defensive timeouts to the HTTP server to limit idle streams and connections. In preparation for this, we need to parameterize the server to accept these additonal configurations. This change updates NewServeHttp to use an ExtractParam to build a Params struct for each server. In follow-up changes, the timeout configuration will be instrumented through this Params struct. There are no functional changes in this commit. --- linkerd/app/admin/src/stack.rs | 11 ++- linkerd/app/inbound/src/http/server.rs | 14 ++-- linkerd/app/outbound/src/http/server.rs | 44 ++++++++--- linkerd/app/outbound/src/ingress.rs | 21 +++-- linkerd/app/outbound/src/protocol.rs | 13 ++- linkerd/proxy/http/src/lib.rs | 2 +- linkerd/proxy/http/src/server.rs | 101 ++++++++++++------------ 7 files changed, 120 insertions(+), 86 deletions(-) diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 97514ae409..3c4874afd9 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -123,7 +123,16 @@ impl Config { let tcp = http .unlift_new() - .push(http::NewServeHttp::layer(Default::default(), drain.clone())) + .push(http::NewServeHttp::layer({ + let drain = drain.clone(); + move |t: &Http| { + http::ServerParams { + version: t.version, + h2: Default::default(), + drain: drain.clone(), + } + } + })) .push_filter( |(http, tcp): ( Result, detect::DetectTimeoutError<_>>, diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index fef0173ff3..4d14fffeb1 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -2,7 +2,7 @@ use super::set_identity_header::NewSetIdentityHeader; use crate::{policy, Inbound}; pub use linkerd_app_core::proxy::http::{normalize_uri, Version}; use linkerd_app_core::{ - config::{ProxyConfig, ServerConfig}, + config::ProxyConfig, errors, http_tracing, io, metrics::ServerLabel, proxy::http, @@ -112,16 +112,18 @@ impl Inbound { HSvc::Future: Send, { self.map_stack(|config, rt, http| { - let ProxyConfig { - server: ServerConfig { h2_settings, .. }, - .. - } = config.proxy; + let h2 = config.proxy.server.h2_settings; + let drain = rt.drain.clone(); http.push_on_service(http::BoxRequest::layer()) .check_new_service::>() .unlift_new() .check_new_new_service::>() - .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) + .push(http::NewServeHttp::layer(move |t: &T| http::ServerParams { + version: t.param(), + h2, + drain: drain.clone(), + })) .check_new_service::() .arc_new_tcp() }) diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 3777942104..e89b678694 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -1,16 +1,18 @@ use super::IdentityRequired; use crate::{http, trace_labels, Outbound}; -use linkerd_app_core::{ - errors, http_tracing, io, - svc::{self, ExtractParam}, - Error, Result, -}; +use linkerd_app_core::{drain, errors, http_tracing, io, svc, Error, Result}; #[derive(Copy, Clone, Debug)] pub(crate) struct ServerRescue { emit_headers: bool, } +#[derive(Clone, Debug)] +pub struct ExtractServerParams { + h2: http::h2::Settings, + drain: drain::Watch, +} + impl Outbound> { /// Builds a [`svc::NewService`] stack that prepares HTTP requests to be /// proxied. @@ -61,7 +63,9 @@ impl Outbound> { impl Outbound { pub fn push_tcp_http_server( self, - ) -> Outbound>>> + ) -> Outbound< + http::NewServeHttp>>, + > where // Target T: svc::Param, @@ -81,10 +85,10 @@ impl Outbound { self.map_stack(|config, rt, http| { http.unlift_new() .push(svc::ArcNewService::layer()) - .push(http::NewServeHttp::layer( - config.proxy.server.h2_settings, - rt.drain.clone(), - )) + .push(http::NewServeHttp::layer(ExtractServerParams { + h2: config.proxy.server.h2_settings, + drain: rt.drain.clone(), + })) }) } } @@ -99,14 +103,14 @@ impl ServerRescue { } } -impl ExtractParam for ServerRescue { +impl svc::ExtractParam for ServerRescue { #[inline] fn extract_param(&self, _: &T) -> Self { *self } } -impl ExtractParam for ServerRescue { +impl svc::ExtractParam for ServerRescue { #[inline] fn extract_param(&self, _: &T) -> errors::respond::EmitHeaders { errors::respond::EmitHeaders(self.emit_headers) @@ -179,3 +183,19 @@ impl errors::HttpRescue for ServerRescue { Ok(errors::SyntheticHttpResponse::unexpected_error()) } } + +// === impl ExtractServerParams === + +impl svc::ExtractParam for ExtractServerParams +where + T: svc::Param, +{ + #[inline] + fn extract_param(&self, t: &T) -> http::ServerParams { + http::ServerParams { + version: t.param(), + h2: self.h2, + drain: self.drain.clone(), + } + } +} diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index 057879c86b..60ea463cf0 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -1,6 +1,5 @@ -use crate::{http, opaq, policy, Config, Discovery, Outbound, ParentRef}; +use crate::{http, opaq, policy, Discovery, Outbound, ParentRef}; use linkerd_app_core::{ - config::{ProxyConfig, ServerConfig}, detect, errors, io, metrics::prom, profiles, @@ -278,14 +277,6 @@ impl Outbound { { self.map_stack(|config, rt, inner| { let detect_http = config.proxy.detect_http(); - let Config { - proxy: - ProxyConfig { - server: ServerConfig { h2_settings, .. }, - .. - }, - .. - } = config; // Route requests with destinations that can be discovered via the // `l5d-dst-override` header through the (load balanced) logical @@ -314,7 +305,15 @@ impl Outbound { // destination address. http.check_new_service::, http::Request<_>>() .unlift_new() - .push(http::NewServeHttp::layer(*h2_settings, rt.drain.clone())) + .push(http::NewServeHttp::layer({ + let h2 = config.proxy.server.h2_settings; + let drain = rt.drain.clone(); + move |http: &Http| http::ServerParams { + version: http.version, + h2, + drain: drain.clone() + } + })) .check_new_service::, I>() .push_switch( |(detected, target): (detect::Result, T)| -> Result<_, Infallible> { diff --git a/linkerd/app/outbound/src/protocol.rs b/linkerd/app/outbound/src/protocol.rs index 076a1662ab..abe60d0a81 100644 --- a/linkerd/app/outbound/src/protocol.rs +++ b/linkerd/app/outbound/src/protocol.rs @@ -47,12 +47,17 @@ impl Outbound { let opaq = self.clone().into_stack(); let http = self.with_stack(http).map_stack(|config, rt, stk| { + let h2 = config.proxy.server.h2_settings; + let drain = rt.drain.clone(); stk.push_on_service(http::BoxRequest::layer()) .unlift_new() - .push(http::NewServeHttp::layer( - config.proxy.server.h2_settings, - rt.drain.clone(), - )) + .push(http::NewServeHttp::layer(move |t: &Http| { + http::ServerParams { + version: t.version, + h2, + drain: drain.clone(), + } + })) .arc_new_tcp() }); diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index e4c15fea82..357f22a181 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -38,7 +38,7 @@ pub use self::{ normalize_uri::{MarkAbsoluteForm, NewNormalizeUri}, override_authority::{AuthorityOverride, NewOverrideAuthority}, retain::Retain, - server::{NewServeHttp, ServeHttp}, + server::{NewServeHttp, Params as ServerParams, ServeHttp}, strip_header::StripHeader, timeout::{NewTimeout, ResponseTimeout, ResponseTimeoutError}, version::Version, diff --git a/linkerd/proxy/http/src/server.rs b/linkerd/proxy/http/src/server.rs index d3ec364acd..1fb406113e 100644 --- a/linkerd/proxy/http/src/server.rs +++ b/linkerd/proxy/http/src/server.rs @@ -7,7 +7,7 @@ use crate::{ }; use linkerd_error::Error; use linkerd_io::{self as io, PeerAddr}; -use linkerd_stack::{layer, NewService, Param}; +use linkerd_stack::{layer, ExtractParam, NewService}; use std::{ future::Future, pin::Pin, @@ -18,13 +18,22 @@ use tracing::{debug, Instrument}; type Server = hyper::server::conn::Http; +/// Configures HTTP server behavior. #[derive(Clone, Debug)] -pub struct NewServeHttp { +pub struct Params { + pub version: Version, + pub h2: H2Settings, + pub drain: drain::Watch, +} + +// A stack that builds HTTP servers. +#[derive(Clone, Debug)] +pub struct NewServeHttp { inner: N, - server: Server, - drain: drain::Watch, + params: X, } +/// Serves HTTP connectionswith an inner service. #[derive(Clone, Debug)] pub struct ServeHttp { version: Version, @@ -35,55 +44,43 @@ pub struct ServeHttp { // === impl NewServeHttp === -impl NewServeHttp { - pub fn layer( - h2: H2Settings, - drain: drain::Watch, - ) -> impl layer::Layer + Clone { - layer::mk(move |inner| Self::new(h2, inner, drain.clone())) +impl NewServeHttp { + pub fn layer(params: X) -> impl layer::Layer + Clone { + layer::mk(move |inner| Self::new(params.clone(), inner)) } /// Creates a new `ServeHttp`. - fn new(h2: H2Settings, inner: N, drain: drain::Watch) -> Self { - let mut server = hyper::server::conn::Http::new().with_executor(trace::Executor::new()); - server - .http2_initial_stream_window_size(h2.initial_stream_window_size) - .http2_initial_connection_window_size(h2.initial_connection_window_size); - - // Configure HTTP/2 PING frames - if let Some(timeout) = h2.keepalive_timeout { - // XXX(eliza): is this a reasonable interval between - // PING frames? - let interval = timeout / 4; - server - .http2_keep_alive_timeout(timeout) - .http2_keep_alive_interval(interval); - } - - Self { - inner, - server, - drain, - } + fn new(params: X, inner: N) -> Self { + Self { inner, params } } } -impl NewService for NewServeHttp +impl NewService for NewServeHttp where - T: Param, + X: ExtractParam, N: NewService + Clone, { type Service = ServeHttp; fn new_service(&self, target: T) -> Self::Service { - let version = target.param(); + let Params { version, h2, drain } = self.params.extract_param(&target); + + let mut srv = hyper::server::conn::Http::new().with_executor(trace::Executor::new()); + srv.http2_initial_stream_window_size(h2.initial_stream_window_size) + .http2_initial_connection_window_size(h2.initial_connection_window_size); + // Configure HTTP/2 PING frames + if let Some(timeout) = h2.keepalive_timeout { + srv.http2_keep_alive_timeout(timeout) + .http2_keep_alive_interval(timeout / 4); + } + debug!(?version, "Creating HTTP service"); let inner = self.inner.new_service(target); ServeHttp { inner, version, - server: self.server.clone(), - drain: self.drain.clone(), + drain, + server: srv, } } } @@ -93,7 +90,7 @@ where impl Service for ServeHttp where I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static, - N: NewService + Clone + Send + 'static, + N: NewService + Send + 'static, S: Service, Response = http::Response, Error = Error> + Unpin + Send @@ -109,29 +106,30 @@ where } fn call(&mut self, io: I) -> Self::Future { - let Self { - version, - inner, - drain, - mut server, - } = self.clone(); - debug!(?version, "Handling as HTTP"); + let version = self.version; + let drain = self.drain.clone(); + let mut server = self.server.clone(); + + let res = io.peer_addr().map(|pa| { + let (handle, closed) = ClientHandle::new(pa); + let svc = self.inner.new_service(handle.clone()); + let svc = SetClientHandle::new(handle, svc); + (svc, closed) + }); Box::pin( async move { - let client_addr = io.peer_addr()?; - let (client_handle, closed) = ClientHandle::new(client_addr); - // TODO(ver): Move this into the inner stack. - let svc = - SetClientHandle::new(client_handle.clone(), inner.new_service(client_handle)); - + let (svc, closed) = res?; + debug!(?version, "Handling as HTTP"); match version { Version::Http1 => { // Enable support for HTTP upgrades (CONNECT and websockets). + let svc = upgrade::Service::new(svc, drain.clone()); let mut conn = server .http1_only(true) - .serve_connection(io, upgrade::Service::new(svc, drain.clone())) + .serve_connection(io, svc) .with_upgrades(); + tokio::select! { res = &mut conn => { debug!(?res, "The client is shutting down the connection"); @@ -154,6 +152,7 @@ where let mut conn = server .http2_only(true) .serve_connection(io, HyperServerSvc::new(svc)); + tokio::select! { res = &mut conn => { debug!(?res, "The client is shutting down the connection");