Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orig-proto: Avoid emiting HTTP/2 errors for upgraded requests #1245

Merged
merged 2 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
//! HTTP/1 messages over an H2 transport; however, some requests cannot be
//! proxied via this method, so it also maintains a fallback HTTP/1 client.

use crate::{glue::UpgradeBody, h1, h2, orig_proto};
use crate::{h1, h2, orig_proto};
use futures::prelude::*;
use linkerd_error::Error;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, Param};
use std::{
marker::PhantomData,
Expand Down Expand Up @@ -61,8 +62,7 @@ impl From<crate::Version> for Settings {

// === impl MakeClient ===

type MakeFuture<C, T, B> =
Pin<Box<dyn Future<Output = Result<Client<C, T, B>, Error>> + Send + 'static>>;
type MakeFuture<C, T, B> = Pin<Box<dyn Future<Output = Result<Client<C, T, B>>> + Send + 'static>>;

impl<C, T, B> tower::Service<T> for MakeClient<C, B>
where
Expand All @@ -80,7 +80,8 @@ where
type Error = Error;
type Future = MakeFuture<C, T, B>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
#[inline]
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

Expand Down Expand Up @@ -128,9 +129,7 @@ impl<C: Clone, B> Clone for MakeClient<C, B> {

// === impl Client ===

type RspFuture = Pin<
Box<dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static>,
>;
type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;

impl<C, T, B> tower::Service<http::Request<B>> for Client<C, T, B>
where
Expand All @@ -143,18 +142,17 @@ where
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
type Response = http::Response<UpgradeBody>;
type Error = hyper::Error;
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = Instrumented<RspFuture>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let res = match self {
Self::H2(ref mut svc) => futures::ready!(svc.poll_ready(cx)),
Self::OrigProtoUpgrade(ref mut svc) => futures::ready!(svc.poll_ready(cx)),
Self::Http1(_) => Ok(()),
};

Poll::Ready(res)
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self {
Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx),
Self::Http1(_) => Poll::Ready(Ok(())),
}
}

fn call(&mut self, req: http::Request<B>) -> Self::Future {
Expand All @@ -174,9 +172,11 @@ where
match self {
Self::Http1(ref mut h1) => h1.request(req),
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req),
Self::H2(ref mut svc) => {
Box::pin(svc.call(req).map_ok(|rsp| rsp.map(UpgradeBody::from))) as RspFuture
}
Self::H2(ref mut svc) => Box::pin(
svc.call(req)
.err_into::<Error>()
.map_ok(|rsp| rsp.map(BoxBody::new)),
) as RspFuture,
}
})
.instrument(span)
Expand Down
13 changes: 6 additions & 7 deletions linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{
glue::{HyperConnect, UpgradeBody},
glue::HyperConnect,
upgrade::{Http11Upgrade, HttpConnect},
};
use futures::prelude::*;
use http::{
header::{CONNECTION, HOST, UPGRADE},
uri::{Authority, Parts, Scheme, Uri},
};
use linkerd_error::Error;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use std::{future::Future, mem, pin::Pin, time::Duration};
use tracing::{debug, trace};

Expand Down Expand Up @@ -59,9 +60,7 @@ impl<C: Clone, T: Clone, B> Clone for Client<C, T, B> {
}
}

type RspFuture = Pin<
Box<dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static>,
>;
type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;

impl<C, T, B> Client<C, T, B>
where
Expand Down Expand Up @@ -134,7 +133,7 @@ where
client.as_ref().unwrap().request(req)
};

Box::pin(rsp_fut.map_ok(move |mut rsp| {
Box::pin(rsp_fut.err_into().map_ok(move |mut rsp| {
if is_http_connect {
debug_assert!(
upgrade.is_some(),
Expand All @@ -152,7 +151,7 @@ where
strip_connection_headers(rsp.headers_mut());
}

rsp.map(UpgradeBody::from)
rsp.map(BoxBody::new)
}))
}
}
Expand Down
7 changes: 4 additions & 3 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::trace;
use futures::prelude::*;
pub use h2::{Error as H2Error, Reason};
use hyper::{
body::HttpBody,
client::conn::{self, SendRequest},
};
use linkerd_error::Error;
use linkerd_error::{Error, Result};
use std::time::Duration;
use std::{
future::Future,
Expand Down Expand Up @@ -57,8 +58,7 @@ impl<C: Clone, B> Clone for Connect<C, B> {
}
}

type ConnectFuture<B> =
Pin<Box<dyn Future<Output = Result<Connection<B>, Error>> + Send + 'static>>;
type ConnectFuture<B> = Pin<Box<dyn Future<Output = Result<Connection<B>>> + Send + 'static>>;

impl<C, B, T> tower::Service<T> for Connect<C, B>
where
Expand All @@ -74,6 +74,7 @@ where
type Error = Error;
type Future = ConnectFuture<B>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.connect.poll_ready(cx).map_err(Into::into)
}
Expand Down
135 changes: 104 additions & 31 deletions linkerd/proxy/http/src/orig_proto.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::{glue::UpgradeBody, h1, h2, upgrade};
use super::{h1, h2, upgrade};
use futures::{future, prelude::*};
use http::header::{HeaderValue, TRANSFER_ENCODING};
use linkerd_error::Error;
use hyper::body::HttpBody;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::layer;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
use tracing::{debug, trace, warn};

pub const L5D_ORIG_PROTO: &str = "l5d-orig-proto";
Expand All @@ -19,14 +22,24 @@ pub struct Upgrade<C, T, B> {
h2: h2::Connection<B>,
}

#[derive(Clone, Copy, Debug, Error)]
#[error("upgraded connection failed with HTTP/2 reset: {0}")]
pub struct DowngradedH2Error(h2::Reason);

#[pin_project::pin_project]
#[derive(Debug)]
pub struct UpgradeResponseBody {
inner: hyper::Body,
}

/// Downgrades HTTP2 requests that were previousl upgraded to their original
/// protocol.
#[derive(Clone, Debug)]
pub struct Downgrade<S> {
inner: S,
}

// ==== impl Upgrade =====
// === impl Upgrade ===

impl<C, T, B> Upgrade<C, T, B> {
pub(crate) fn new(http1: h1::Client<C, T, B>, h2: h2::Connection<B>) -> Self {
Expand All @@ -45,23 +58,20 @@ where
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
type Response = http::Response<UpgradeBody>;
type Error = hyper::Error;
type Future = Pin<
Box<
dyn Future<Output = Result<http::Response<UpgradeBody>, hyper::Error>> + Send + 'static,
>,
>;
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.h2.poll_ready(cx)
self.h2.poll_ready(cx).map_err(downgrade_h2_error)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
debug_assert!(req.version() != http::Version::HTTP_2);
if req.extensions().get::<upgrade::Http11Upgrade>().is_some() {
debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade");
return self.http1.request(req);
return Box::pin(self.http1.request(req).map_ok(|rsp| rsp.map(BoxBody::new)));
}

let orig_version = req.version();
Expand Down Expand Up @@ -89,28 +99,91 @@ where

*req.version_mut() = http::Version::HTTP_2;

Box::pin(self.h2.call(req).map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(UpgradeBody::from)
}))
Box::pin(
self.h2
.call(req)
.map_err(downgrade_h2_error)
.map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
}),
)
}
}

/// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This
/// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the
/// original request was HTTP/2.
fn downgrade_h2_error(error: hyper::Error) -> Error {
use std::error::Error;

let mut cause = error.source();
while let Some(e) = cause {
if let Some(e) = e.downcast_ref::<h2::H2Error>() {
if let Some(reason) = e.reason() {
return DowngradedH2Error(reason).into();
}
}

cause = error.source();
}

error.into()
}

// === impl UpgradeResponseBody ===

impl Default for UpgradeResponseBody {
fn default() -> Self {
UpgradeResponseBody {
inner: Default::default(),
}
}
}

impl HttpBody for UpgradeResponseBody {
type Data = bytes::Bytes;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(self.project().inner)
.poll_data(cx)
.map_err(downgrade_h2_error)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Pin::new(self.project().inner)
.poll_trailers(cx)
.map_err(downgrade_h2_error)
}
}

// ===== impl Downgrade =====
// === impl Downgrade ===

impl<S> Downgrade<S> {
pub fn layer() -> impl layer::Layer<S, Service = Self> + Copy + Clone {
Expand Down