Skip to content

Commit

Permalink
wip: upgrading http-retry
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Feb 6, 2025
1 parent 967b843 commit 5f98818
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,7 @@ dependencies = [
"futures",
"http 1.2.0",
"http-body",
"http-body-util",
"hyper",
"linkerd-error",
"linkerd-exp-backoff",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
bytes = { workspace = true }
futures = { version = "0.3", default-features = false }
http-body = { workspace = true }
http-body-util = { workspace = true }
http = { workspace = true }
parking_lot = "0.12"
pin-project = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub mod peek_trailers;
pub mod replay;

mod compat;
// mod compat;

pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody};
pub use tower::retry::budget::Budget;
Expand Down
97 changes: 35 additions & 62 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::{
FutureExt,
};
use http::HeaderMap;
use http_body::Body;
use http_body::{Body, Frame};
use linkerd_http_box::BoxBody;
use pin_project::pin_project;
use std::{
Expand Down Expand Up @@ -115,16 +115,13 @@ impl<B: Body> PeekTrailersBody<B> {
}))
}

async fn read_body(body: B) -> Self
async fn read_body(mut body: B) -> Self
where
B: Send + Unpin,
B::Data: Send + Unpin,
B::Error: Send,
{
// XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame<T>`s.
// this can be removed when we upgrade to http-body 1.0.
use crate::compat::ForwardCompatibleBody;
let mut body = ForwardCompatibleBody::new(body);
use http_body_util::BodyExt;

// First, poll the body for its first frame.
tracing::debug!("Buffering first data frame");
Expand All @@ -150,7 +147,7 @@ impl<B: Body> PeekTrailersBody<B> {
Some(Ok(None)) => Inner::Buffered {
first: None,
second: None,
inner: body.into_inner(),
inner: body,
},
// The body yielded a DATA frame. Check for a second frame, without yielding again.
Some(Ok(Some(Either::Left(first)))) => {
Expand All @@ -176,7 +173,7 @@ impl<B: Body> PeekTrailersBody<B> {
Some(Ok(Some(Either::Left(second)))) => Inner::Buffered {
first: Some(Ok(first)),
second: Some(Ok(second)),
inner: body.into_inner(),
inner: body,
},
// The body immediately yielded a second TRAILERS frame. Nice!
Some(Ok(Some(Either::Right(trailers)))) => Inner::Unary {
Expand All @@ -187,7 +184,7 @@ impl<B: Body> PeekTrailersBody<B> {
Some(Ok(None)) => Inner::Buffered {
first: None,
second: None,
inner: body.into_inner(),
inner: body,
},
}
} else {
Expand All @@ -197,7 +194,7 @@ impl<B: Body> PeekTrailersBody<B> {
Inner::Buffered {
first: None,
second: None,
inner: body.into_inner(),
inner: body,
}
}
}
Expand All @@ -220,9 +217,9 @@ impl<B: Body> PeekTrailersBody<B> {
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
frame: http_body::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
use {futures::future::Either, http_body::Frame};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Expand All @@ -244,42 +241,35 @@ where
type Data = B::Data;
type Error = B::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project().0.project();
match this {
Projection::Empty => Poll::Ready(None),
Projection::Passthru { inner } => inner.poll_data(cx),
Projection::Unary { data, .. } => Poll::Ready(data.take()),
Projection::Passthru { inner } => inner.poll_frame(cx),
Projection::Unary { data, trailers } => {
let mut take_data = || data.take().map(|r| r.map(Frame::data));
let take_trailers = || trailers.take().map(|r| r.map(Frame::trailers));
let frame = take_data().or_else(take_trailers);
Poll::Ready(frame)
}
Projection::Buffered {
first,
second,
inner,
} => {
if let data @ Some(_) = first.take().or_else(|| second.take()) {
Poll::Ready(data)
if let Some(data) = first.take().or_else(|| second.take()) {
let frame = data.map(Frame::data);
Poll::Ready(Some(frame))
} else {
inner.poll_data(cx)
inner.poll_frame(cx)
}
}
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.project().0.project();
match this {
Projection::Empty => Poll::Ready(Ok(None)),
Projection::Passthru { inner } => inner.poll_trailers(cx),
Projection::Unary { trailers, .. } => Poll::Ready(trailers.take().transpose()),
Projection::Buffered { inner, .. } => inner.poll_trailers(cx),
}
}

#[inline]
fn is_end_stream(&self) -> bool {
let Self(inner) = self;
Expand Down Expand Up @@ -347,7 +337,7 @@ mod tests {
use super::PeekTrailersBody;
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use http_body::{Body, Frame};
use linkerd_error::Error;
use std::{
collections::VecDeque,
Expand All @@ -362,19 +352,19 @@ mod tests {
#[derive(Default)]
struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
trailer_polls: VecDeque<Poll<Option<Result<http::HeaderMap, Error>>>>,
}

fn data() -> Option<Result<Bytes, Error>> {
let bytes = Bytes::from_static(b"hello");
Some(Ok(bytes))
}

fn trailers() -> Result<Option<http::HeaderMap>, Error> {
fn trailers() -> Option<Result<http::HeaderMap, Error>> {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("shiny");
trls.insert("trailer", value);
Ok(Some(trls))
Some(Ok(trls))
}

#[tokio::test]
Expand Down Expand Up @@ -454,7 +444,7 @@ mod tests {
/// Appends a poll outcome for [`Body::poll_trailers()`].
fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
poll: Poll<Option<Result<http::HeaderMap, Error>>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
Expand All @@ -473,38 +463,21 @@ mod tests {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// The called has polled for trailers before exhausting the stream of DATA frames.
// This indicates `PeekTrailersBody<B>` isn't respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
panic!("`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`");
let poll = {
let mut next_data = || data_polls.pop_front().map(|p| p.map_ok(Frame::data));
let next_trailer = || trailer_polls.pop_front().map(|p| p.map_ok(Frame::trailers));
next_data()
.or_else(next_trailer)
.unwrap_or(Poll::Ready(None))
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
Expand Down

0 comments on commit 5f98818

Please sign in to comment.