Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove BodyDataStream #1455

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ kubelet-debug = ["ws", "kube-core/kubelet-debug"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
gzip = ["client", "tower-http/decompression-gzip"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "pin-project", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
client = ["config", "__non_core", "hyper", "hyper-util", "http-body", "http-body-util", "tower", "tower-http", "hyper-timeout", "chrono", "jsonpath-rust", "bytes", "futures", "tokio", "tokio-util", "either"]
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
Expand Down Expand Up @@ -69,7 +69,6 @@ tower = { workspace = true, features = ["buffer", "filter", "util"], optional =
tower-http = { workspace = true, features = ["auth", "map-response-body", "trace"], optional = true }
hyper-timeout = { workspace = true, optional = true }
tame-oauth = { workspace = true, features = ["gcp"], optional = true }
pin-project = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
secrecy = { workspace = true, features = ["alloc", "serde"] }
tracing = { workspace = true, features = ["log"], optional = true }
Expand Down
49 changes: 6 additions & 43 deletions kube-client/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use std::{
};

use bytes::Bytes;
use futures::stream::Stream;
use futures::{stream::Stream, TryStreamExt};
use http_body::{Body as HttpBody, Frame, SizeHint};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt};
use pin_project::pin_project;
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, BodyStream};

/// A request body.
pub struct Body {
Expand Down Expand Up @@ -45,11 +44,10 @@ impl Body {
Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync()))
}

pub(crate) fn into_stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
pub(crate) fn into_data_stream(
self,
) -> impl Stream<Item = Result<<Self as HttpBody>::Data, <Self as HttpBody>::Error>> {
Box::pin(BodyStream::new(self).try_filter_map(|frame| async { Ok(frame.into_data().ok()) }))
}
}

Expand Down Expand Up @@ -108,38 +106,3 @@ impl HttpBody for Body {
}
}
}

// Wrap `http_body::Body` to implement `Stream`.
#[pin_project]
pub struct BodyDataStream<B> {
#[pin]
body: B,
}

impl<B> BodyDataStream<B> {
pub(crate) fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Stream for BodyDataStream<B>
where
B: HttpBody<Data = Bytes>,
{
type Item = Result<B::Data, B::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
return match ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => {
let Ok(bytes) = frame.into_data() else {
continue;
};
Poll::Ready(Some(Ok(bytes)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
};
}
}
}
4 changes: 2 additions & 2 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl Client {
let res = handle_api_errors(res).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res.into_body().into_stream().map_err(std::io::Error::other);
let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
Ok(body.into_async_read())
}

Expand Down Expand Up @@ -309,7 +309,7 @@ impl Client {
tracing::trace!("headers: {:?}", res.headers());

let frames = FramedRead::new(
StreamReader::new(res.into_body().into_stream().map_err(|e| {
StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
// Unexpected EOF from chunked decoder.
// Tends to happen when watching for 300+s. This will be ignored.
if e.to_string().contains("unexpected EOF during chunk") {
Expand Down
Loading