Skip to content

Commit

Permalink
chore(http/prom): upgrade to hyper 1.x
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 8, 2025
1 parent d0116ec commit b85b8bd
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 54 deletions.
35 changes: 14 additions & 21 deletions linkerd/http/prom/src/body_data/body.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::metrics::BodyDataMetrics;
use http::HeaderMap;
use http_body::SizeHint;
use http_body::{Frame, SizeHint};
use pin_project::pin_project;
use std::{
pin::Pin,
Expand Down Expand Up @@ -35,34 +34,28 @@ where
type Error = B::Error;

/// Attempt to pull out the next data buffer of this stream.
fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let inner = this.inner;
let BodyDataMetrics { frame_size } = this.metrics;

let data = std::task::ready!(inner.poll_data(cx));
let frame = std::task::ready!(inner.poll_frame(cx));

if let Some(Ok(data)) = data.as_ref() {
// We've polled and yielded a new chunk! Increment our telemetry.
//
// NB: We're careful to call `remaining()` rather than `chunk()`, which
// "can return a shorter slice (this allows non-continuous internal representation)."
let bytes = bytes::Buf::remaining(data);
frame_size.observe(linkerd_metrics::to_f64(bytes as u64));
if let Some(Ok(frame)) = &frame {
if let Some(data) = frame.data_ref() {
// We've polled and yielded a new chunk! Increment our telemetry.
//
// NB: We're careful to call `remaining()` rather than `chunk()`, which
// "can return a shorter slice (this allows non-continuous internal representation)."
let bytes = bytes::Buf::remaining(data);
frame_size.observe(linkerd_metrics::to_f64(bytes as u64));
}
}

Poll::Ready(data)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
self.project().inner.poll_trailers(cx)
Poll::Ready(frame)
}

#[inline]
Expand Down
34 changes: 16 additions & 18 deletions linkerd/http/prom/src/record_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,29 +254,27 @@ where
type Data = <BoxBody as http_body::Body>::Data;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Error>>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();
let res =
futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into));
if let Some(Err(error)) = res.as_ref() {
end_stream(this.state, Err(error));
} else if (*this.inner).is_end_stream() {
end_stream(this.state, Ok(None));

// Poll the inner body for the next frame.
let poll = this.inner.as_mut().poll_frame(cx);
let frame = futures::ready!(poll).map(|res| res.map_err(Error::from));

match &frame {
Some(Ok(frame)) => {
if let trls @ Some(_) = frame.trailers_ref() {
end_stream(this.state, Ok(trls));
}
}
Some(Err(error)) => end_stream(this.state, Err(error)),
None => end_stream(this.state, Ok(None)),
}
Poll::Ready(res)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Error>> {
let this = self.project();
let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into);
end_stream(this.state, res.as_ref().map(Option::as_ref));
Poll::Ready(res)
Poll::Ready(frame)
}

fn is_end_stream(&self) -> bool {
Expand Down
19 changes: 4 additions & 15 deletions linkerd/http/prom/src/record_response/response.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use http_body::Frame;
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_metrics::prom::Counter;
Expand Down Expand Up @@ -150,12 +151,12 @@ where
type Data = B::Data;
type Error = B::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, B::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, B::Error>>> {
let mut this = self.project();
let res = futures::ready!(this.inner.as_mut().poll_data(cx));
let res = futures::ready!(this.inner.as_mut().poll_frame(cx));
if (*this.inner).is_end_stream() {
if let Some(tx) = this.flushed.take() {
let _ = tx.send(time::Instant::now());
Expand All @@ -164,18 +165,6 @@ where
Poll::Ready(res)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, B::Error>> {
let this = self.project();
let res = futures::ready!(this.inner.poll_trailers(cx));
if let Some(tx) = this.flushed.take() {
let _ = tx.send(time::Instant::now());
}
Poll::Ready(res)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
Expand Down

0 comments on commit b85b8bd

Please sign in to comment.