Skip to content

Commit

Permalink
refactor: Make read return BytesStream instead (#192)
Browse files Browse the repository at this point in the history
* Implement demo for rfc-191

Signed-off-by: Xuanwo <[email protected]>

* Implement for azblob

Signed-off-by: Xuanwo <[email protected]>

* Implement memory

Signed-off-by: Xuanwo <[email protected]>

* Implement for fs

Signed-off-by: Xuanwo <[email protected]>

* Implement Reader via BytesStream

Signed-off-by: Xuanwo <[email protected]>

* Remove read operation

Signed-off-by: Xuanwo <[email protected]>

* Refactor bench

Signed-off-by: Xuanwo <[email protected]>

* Rename to read

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Mar 28, 2022
1 parent 16931dd commit 5625507
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async-compat = "0.2"
async-trait = "0.1"
base64 = "0.13.0"
bstr = "0.2"
bytes = "1"
bytes = "1.1.0"
futures = { version = "0.3", features = ["alloc"] }
http = "0.2"
hyper = { version = "0.14", features = ["full"] }
Expand Down
21 changes: 9 additions & 12 deletions benches/ops/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::Criterion;
use futures::io;
use futures::AsyncReadExt;
use opendal::Operator;
use rand::prelude::*;
Expand Down Expand Up @@ -49,18 +50,16 @@ fn bench_read_full(c: &mut Criterion, op: Operator) {
] {
let content = gen_bytes(&mut rng, size.bytes() as usize);
let path = uuid::Uuid::new_v4().to_string();
let buf = vec![0; size.bytes() as usize];
let temp_data = TempData::generate(op.clone(), &path, content.clone());

group.throughput(criterion::Throughput::Bytes(size.bytes()));
group.bench_with_input(
size.to_string(Base::Base2, Style::Abbreviated),
&(op.clone(), &path, buf.clone()),
|b, (op, path, buf)| {
&(op.clone(), &path),
|b, (op, path)| {
b.to_async(&*TOKIO).iter(|| async {
let mut buf = buf.clone();
let mut r = op.object(path).limited_reader(size.bytes());
r.read_exact(&mut buf).await.unwrap();
let r = op.object(path).limited_reader(size.bytes());
io::copy(r, &mut io::sink()).await.unwrap();
})
},
);
Expand All @@ -86,18 +85,16 @@ fn bench_read_part(c: &mut Criterion, op: Operator) {
let content = gen_bytes(&mut rng, (size.bytes() * 2) as usize);
let path = uuid::Uuid::new_v4().to_string();
let offset = size.bytes() / 2;
let buf = vec![0; size.bytes() as usize];
let temp_data = TempData::generate(op.clone(), &path, content.clone());

group.throughput(criterion::Throughput::Bytes(size.bytes()));
group.bench_with_input(
size.to_string(Base::Base2, Style::Abbreviated),
&(op.clone(), &path, buf.clone()),
|b, (op, path, buf)| {
&(op.clone(), &path),
|b, (op, path)| {
b.to_async(&*TOKIO).iter(|| async {
let mut buf = buf.clone();
let mut r = op.object(path).offset_reader(offset);
r.read_exact(&mut buf).await.unwrap();
let r = op.object(path).offset_reader(offset);
io::copy(r, &mut io::sink()).await.unwrap();
})
},
);
Expand Down
6 changes: 3 additions & 3 deletions src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::error::Result;
use crate::io::BytesStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpRead;
Expand All @@ -35,8 +36,7 @@ use crate::Metadata;
/// use [`Operator`][crate::Operator] instead.
#[async_trait]
pub trait Accessor: Send + Sync + Debug {
/// Read data from the underlying storage into input writer.
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
let _ = args;
unimplemented!()
}
Expand Down Expand Up @@ -78,7 +78,7 @@ pub trait Accessor: Send + Sync + Debug {
/// `Accessor` for `Arc<dyn Accessor>`.
#[async_trait]
impl<T: Accessor> Accessor for Arc<T> {
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
self.as_ref().read(args).await
}
async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> Result<usize> {
Expand Down
11 changes: 9 additions & 2 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::ready;
use futures::AsyncRead;
use futures::AsyncSeek;
use futures::Stream;
use futures::TryStreamExt;

use crate::error::Result;
use crate::ops::OpRead;
Expand All @@ -34,6 +37,8 @@ use crate::Metadata;

/// BoxedAsyncReader is a boxed AsyncRead.
pub type BoxedAsyncReader = Box<dyn AsyncRead + Unpin + Send>;
/// BytesStream represents a stream of bytes.
pub type BytesStream = Box<dyn Stream<Item = Result<Bytes>> + Unpin + Send>;

/// Reader is used for reading data from underlying backend.
///
Expand All @@ -53,7 +58,7 @@ pub struct Reader {

enum ReadState {
Idle,
Sending(BoxFuture<'static, Result<BoxedAsyncReader>>),
Sending(BoxFuture<'static, Result<BytesStream>>),
Seeking(BoxFuture<'static, Result<Metadata>>),
Reading(BoxedAsyncReader),
}
Expand Down Expand Up @@ -102,7 +107,9 @@ impl AsyncRead for Reader {
}
ReadState::Sending(future) => match ready!(Pin::new(future).poll(cx)) {
Ok(r) => {
self.state = ReadState::Reading(r);
self.state = ReadState::Reading(Box::new(
r.map_err(std::io::Error::from).into_async_read(),
));
self.poll_read(cx, buf)
}
Err(e) => Poll::Ready(Err(io::Error::from(e))),
Expand Down
12 changes: 12 additions & 0 deletions src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ use futures::ready;

use crate::error::Kind;
use crate::error::Result;
use crate::io::BytesStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::Accessor;
use crate::Reader;
Expand All @@ -52,6 +54,16 @@ impl Object {
}
}

/// Read this object as a [`BytesStream`].
///
/// Builds an [`OpRead`] from this object's path together with the given
/// `offset` and `size`, and hands it to the underlying accessor's `read`.
/// Both values are forwarded unchanged; how `None` is interpreted is up to
/// the accessor implementation.
///
/// # Errors
///
/// Returns whatever error the accessor's `read` call reports.
pub async fn stream(&self, offset: Option<u64>, size: Option<u64>) -> Result<BytesStream> {
    let path = self.meta.path().to_string();
    let args = OpRead { path, offset, size };

    self.acc.read(&args).await
}

/// Create a new reader which can read the whole object.
///
/// # Example
Expand Down
26 changes: 10 additions & 16 deletions src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@

use std::cmp::min;
use std::collections::HashMap;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use async_trait::async_trait;
Expand All @@ -43,6 +40,7 @@ use crate::credential::Credential;
use crate::error::Error;
use crate::error::Kind;
use crate::error::Result;
use crate::io::BytesStream;
use crate::object::Metadata;
use crate::ops::HeaderRange;
use crate::ops::OpDelete;
Expand Down Expand Up @@ -230,7 +228,7 @@ impl Backend {
#[async_trait]
impl Accessor for Backend {
#[trace("read")]
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
increment_counter!("opendal_azure_read_requests");

let p = self.get_abs_path(&args.path);
Expand All @@ -247,7 +245,14 @@ impl Accessor for Backend {
&p, args.offset, args.size
);

Ok(Box::new(ByteStream(resp).into_async_read()))
Ok(Box::new(resp.into_body().into_stream().map_err(move |e| {
Error::Object {
kind: Kind::Unexpected,
op: "read",
path: p.to_string(),
source: anyhow::Error::from(e),
}
})))
}
_ => Err(parse_error_response(resp, "read", &p).await),
}
Expand Down Expand Up @@ -477,17 +482,6 @@ impl Backend {
})
}
}
/// Adapter exposing the body of a hyper response as a stream of byte chunks
/// whose errors are `std::io::Error`.
struct ByteStream(hyper::Response<hyper::Body>);

impl futures::Stream for ByteStream {
    type Item = std::io::Result<bytes::Bytes>;

    /// Polls the wrapped response body for its next chunk.
    ///
    /// Any error produced by the body is converted into an
    /// `std::io::Error` of kind `Other` carrying the error's message.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let body = Pin::new(self.0.body_mut());

        body.poll_next(cx)
            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err.to_string()))
    }
}

// Read and decode whole error response.
async fn parse_error_response(resp: Response<Body>, op: &'static str, path: &str) -> Error {
Expand Down
10 changes: 8 additions & 2 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::io;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;
use futures::TryStreamExt;
use log::debug;
use log::error;
use log::info;
Expand All @@ -36,6 +37,7 @@ use super::object_stream::Readdir;
use crate::error::Error;
use crate::error::Kind;
use crate::error::Result;
use crate::io::BytesStream;
use crate::object::BoxedObjectStream;
use crate::object::Metadata;
use crate::object::ObjectMode;
Expand All @@ -44,6 +46,7 @@ use crate::ops::OpList;
use crate::ops::OpRead;
use crate::ops::OpStat;
use crate::ops::OpWrite;
use crate::readers::ReaderStream;
use crate::Accessor;
use crate::BoxedAsyncReader;

Expand Down Expand Up @@ -128,7 +131,7 @@ impl Backend {
#[async_trait]
impl Accessor for Backend {
#[trace("read")]
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
increment_counter!("opendal_fs_read_requests");

let path = self.get_abs_path(&args.path);
Expand Down Expand Up @@ -162,11 +165,14 @@ impl Accessor for Backend {
None => Box::new(f),
};

// TODO: we need a better way to convert a file into stream.
let s = ReaderStream::new(r).map_err(|e| crate::error::Error::Unexpected(anyhow!(e)));

debug!(
"object {} reader created: offset {:?}, size {:?}",
&path, args.offset, args.size
);
Ok(r)
Ok(Box::new(s))
}

#[trace("write")]
Expand Down
29 changes: 6 additions & 23 deletions src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use futures::io;
use futures::TryStreamExt;
use futures::stream;
use minitrace::trace;

use crate::error::Error;
use crate::error::Kind;
use crate::error::Result;
use crate::io::BytesStream;
use crate::object::BoxedObjectStream;
use crate::ops::OpDelete;
use crate::ops::OpList;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl Backend {
#[async_trait]
impl Accessor for Backend {
#[trace("read")]
async fn read(&self, args: &OpRead) -> Result<BoxedAsyncReader> {
async fn read(&self, args: &OpRead) -> Result<BytesStream> {
let path = Backend::normalize_path(&args.path);

let map = self.inner.lock().expect("lock poisoned");
Expand Down Expand Up @@ -118,8 +119,9 @@ impl Accessor for Backend {
data = data.slice(0..size as usize);
};

let r: BoxedAsyncReader = Box::new(BytesStream(data).into_async_read());
Ok(r)
Ok(Box::new(Box::pin(stream::once(async {
Ok::<_, Error>(data)
}))))
}
#[trace("write")]
async fn write(&self, mut r: BoxedAsyncReader, args: &OpWrite) -> Result<usize> {
Expand Down Expand Up @@ -209,25 +211,6 @@ impl Accessor for Backend {
}
}

/// Stream adapter that yields the wrapped [`Bytes`] buffer as a single chunk.
struct BytesStream(Bytes);

impl futures::Stream for BytesStream {
    type Item = std::result::Result<bytes::Bytes, std::io::Error>;

    /// Hands out the entire remaining buffer in one item.
    ///
    /// The first poll on a non-empty buffer returns all of its bytes and
    /// leaves the buffer empty; every poll on an empty buffer returns `None`.
    /// The stream never returns `Poll::Pending`, so the context is unused.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.0.is_empty() {
            return Poll::Ready(None);
        }

        // Split off everything so the next poll observes an empty buffer.
        let size = self.0.len();
        Poll::Ready(Some(Ok(self.0.split_to(size))))
    }

    /// Reports the number of remaining *items*, not bytes.
    ///
    /// The whole buffer is emitted as one chunk, so exactly one item remains
    /// while the buffer is non-empty and none once it has been drained.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = usize::from(!self.0.is_empty());
        (remaining, Some(remaining))
    }
}

struct EntryStream {
backend: Backend,
paths: Vec<String>,
Expand Down
Loading

0 comments on commit 5625507

Please sign in to comment.