Skip to content

Commit

Permalink
refactor: Adopt io::Result instead (#204)
Browse files Browse the repository at this point in the history
* refactor: Adopt io::Result instead

Signed-off-by: Xuanwo <[email protected]>

* Add comment for opendal modules

Signed-off-by: Xuanwo <[email protected]>

* Make sure doc is buildable

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Apr 2, 2022
1 parent e6ad350 commit 4c27584
Show file tree
Hide file tree
Showing 30 changed files with 360 additions and 458 deletions.
6 changes: 6 additions & 0 deletions .github/actions/check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ runs:
command: fmt
args: --all -- --check

- name: Doc check
uses: actions-rs/cargo@v1
with:
command: doc
args: --no-deps

- name: Check License Header
uses: apache/skywalking-eyes@main
env:
Expand Down
2 changes: 1 addition & 1 deletion opendal_test/src/services/azblob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::io::Result;
use std::sync::Arc;

use opendal::error::Result;
use opendal::services::azblob;
use opendal::Accessor;

Expand Down
2 changes: 1 addition & 1 deletion opendal_test/src/services/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::io::Result;
use std::path::PathBuf;
use std::sync::Arc;

use opendal::error::Result;
use opendal::services::fs;
use opendal::Accessor;

Expand Down
2 changes: 1 addition & 1 deletion opendal_test/src/services/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::io::Result;
use std::sync::Arc;

use opendal::error::Result;
use opendal::services::memory;
use opendal::Accessor;

Expand Down
2 changes: 1 addition & 1 deletion opendal_test/src/services/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::io::Result;
use std::sync::Arc;

use opendal::error::Result;
use opendal::services::s3;
use opendal::Accessor;

Expand Down
2 changes: 1 addition & 1 deletion src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// limitations under the License.

use std::fmt::Debug;
use std::io::Result;
use std::sync::Arc;

use async_trait::async_trait;

use crate::error::Result;
use crate::io::BytesSinker;
use crate::io::BytesStreamer;
use crate::ops::OpDelete;
Expand Down
132 changes: 51 additions & 81 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,108 +17,78 @@
//! # Examples
//!
//! ```
//! use anyhow::Result;
//! use opendal::ObjectMode;
//! use opendal::Operator;
//! use opendal::error::Kind;
//! use opendal::services::fs;
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! let op = Operator::new(fs::Backend::build().root("/tmp").finish().await?);
//!
//! // Get metadata of an object.
//! let meta = op.object("test_file").metadata().await;
//! if let Err(e) = op.object("test_file").metadata().await {
//! if e.kind() == Kind::ObjectNotExist {
//! println!("object not exist")
//! }
//! # use anyhow::Result;
//! # use opendal::ObjectMode;
//! # use opendal::Operator;
//! use std::io::ErrorKind;
//! # use opendal::services::fs;
//! # #[tokio::main]
//! # async fn main() -> Result<()> {
//! # let op = Operator::new(fs::Backend::build().root("/tmp").finish().await?);
//! if let Err(e) = op.object("test_file").metadata().await {
//! if e.kind() == ErrorKind::NotFound {
//! println!("object not exist")
//! }
//! Ok(())
//! }
//! # Ok(())
//! # }
//! ```
use std::collections::HashMap;
use std::io;

use thiserror::Error;

// TODO: implement From<Result> for `common_exception::Result`.s
pub type Result<T> = std::result::Result<T, Error>;

/// Kind is all meaningful error kind, that means you can depend on `Kind` to
/// take some actions instead of just print. For example, you can try check
/// `ObjectNotExist` before starting a write operation.
/// BackendError carries backend related context.
///
/// # Style
/// # Notes
///
/// The kind will be named as `noun-adj`. For example, `ObjectNotExist` or
/// `ObjectPermissionDenied`.
#[derive(Error, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum Kind {
#[error("backend not supported")]
BackendNotSupported,
#[error("backend configuration invalid")]
BackendConfigurationInvalid,

#[error("object not exist")]
ObjectNotExist,
#[error("object permission denied")]
ObjectPermissionDenied,
/// This error is used to carry context only, and should never be returned to users.
/// Please wrap in [`std::io::Error`] instead.
#[derive(Error, Debug)]
#[error("context: {context:?}, source: {source}")]
pub(crate) struct BackendError {
context: HashMap<String, String>,
source: anyhow::Error,
}

#[error("unexpected")]
Unexpected,
impl BackendError {
pub fn new(context: HashMap<String, String>, source: impl Into<anyhow::Error>) -> Self {
BackendError {
context,
source: source.into(),
}
}
}

/// Error is the error type for the dal2 crate.
/// ObjectError carries object related context.
///
/// ## Style
/// # Notes
///
/// The error will be formatted as `description: (keyA: valueA, keyB: valueB, ...)`.
/// This error is used to carry context only, and should never be returned to users.
/// Please wrap in [`std::io::Error`] with correct [`std::io::ErrorKind`] instead.
#[derive(Error, Debug)]
pub enum Error {
#[error("{kind}: (context: {context:?}, source: {source})")]
Backend {
kind: Kind,
context: HashMap<String, String>,
source: anyhow::Error,
},

#[error("{kind}: (op: {op}, path: {path}, source: {source})")]
Object {
kind: Kind,
op: &'static str,
path: String,
source: anyhow::Error,
},

#[error("unexpected: (source: {0})")]
Unexpected(#[from] anyhow::Error),
#[error("op: {op}, path: {path}, source: {source}")]
pub(crate) struct ObjectError {
op: &'static str,
path: String,
source: anyhow::Error,
}

impl Error {
pub fn kind(&self) -> Kind {
match self {
Error::Backend { kind, .. } => *kind,
Error::Object { kind, .. } => *kind,
Error::Unexpected(_) => Kind::Unexpected,
impl ObjectError {
pub fn new(op: &'static str, path: &str, source: impl Into<anyhow::Error>) -> Self {
ObjectError {
op,
path: path.to_string(),
source: source.into(),
}
}
}

// Make it easier to convert to `std::io::Error`
impl From<Error> for io::Error {
fn from(err: Error) -> Self {
match err {
Error::Backend { .. } => io::Error::new(io::ErrorKind::Other, err),
Error::Object { kind, .. } => match kind {
Kind::ObjectNotExist => io::Error::new(io::ErrorKind::NotFound, err),
Kind::ObjectPermissionDenied => {
io::Error::new(io::ErrorKind::PermissionDenied, err)
}
_ => io::Error::new(io::ErrorKind::Other, err),
},
Error::Unexpected(_) => io::Error::new(io::ErrorKind::Other, err),
}
}
/// Copied for [`io::Error::other`], should be removed after `io_error_other` stable.
pub(crate) fn other<E>(error: E) -> io::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
io::Error::new(io::ErrorKind::Other, error.into())
}
6 changes: 3 additions & 3 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Error;
use std::io::Result;

use bytes::Bytes;
use futures::AsyncRead;
use futures::AsyncWrite;
use futures::Sink;
use futures::Stream;

use crate::error::Error;
use crate::error::Result;

/// BytesRead represents a reader of bytes.
pub trait BytesRead: AsyncRead + Unpin + Send {}
impl<T> BytesRead for T where T: AsyncRead + Unpin + Send {}
Expand Down
34 changes: 16 additions & 18 deletions src/io_util/http_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
// limitations under the License.

use std::future::Future;
use std::io::Error;
use std::io::Result;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;

use anyhow::anyhow;
use bytes::Bytes;
use futures::channel::mpsc::Sender;
use futures::channel::mpsc::{self};
Expand All @@ -28,8 +29,7 @@ use http::Response;
use hyper::client::ResponseFuture;
use hyper::Body;

use crate::error::Error;
use crate::error::Result;
use crate::error::other;
use crate::ops::OpWrite;

/// Create a HTTP channel.
Expand All @@ -38,7 +38,7 @@ use crate::ops::OpWrite;
///
/// ```no_run
/// use opendal::io_util::new_http_channel;
/// # use opendal::error::Result;
/// # use std::io::Result;
/// # use bytes::Bytes;
/// # use futures::SinkExt;
///
Expand Down Expand Up @@ -87,18 +87,20 @@ impl HttpBodySinker {
/// # Example
///
/// ```rust
/// # use opendal::io_util::new_http_channel;
/// use opendal::io_util::new_http_channel;
/// # use opendal::io_util::HttpBodySinker;
/// # use http::StatusCode;
/// # use opendal::error::Error;
/// # use opendal::error::Result;
/// # use bytes::Bytes;
/// # use futures::SinkExt;
/// # use opendal::ops::OpWrite;
/// # use anyhow::anyhow;
/// # use anyhow::Result;
/// use std::io;
/// use std::io::ErrorKind;
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// # let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
/// let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
/// # let op = OpWrite::default();
/// let (mut tx, body) = new_http_channel();
/// let req = hyper::Request::put("https://httpbin.org/anything")
Expand All @@ -108,7 +110,7 @@ impl HttpBodySinker {
/// let mut bs = HttpBodySinker::new(&op, tx, client.request(req), |op, resp| {
/// match resp.status() {
/// StatusCode::OK => Ok(()),
/// _ => Err(Error::Unexpected(anyhow!("unexpected")))
/// _ => Err(io::Error::from(ErrorKind::Other))
/// }
/// });
/// bs.send(Bytes::from("Hello, World!")).await?;
Expand Down Expand Up @@ -136,8 +138,8 @@ impl HttpBodySinker {
) -> Poll<std::result::Result<(), Error>> {
match Pin::new(&mut self.fut).poll(cx) {
Poll::Ready(Ok(resp)) => Poll::Ready((self.handle)(&self.op, resp)),
// TODO: we need better error output.
Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Unexpected(anyhow!(e)))),
// TODO: we need to inject an object error here.
Poll::Ready(Err(e)) => Poll::Ready(Err(other(e))),
Poll::Pending => Poll::Pending,
}
}
Expand All @@ -156,17 +158,13 @@ impl Sink<Bytes> for HttpBodySinker {
unreachable!("response returned too early: {:?}", v)
}

self.tx
.poll_ready(cx)
.map_err(|e| Error::Unexpected(anyhow!(e)))
self.tx.poll_ready(cx).map_err(other)
}

fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> std::result::Result<(), Self::Error> {
let this = &mut *self;

this.tx
.start_send(item)
.map_err(|e| Error::Unexpected(anyhow!(e)))
this.tx.start_send(item).map_err(other)
}

fn poll_flush(
Expand All @@ -181,7 +179,7 @@ impl Sink<Bytes> for HttpBodySinker {
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
if let Err(e) = ready!(Pin::new(&mut self.tx).poll_close(cx)) {
return Poll::Ready(Err(Error::Unexpected(anyhow!(e))));
return Poll::Ready(Err(other(e)));
}

self.poll_response(cx)
Expand Down
9 changes: 5 additions & 4 deletions src/io_util/into_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::BytesStream;
///
/// ```rust
/// use opendal::io_util::into_reader;
/// # use opendal::error::Result;
/// # use opendal::error::Error;
/// # use anyhow::Result;
/// # use std::io::Error;
/// # use futures::io;
/// # use bytes::Bytes;
/// # use futures::StreamExt;
Expand Down Expand Up @@ -103,7 +103,7 @@ where
}
Some(Err(err)) => {
self.state = State::Eof;
return Poll::Ready(Err(err.into()));
return Poll::Ready(Err(err));
}
None => {
self.state = State::Eof;
Expand All @@ -120,12 +120,13 @@ where

#[cfg(test)]
mod tests {
use std::io::Error;

use futures::stream;
use futures::AsyncReadExt;
use rand::prelude::*;

use super::*;
use crate::error::Error;

#[tokio::test]
async fn test_into_reader() {
Expand Down
Loading

0 comments on commit 4c27584

Please sign in to comment.