diff --git a/Cargo.lock b/Cargo.lock index 52d2636a69..555aee07bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1856,6 +1856,7 @@ dependencies = [ "futures", "http 1.2.0", "http-body", + "http-body-util", "hyper", "linkerd-error", "linkerd-exp-backoff", diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index bd7655c953..9a2ee77896 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -10,6 +10,7 @@ publish = false bytes = { workspace = true } futures = { version = "0.3", default-features = false } http-body = { workspace = true } +http-body-util = { workspace = true } http = { workspace = true } parking_lot = "0.12" pin-project = "1" diff --git a/linkerd/http/retry/src/lib.rs b/linkerd/http/retry/src/lib.rs index 5384a55f62..ef322c9117 100644 --- a/linkerd/http/retry/src/lib.rs +++ b/linkerd/http/retry/src/lib.rs @@ -4,7 +4,7 @@ pub mod peek_trailers; pub mod replay; -mod compat; +// mod compat; pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody}; pub use tower::retry::budget::Budget; diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index b11bb583ec..226314f46d 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -3,7 +3,7 @@ use futures::{ FutureExt, }; use http::HeaderMap; -use http_body::Body; +use http_body::{Body, Frame}; use linkerd_http_box::BoxBody; use pin_project::pin_project; use std::{ @@ -115,16 +115,13 @@ impl PeekTrailersBody { })) } - async fn read_body(body: B) -> Self + async fn read_body(mut body: B) -> Self where B: Send + Unpin, B::Data: Send + Unpin, B::Error: Send, { - // XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame`s. - // this can be removed when we upgrade to http-body 1.0. - use crate::compat::ForwardCompatibleBody; - let mut body = ForwardCompatibleBody::new(body); + use http_body_util::BodyExt; // First, poll the body for its first frame. tracing::debug!("Buffering first data frame"); @@ -150,7 +147,7 @@ impl PeekTrailersBody { Some(Ok(None)) => Inner::Buffered { first: None, second: None, - inner: body.into_inner(), + inner: body, }, // The body yielded a DATA frame. Check for a second frame, without yielding again. Some(Ok(Some(Either::Left(first)))) => { @@ -176,7 +173,7 @@ impl PeekTrailersBody { Some(Ok(Some(Either::Left(second)))) => Inner::Buffered { first: Some(Ok(first)), second: Some(Ok(second)), - inner: body.into_inner(), + inner: body, }, // The body immediately yielded a second TRAILERS frame. Nice! Some(Ok(Some(Either::Right(trailers)))) => Inner::Unary { @@ -187,7 +184,7 @@ impl PeekTrailersBody { Some(Ok(None)) => Inner::Buffered { first: None, second: None, - inner: body.into_inner(), + inner: body, }, } } else { @@ -197,7 +194,7 @@ impl PeekTrailersBody { Inner::Buffered { first: None, second: None, - inner: body.into_inner(), + inner: body, } } } @@ -220,9 +217,9 @@ impl PeekTrailersBody { /// /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. fn split_frame( - frame: crate::compat::Frame, + frame: http_body::Frame, ) -> Option> { - use {crate::compat::Frame, futures::future::Either}; + use {futures::future::Either, http_body::Frame}; match frame.into_data().map_err(Frame::into_trailers) { Ok(data) => Some(Either::Left(data)), Err(Ok(trailers)) => Some(Either::Right(trailers)), @@ -244,42 +241,35 @@ where type Data = B::Data; type Error = B::Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let this = self.project().0.project(); match this { Projection::Empty => Poll::Ready(None), - Projection::Passthru { inner } => inner.poll_data(cx), - Projection::Unary { data, .. } => Poll::Ready(data.take()), + Projection::Passthru { inner } => inner.poll_frame(cx), + Projection::Unary { data, trailers } => { + let mut take_data = || data.take().map(|r| r.map(Frame::data)); + let take_trailers = || trailers.take().map(|r| r.map(Frame::trailers)); + let frame = take_data().or_else(take_trailers); + Poll::Ready(frame) + } Projection::Buffered { first, second, inner, } => { - if let data @ Some(_) = first.take().or_else(|| second.take()) { - Poll::Ready(data) + if let Some(data) = first.take().or_else(|| second.take()) { + let frame = data.map(Frame::data); + Poll::Ready(Some(frame)) } else { - inner.poll_data(cx) + inner.poll_frame(cx) } } } } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let this = self.project().0.project(); - match this { - Projection::Empty => Poll::Ready(Ok(None)), - Projection::Passthru { inner } => inner.poll_trailers(cx), - Projection::Unary { trailers, .. } => Poll::Ready(trailers.take().transpose()), - Projection::Buffered { inner, .. } => inner.poll_trailers(cx), - } - } - #[inline] fn is_end_stream(&self) -> bool { let Self(inner) = self; @@ -347,7 +337,7 @@ mod tests { use super::PeekTrailersBody; use bytes::Bytes; use http::{HeaderMap, HeaderValue}; - use http_body::Body; + use http_body::{Body, Frame}; use linkerd_error::Error; use std::{ collections::VecDeque, @@ -362,7 +352,7 @@ mod tests { #[derive(Default)] struct MockBody { data_polls: VecDeque>>>, - trailer_polls: VecDeque, Error>>>, + trailer_polls: VecDeque>>>, } fn data() -> Option> { @@ -370,11 +360,11 @@ mod tests { Some(Ok(bytes)) } - fn trailers() -> Result, Error> { + fn trailers() -> Option> { let mut trls = HeaderMap::with_capacity(1); let value = HeaderValue::from_static("shiny"); trls.insert("trailer", value); - Ok(Some(trls)) + Some(Ok(trls)) } #[tokio::test] @@ -454,7 +444,7 @@ mod tests { /// Appends a poll outcome for [`Body::poll_trailers()`]. fn then_yield_trailer( mut self, - poll: Poll, Error>>, + poll: Poll>>, ) -> Self { self.trailer_polls.push_back(poll); self @@ -473,38 +463,21 @@ mod tests { type Data = Bytes; type Error = Error; - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let poll = self - .get_mut() - .data_polls - .pop_front() - .unwrap_or(Poll::Ready(None)); - // If we return `Poll::Pending`, we must schedule the task to be awoken. - if poll.is_pending() { - Self::schedule(cx); - } - poll - } - - fn poll_trailers( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll, Self::Error>>> { let Self { data_polls, trailer_polls, } = self.get_mut(); - let poll = if data_polls.is_empty() { - trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) - } else { - // The called has polled for trailers before exhausting the stream of DATA frames. - // This indicates `PeekTrailersBody` isn't respecting the contract outlined in - // . - panic!("`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`"); + let poll = { + let mut next_data = || data_polls.pop_front().map(|p| p.map_ok(Frame::data)); + let next_trailer = || trailer_polls.pop_front().map(|p| p.map_ok(Frame::trailers)); + next_data() + .or_else(next_trailer) + .unwrap_or(Poll::Ready(None)) }; // If we return `Poll::Pending`, we must schedule the task to be awoken.