diff --git a/src/lib.rs b/src/lib.rs index 3bb633f94ed5..08a985ad7fb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,6 +115,3 @@ pub mod services; // // Please don't export any type from this module. mod error; - -#[deprecated] -pub mod readers; diff --git a/src/readers/callback.rs b/src/readers/callback.rs deleted file mode 100644 index 177e631ce128..000000000000 --- a/src/readers/callback.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -use std::pin::Pin; -use std::task::Poll; - -use pin_project::pin_project; - -use crate::BytesReader; - -#[pin_project] -pub struct CallbackReader { - #[pin] - inner: BytesReader, - f: F, -} - -impl CallbackReader -where - F: FnMut(usize), -{ - /// # TODO - /// - /// Mark as dead_code for now, we will use it sooner while implement streams support. - #[allow(dead_code)] - pub fn new(r: BytesReader, f: F) -> Self { - CallbackReader { inner: r, f } - } -} - -impl futures::AsyncRead for CallbackReader -where - F: FnMut(usize), -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let this = self.as_mut().project(); - - let r = this.inner.poll_read(cx, buf); - - if let Poll::Ready(Ok(len)) = r { - (self.f)(len); - }; - - r - } -} diff --git a/src/readers/mod.rs b/src/readers/mod.rs deleted file mode 100644 index f913ed01758e..000000000000 --- a/src/readers/mod.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Reader related helper tools -mod callback; -pub use callback::CallbackReader; - -mod stream; -pub use stream::ReaderStream; - -mod observer; -pub use observer::ObserveReader; -pub use observer::ReadEvent; diff --git a/src/readers/observer.rs b/src/readers/observer.rs deleted file mode 100644 index 405935ec518f..000000000000 --- a/src/readers/observer.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use futures::AsyncRead; -use pin_project::pin_project; - -/// ReadEvent will emitted by `ObserveReader`. -#[derive(Copy, Clone, Debug)] -pub enum ReadEvent { - /// `poll_read` has been called. - Started, - /// `poll_read` returns `Pending`, we are back to the runtime. - Pending, - /// `poll_read` returns `Ready(Ok(n))`, we have read `n` bytes of data. - Read(usize), - /// `poll_read` returns `Ready(Err(e))`, we will have an `ErrorKind` here. - Error(std::io::ErrorKind), -} - -/// ObserveReader is used to observe state inside Reader. -/// -/// We will emit `ReadEvent` in different stages that the inner Reader reach. -/// Caller need to handle `ReadEvent` correctly and quickly. -#[pin_project] -pub struct ObserveReader { - r: R, - f: F, -} - -impl ObserveReader -where - R: AsyncRead + Send + Unpin, - F: FnMut(ReadEvent), -{ - pub fn new(r: R, f: F) -> Self { - Self { r, f } - } -} - -impl futures::AsyncRead for ObserveReader -where - R: AsyncRead + Send + Unpin, - F: FnMut(ReadEvent), -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - (self.f)(ReadEvent::Started); - - match Pin::new(&mut self.r).poll_read(cx, buf) { - Poll::Ready(Ok(n)) => { - (self.f)(ReadEvent::Read(n)); - Poll::Ready(Ok(n)) - } - Poll::Ready(Err(e)) => { - (self.f)(ReadEvent::Error(e.kind())); - Poll::Ready(Err(e)) - } - Poll::Pending => { - (self.f)(ReadEvent::Pending); - Poll::Pending - } - } - } -} diff --git a/src/readers/stream.rs b/src/readers/stream.rs deleted file mode 100644 index 665ce2ece89b..000000000000 --- a/src/readers/stream.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -use std::io; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; - -use futures::ready; -use futures::AsyncRead; -use pin_project::pin_project; - -use crate::BytesReader; - -const CAPACITY: usize = 4096; - -/// ReaderStream is used to convert a `futures::io::AsyncRead` into a `futures::Stream`. -/// -/// Most code inspired by `tokio_util::io::ReaderStream`. -#[pin_project] -pub struct ReaderStream { - #[pin] - reader: Option, - buf: bytes::BytesMut, -} - -impl ReaderStream { - pub fn new(r: BytesReader) -> Self { - ReaderStream { - reader: Some(r), - buf: bytes::BytesMut::new(), - } - } -} - -impl futures::Stream for ReaderStream { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().project(); - - let reader = match this.reader.as_pin_mut() { - Some(r) => r, - None => return Poll::Ready(None), - }; - - // We will always use the same underlying buffer, the allocation happens only once. - if this.buf.is_empty() { - this.buf.resize(CAPACITY, 0); - } - - match ready!(reader.poll_read(cx, this.buf)) { - Err(err) => { - self.project().reader.set(None); - Poll::Ready(Some(Err(err))) - } - Ok(0) => { - self.project().reader.set(None); - Poll::Ready(None) - } - Ok(n) => { - let chunk = this.buf.split_to(n); - Poll::Ready(Some(Ok(chunk.freeze()))) - } - } - } -}