Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: Parameterize NewServeHttp #2696

Merged
merged 1 commit into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ impl Config {

let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer(Default::default(), drain.clone()))
.push(http::NewServeHttp::layer({
let drain = drain.clone();
move |t: &Http| {
http::ServerParams {
version: t.version,
h2: Default::default(),
drain: drain.clone(),
}
}
}))
.push_filter(
|(http, tcp): (
Result<Option<http::Version>, detect::DetectTimeoutError<_>>,
Expand Down
14 changes: 8 additions & 6 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::set_identity_header::NewSetIdentityHeader;
use crate::{policy, Inbound};
pub use linkerd_app_core::proxy::http::{normalize_uri, Version};
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
config::ProxyConfig,
errors, http_tracing, io,
metrics::ServerLabel,
proxy::http,
Expand Down Expand Up @@ -112,16 +112,18 @@ impl<H> Inbound<H> {
HSvc::Future: Send,
{
self.map_stack(|config, rt, http| {
let ProxyConfig {
server: ServerConfig { h2_settings, .. },
..
} = config.proxy;
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();

http.push_on_service(http::BoxRequest::layer())
.check_new_service::<T, http::Request<_>>()
.unlift_new()
.check_new_new_service::<T, http::ClientHandle, http::Request<_>>()
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
.push(http::NewServeHttp::layer(move |t: &T| http::ServerParams {
version: t.param(),
h2,
drain: drain.clone(),
}))
.check_new_service::<T, I>()
.arc_new_tcp()
})
Expand Down
44 changes: 32 additions & 12 deletions linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use super::IdentityRequired;
use crate::{http, trace_labels, Outbound};
use linkerd_app_core::{
errors, http_tracing, io,
svc::{self, ExtractParam},
Error, Result,
};
use linkerd_app_core::{drain, errors, http_tracing, io, svc, Error, Result};

#[derive(Copy, Clone, Debug)]
pub(crate) struct ServerRescue {
emit_headers: bool,
}

#[derive(Clone, Debug)]
pub struct ExtractServerParams {
h2: http::h2::Settings,
drain: drain::Watch,
}

impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
/// Builds a [`svc::NewService`] stack that prepares HTTP requests to be
/// proxied.
Expand Down Expand Up @@ -61,7 +63,9 @@
impl<N> Outbound<N> {
pub fn push_tcp_http_server<T, I, NSvc>(
self,
) -> Outbound<http::NewServeHttp<svc::ArcNewService<T, svc::NewCloneService<NSvc>>>>
) -> Outbound<
http::NewServeHttp<ExtractServerParams, svc::ArcNewService<T, svc::NewCloneService<NSvc>>>,
>
where
// Target
T: svc::Param<http::Version>,
Expand All @@ -81,10 +85,10 @@
self.map_stack(|config, rt, http| {
http.unlift_new()
.push(svc::ArcNewService::layer())
.push(http::NewServeHttp::layer(
config.proxy.server.h2_settings,
rt.drain.clone(),
))
.push(http::NewServeHttp::layer(ExtractServerParams {
h2: config.proxy.server.h2_settings,
drain: rt.drain.clone(),

Check warning on line 90 in linkerd/app/outbound/src/http/server.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/outbound/src/http/server.rs#L88-L90

Added lines #L88 - L90 were not covered by tests
}))
})
}
}
Expand All @@ -99,14 +103,14 @@
}
}

impl<T> ExtractParam<Self, T> for ServerRescue {
impl<T> svc::ExtractParam<Self, T> for ServerRescue {
#[inline]
fn extract_param(&self, _: &T) -> Self {
*self
}
}

impl<T> ExtractParam<errors::respond::EmitHeaders, T> for ServerRescue {
impl<T> svc::ExtractParam<errors::respond::EmitHeaders, T> for ServerRescue {
#[inline]
fn extract_param(&self, _: &T) -> errors::respond::EmitHeaders {
errors::respond::EmitHeaders(self.emit_headers)
Expand Down Expand Up @@ -179,3 +183,19 @@
Ok(errors::SyntheticHttpResponse::unexpected_error())
}
}

// === impl ExtractServerParams ===

impl<T> svc::ExtractParam<http::ServerParams, T> for ExtractServerParams
where
T: svc::Param<http::Version>,
{
#[inline]
fn extract_param(&self, t: &T) -> http::ServerParams {

Check warning on line 194 in linkerd/app/outbound/src/http/server.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/outbound/src/http/server.rs#L194

Added line #L194 was not covered by tests
http::ServerParams {
version: t.param(),
h2: self.h2,
drain: self.drain.clone(),

Check warning on line 198 in linkerd/app/outbound/src/http/server.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/outbound/src/http/server.rs#L196-L198

Added lines #L196 - L198 were not covered by tests
}
}
}
21 changes: 10 additions & 11 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{http, opaq, policy, Config, Discovery, Outbound, ParentRef};
use crate::{http, opaq, policy, Discovery, Outbound, ParentRef};
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
detect, errors, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -278,14 +277,6 @@ impl<N> Outbound<N> {
{
self.map_stack(|config, rt, inner| {
let detect_http = config.proxy.detect_http();
let Config {
proxy:
ProxyConfig {
server: ServerConfig { h2_settings, .. },
..
},
..
} = config;

// Route requests with destinations that can be discovered via the
// `l5d-dst-override` header through the (load balanced) logical
Expand Down Expand Up @@ -314,7 +305,15 @@ impl<N> Outbound<N> {
// destination address.
http.check_new_service::<Http<T>, http::Request<_>>()
.unlift_new()
.push(http::NewServeHttp::layer(*h2_settings, rt.drain.clone()))
.push(http::NewServeHttp::layer({
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();
move |http: &Http<T>| http::ServerParams {
version: http.version,
h2,
drain: drain.clone()
}
}))
.check_new_service::<Http<T>, I>()
.push_switch(
|(detected, target): (detect::Result<http::Version>, T)| -> Result<_, Infallible> {
Expand Down
13 changes: 9 additions & 4 deletions linkerd/app/outbound/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@ impl<N> Outbound<N> {
let opaq = self.clone().into_stack();

let http = self.with_stack(http).map_stack(|config, rt, stk| {
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();
stk.push_on_service(http::BoxRequest::layer())
.unlift_new()
.push(http::NewServeHttp::layer(
config.proxy.server.h2_settings,
rt.drain.clone(),
))
.push(http::NewServeHttp::layer(move |t: &Http<T>| {
http::ServerParams {
version: t.version,
h2,
drain: drain.clone(),
}
}))
.arc_new_tcp()
});

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use self::{
normalize_uri::{MarkAbsoluteForm, NewNormalizeUri},
override_authority::{AuthorityOverride, NewOverrideAuthority},
retain::Retain,
server::{NewServeHttp, ServeHttp},
server::{NewServeHttp, Params as ServerParams, ServeHttp},
strip_header::StripHeader,
timeout::{NewTimeout, ResponseTimeout, ResponseTimeoutError},
version::Version,
Expand Down
101 changes: 50 additions & 51 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
};
use linkerd_error::Error;
use linkerd_io::{self as io, PeerAddr};
use linkerd_stack::{layer, NewService, Param};
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
future::Future,
pin::Pin,
Expand All @@ -18,13 +18,22 @@

type Server = hyper::server::conn::Http<trace::Executor>;

/// Configures HTTP server behavior.
#[derive(Clone, Debug)]
pub struct NewServeHttp<N> {
pub struct Params {
pub version: Version,
pub h2: H2Settings,
pub drain: drain::Watch,
}

// A stack that builds HTTP servers.
#[derive(Clone, Debug)]
pub struct NewServeHttp<X, N> {
inner: N,
server: Server,
drain: drain::Watch,
params: X,
}

/// Serves HTTP connectionswith an inner service.
#[derive(Clone, Debug)]
pub struct ServeHttp<N> {
version: Version,
Expand All @@ -35,55 +44,43 @@

// === impl NewServeHttp ===

impl<N> NewServeHttp<N> {
pub fn layer(
h2: H2Settings,
drain: drain::Watch,
) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(h2, inner, drain.clone()))
impl<X: Clone, N> NewServeHttp<X, N> {
pub fn layer(params: X) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(params.clone(), inner))
}

/// Creates a new `ServeHttp`.
fn new(h2: H2Settings, inner: N, drain: drain::Watch) -> Self {
let mut server = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
server
.http2_initial_stream_window_size(h2.initial_stream_window_size)
.http2_initial_connection_window_size(h2.initial_connection_window_size);

// Configure HTTP/2 PING frames
if let Some(timeout) = h2.keepalive_timeout {
// XXX(eliza): is this a reasonable interval between
// PING frames?
let interval = timeout / 4;
server
.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval);
}

Self {
inner,
server,
drain,
}
fn new(params: X, inner: N) -> Self {
Self { inner, params }
}
}

impl<T, N> NewService<T> for NewServeHttp<N>
impl<T, X, N> NewService<T> for NewServeHttp<X, N>
where
T: Param<Version>,
X: ExtractParam<Params, T>,
N: NewService<T> + Clone,
{
type Service = ServeHttp<N::Service>;

fn new_service(&self, target: T) -> Self::Service {
let version = target.param();
let Params { version, h2, drain } = self.params.extract_param(&target);

let mut srv = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
srv.http2_initial_stream_window_size(h2.initial_stream_window_size)
.http2_initial_connection_window_size(h2.initial_connection_window_size);
// Configure HTTP/2 PING frames
if let Some(timeout) = h2.keepalive_timeout {
srv.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(timeout / 4);

Check warning on line 74 in linkerd/proxy/http/src/server.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/proxy/http/src/server.rs#L73-L74

Added lines #L73 - L74 were not covered by tests
}

debug!(?version, "Creating HTTP service");
let inner = self.inner.new_service(target);
ServeHttp {
inner,
version,
server: self.server.clone(),
drain: self.drain.clone(),
drain,
server: srv,
}
}
}
Expand All @@ -93,7 +90,7 @@
impl<I, N, S> Service<I> for ServeHttp<N>
where
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
N: NewService<ClientHandle, Service = S> + Clone + Send + 'static,
N: NewService<ClientHandle, Service = S> + Send + 'static,
S: Service<http::Request<UpgradeBody>, Response = http::Response<http::BoxBody>, Error = Error>
+ Unpin
+ Send
Expand All @@ -109,29 +106,30 @@
}

fn call(&mut self, io: I) -> Self::Future {
let Self {
version,
inner,
drain,
mut server,
} = self.clone();
debug!(?version, "Handling as HTTP");
let version = self.version;
let drain = self.drain.clone();
let mut server = self.server.clone();

let res = io.peer_addr().map(|pa| {
let (handle, closed) = ClientHandle::new(pa);
let svc = self.inner.new_service(handle.clone());
let svc = SetClientHandle::new(handle, svc);
(svc, closed)
});

Box::pin(
async move {
let client_addr = io.peer_addr()?;
let (client_handle, closed) = ClientHandle::new(client_addr);
// TODO(ver): Move this into the inner stack.
let svc =
SetClientHandle::new(client_handle.clone(), inner.new_service(client_handle));

let (svc, closed) = res?;
debug!(?version, "Handling as HTTP");
match version {
Version::Http1 => {
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = upgrade::Service::new(svc, drain.clone());
let mut conn = server
.http1_only(true)
.serve_connection(io, upgrade::Service::new(svc, drain.clone()))
.serve_connection(io, svc)
.with_upgrades();

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand All @@ -154,6 +152,7 @@
let mut conn = server
.http2_only(true)
.serve_connection(io, HyperServerSvc::new(svc));

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand Down
Loading