diff --git a/.github/actions/check/action.yml b/.github/actions/check/action.yml index 4a3cb99ceb99..1eb7ad9f6f9b 100644 --- a/.github/actions/check/action.yml +++ b/.github/actions/check/action.yml @@ -17,6 +17,12 @@ runs: command: fmt args: --all -- --check + - name: Doc check + uses: actions-rs/cargo@v1 + with: + command: doc + args: --no-deps + - name: Check License Header uses: apache/skywalking-eyes@main env: diff --git a/opendal_test/src/services/azblob.rs b/opendal_test/src/services/azblob.rs index 45d05d91ff9e..25334bbf65fa 100644 --- a/opendal_test/src/services/azblob.rs +++ b/opendal_test/src/services/azblob.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::env; +use std::io::Result; use std::sync::Arc; -use opendal::error::Result; use opendal::services::azblob; use opendal::Accessor; diff --git a/opendal_test/src/services/fs.rs b/opendal_test/src/services/fs.rs index 1cded0bed882..0778752101e5 100644 --- a/opendal_test/src/services/fs.rs +++ b/opendal_test/src/services/fs.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::env; +use std::io::Result; use std::path::PathBuf; use std::sync::Arc; -use opendal::error::Result; use opendal::services::fs; use opendal::Accessor; diff --git a/opendal_test/src/services/memory.rs b/opendal_test/src/services/memory.rs index 49587e65c915..0cbae9aecc8c 100644 --- a/opendal_test/src/services/memory.rs +++ b/opendal_test/src/services/memory.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::env; +use std::io::Result; use std::sync::Arc; -use opendal::error::Result; use opendal::services::memory; use opendal::Accessor; diff --git a/opendal_test/src/services/s3.rs b/opendal_test/src/services/s3.rs index cb4a45eaf95d..a06a50b66835 100644 --- a/opendal_test/src/services/s3.rs +++ b/opendal_test/src/services/s3.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::env; +use std::io::Result; use std::sync::Arc; -use opendal::error::Result; use opendal::services::s3; use opendal::Accessor; diff --git a/src/accessor.rs b/src/accessor.rs index e93879bdc027..9056c542c0b9 100644 --- a/src/accessor.rs +++ b/src/accessor.rs @@ -13,11 +13,11 @@ // limitations under the License. use std::fmt::Debug; +use std::io::Result; use std::sync::Arc; use async_trait::async_trait; -use crate::error::Result; use crate::io::BytesSinker; use crate::io::BytesStreamer; use crate::ops::OpDelete; diff --git a/src/error.rs b/src/error.rs index 60fd4eac6cb6..058a86e0a1ea 100644 --- a/src/error.rs +++ b/src/error.rs @@ -17,25 +17,21 @@ //! # Examples //! //! ``` -//! use anyhow::Result; -//! use opendal::ObjectMode; -//! use opendal::Operator; -//! use opendal::error::Kind; -//! use opendal::services::fs; -//! -//! #[tokio::main] -//! async fn main() -> Result<()> { -//! let op = Operator::new(fs::Backend::build().root("/tmp").finish().await?); -//! -//! // Get metadata of an object. -//! let meta = op.object("test_file").metadata().await; -//! if let Err(e) = op.object("test_file").metadata().await { -//! if e.kind() == Kind::ObjectNotExist { -//! println!("object not exist") -//! } +//! # use anyhow::Result; +//! # use opendal::ObjectMode; +//! # use opendal::Operator; +//! use std::io::ErrorKind; +//! # use opendal::services::fs; +//! # #[tokio::main] +//! # async fn main() -> Result<()> { +//! # let op = Operator::new(fs::Backend::build().root("/tmp").finish().await?); +//! if let Err(e) = op.object("test_file").metadata().await { +//! if e.kind() == ErrorKind::NotFound { +//! println!("object not exist") //! } -//! Ok(()) //! } +//! # Ok(()) +//! # } //! ``` use std::collections::HashMap; @@ -43,82 +39,56 @@ use std::io; use thiserror::Error; -// TODO: implement From for `common_exception::Result`.s -pub type Result = std::result::Result; - -/// Kind is all meaningful error kind, that means you can depend on `Kind` to -/// take some actions instead of just print. For example, you can try check -/// `ObjectNotExist` before starting a write operation. +/// BackendError carries backend related context. /// -/// # Style +/// # Notes /// -/// The kind will be named as `noun-adj`. For example, `ObjectNotExist` or -/// `ObjectPermissionDenied`. -#[derive(Error, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] -pub enum Kind { - #[error("backend not supported")] - BackendNotSupported, - #[error("backend configuration invalid")] - BackendConfigurationInvalid, - - #[error("object not exist")] - ObjectNotExist, - #[error("object permission denied")] - ObjectPermissionDenied, +/// This error is used to carry context only, and should never be returned to users. +/// Please wrap in [`std::io::Error`] instead. +#[derive(Error, Debug)] +#[error("context: {context:?}, source: {source}")] +pub(crate) struct BackendError { + context: HashMap, + source: anyhow::Error, +} - #[error("unexpected")] - Unexpected, +impl BackendError { + pub fn new(context: HashMap, source: impl Into) -> Self { + BackendError { + context, + source: source.into(), + } + } } -/// Error is the error type for the dal2 crate. +/// ObjectError carries object related context. /// -/// ## Style +/// # Notes /// -/// The error will be formatted as `description: (keyA: valueA, keyB: valueB, ...)`. +/// This error is used to carry context only, and should never be returned to users. +/// Please wrap in [`std::io::Error`] with correct [`std::io::ErrorKind`] instead. #[derive(Error, Debug)] -pub enum Error { - #[error("{kind}: (context: {context:?}, source: {source})")] - Backend { - kind: Kind, - context: HashMap, - source: anyhow::Error, - }, - - #[error("{kind}: (op: {op}, path: {path}, source: {source})")] - Object { - kind: Kind, - op: &'static str, - path: String, - source: anyhow::Error, - }, - - #[error("unexpected: (source: {0})")] - Unexpected(#[from] anyhow::Error), +#[error("op: {op}, path: {path}, source: {source}")] +pub(crate) struct ObjectError { + op: &'static str, + path: String, + source: anyhow::Error, } -impl Error { - pub fn kind(&self) -> Kind { - match self { - Error::Backend { kind, .. } => *kind, - Error::Object { kind, .. } => *kind, - Error::Unexpected(_) => Kind::Unexpected, +impl ObjectError { + pub fn new(op: &'static str, path: &str, source: impl Into) -> Self { + ObjectError { + op, + path: path.to_string(), + source: source.into(), } } } -// Make it easier to convert to `std::io::Error` -impl From for io::Error { - fn from(err: Error) -> Self { - match err { - Error::Backend { .. } => io::Error::new(io::ErrorKind::Other, err), - Error::Object { kind, .. } => match kind { - Kind::ObjectNotExist => io::Error::new(io::ErrorKind::NotFound, err), - Kind::ObjectPermissionDenied => { - io::Error::new(io::ErrorKind::PermissionDenied, err) - } - _ => io::Error::new(io::ErrorKind::Other, err), - }, - Error::Unexpected(_) => io::Error::new(io::ErrorKind::Other, err), - } - } +/// Copied for [`io::Error::other`], should be removed after `io_error_other` stable. +pub(crate) fn other(error: E) -> io::Error +where + E: Into>, +{ + io::Error::new(io::ErrorKind::Other, error.into()) } diff --git a/src/io.rs b/src/io.rs index 1e4d5285a053..9bca411c2caa 100644 --- a/src/io.rs +++ b/src/io.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Error; +use std::io::Result; + use bytes::Bytes; use futures::AsyncRead; use futures::AsyncWrite; use futures::Sink; use futures::Stream; -use crate::error::Error; -use crate::error::Result; - /// BytesRead represents a reader of bytes. pub trait BytesRead: AsyncRead + Unpin + Send {} impl BytesRead for T where T: AsyncRead + Unpin + Send {} diff --git a/src/io_util/http_body.rs b/src/io_util/http_body.rs index 9c2e186e917a..db2657a56afd 100644 --- a/src/io_util/http_body.rs +++ b/src/io_util/http_body.rs @@ -13,11 +13,12 @@ // limitations under the License. use std::future::Future; +use std::io::Error; +use std::io::Result; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use anyhow::anyhow; use bytes::Bytes; use futures::channel::mpsc::Sender; use futures::channel::mpsc::{self}; @@ -28,8 +29,7 @@ use http::Response; use hyper::client::ResponseFuture; use hyper::Body; -use crate::error::Error; -use crate::error::Result; +use crate::error::other; use crate::ops::OpWrite; /// Create a HTTP channel. @@ -38,7 +38,7 @@ use crate::ops::OpWrite; /// /// ```no_run /// use opendal::io_util::new_http_channel; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use bytes::Bytes; /// # use futures::SinkExt; /// @@ -87,18 +87,20 @@ impl HttpBodySinker { /// # Example /// /// ```rust - /// # use opendal::io_util::new_http_channel; + /// use opendal::io_util::new_http_channel; /// # use opendal::io_util::HttpBodySinker; /// # use http::StatusCode; - /// # use opendal::error::Error; - /// # use opendal::error::Result; /// # use bytes::Bytes; /// # use futures::SinkExt; /// # use opendal::ops::OpWrite; /// # use anyhow::anyhow; + /// # use anyhow::Result; + /// use std::io; + /// use std::io::ErrorKind; + /// /// # #[tokio::main] /// # async fn main() -> Result<()> { - /// # let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new()); + /// let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new()); /// # let op = OpWrite::default(); /// let (mut tx, body) = new_http_channel(); /// let req = hyper::Request::put("https://httpbin.org/anything") @@ -108,7 +110,7 @@ impl HttpBodySinker { /// let mut bs = HttpBodySinker::new(&op, tx, client.request(req), |op, resp| { /// match resp.status() { /// StatusCode::OK => Ok(()), - /// _ => Err(Error::Unexpected(anyhow!("unexpected"))) + /// _ => Err(io::Error::from(ErrorKind::Other)) /// } /// }); /// bs.send(Bytes::from("Hello, World!")).await?; @@ -136,8 +138,8 @@ impl HttpBodySinker { ) -> Poll> { match Pin::new(&mut self.fut).poll(cx) { Poll::Ready(Ok(resp)) => Poll::Ready((self.handle)(&self.op, resp)), - // TODO: we need better error output. - Poll::Ready(Err(e)) => Poll::Ready(Err(Error::Unexpected(anyhow!(e)))), + // TODO: we need to inject an object error here. + Poll::Ready(Err(e)) => Poll::Ready(Err(other(e))), Poll::Pending => Poll::Pending, } } @@ -156,17 +158,13 @@ impl Sink for HttpBodySinker { unreachable!("response returned too early: {:?}", v) } - self.tx - .poll_ready(cx) - .map_err(|e| Error::Unexpected(anyhow!(e))) + self.tx.poll_ready(cx).map_err(other) } fn start_send(mut self: Pin<&mut Self>, item: Bytes) -> std::result::Result<(), Self::Error> { let this = &mut *self; - this.tx - .start_send(item) - .map_err(|e| Error::Unexpected(anyhow!(e))) + this.tx.start_send(item).map_err(other) } fn poll_flush( @@ -181,7 +179,7 @@ impl Sink for HttpBodySinker { cx: &mut Context<'_>, ) -> Poll> { if let Err(e) = ready!(Pin::new(&mut self.tx).poll_close(cx)) { - return Poll::Ready(Err(Error::Unexpected(anyhow!(e)))); + return Poll::Ready(Err(other(e))); } self.poll_response(cx) diff --git a/src/io_util/into_reader.rs b/src/io_util/into_reader.rs index 0bdfc3ea5204..8f76aab2cdd2 100644 --- a/src/io_util/into_reader.rs +++ b/src/io_util/into_reader.rs @@ -35,8 +35,8 @@ use crate::BytesStream; /// /// ```rust /// use opendal::io_util::into_reader; -/// # use opendal::error::Result; -/// # use opendal::error::Error; +/// # use anyhow::Result; +/// # use std::io::Error; /// # use futures::io; /// # use bytes::Bytes; /// # use futures::StreamExt; @@ -103,7 +103,7 @@ where } Some(Err(err)) => { self.state = State::Eof; - return Poll::Ready(Err(err.into())); + return Poll::Ready(Err(err)); } None => { self.state = State::Eof; @@ -120,12 +120,13 @@ where #[cfg(test)] mod tests { + use std::io::Error; + use futures::stream; use futures::AsyncReadExt; use rand::prelude::*; use super::*; - use crate::error::Error; #[tokio::test] async fn test_into_reader() { diff --git a/src/io_util/into_sink.rs b/src/io_util/into_sink.rs index d74a1eded4ce..f049a37ca3c8 100644 --- a/src/io_util/into_sink.rs +++ b/src/io_util/into_sink.rs @@ -12,19 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Error; +use std::io::Result; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use anyhow::anyhow; use bytes::Buf; use bytes::Bytes; use futures::ready; use futures::Sink; use pin_project::pin_project; -use crate::error::Error; -use crate::error::Result; use crate::BytesWrite; /// Convert [`BytesWrite`][crate::BytesWrite] into [`BytesSink`][crate::BytesSink]. @@ -37,7 +36,7 @@ use crate::BytesWrite; /// /// ```rust /// use opendal::io_util::into_sink; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use bytes::Bytes; /// # use futures::SinkExt; /// @@ -78,8 +77,7 @@ where if this.buf.is_empty() { break; } - let n = ready!(this.w.as_mut().poll_write(cx, this.buf)) - .map_err(|e| Error::Unexpected(anyhow!(e)))?; + let n = ready!(this.w.as_mut().poll_write(cx, this.buf))?; this.buf.advance(n); } @@ -113,10 +111,7 @@ where ) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; - self.project() - .w - .poll_flush(cx) - .map_err(|e| Error::Unexpected(anyhow!(e))) + self.project().w.poll_flush(cx) } fn poll_close( @@ -125,10 +120,7 @@ where ) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; - self.project() - .w - .poll_close(cx) - .map_err(|e| Error::Unexpected(anyhow!(e))) + self.project().w.poll_close(cx) } } diff --git a/src/io_util/into_stream.rs b/src/io_util/into_stream.rs index 7f445a2075e2..7389fb3d9130 100644 --- a/src/io_util/into_stream.rs +++ b/src/io_util/into_stream.rs @@ -12,19 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Result; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use anyhow::anyhow; use bytes::Bytes; use bytes::BytesMut; use futures::ready; use futures::Stream; use pin_project::pin_project; -use crate::error::Error; -use crate::error::Result; use crate::BytesRead; /// Convert [`BytesRead`][crate::BytesRead] into [`BytesStream`][crate::BytesStream]. @@ -37,7 +35,7 @@ use crate::BytesRead; /// /// ```rust /// use opendal::io_util::into_stream; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use futures::io; /// # use bytes::Bytes; /// # use futures::StreamExt; @@ -82,7 +80,7 @@ where } match ready!(this.r.poll_read(cx, this.buf)) { - Err(err) => Poll::Ready(Some(Err(Error::Unexpected(anyhow!(err))))), + Err(err) => Poll::Ready(Some(Err(err))), Ok(0) => Poll::Ready(None), Ok(n) => { let chunk = this.buf.split_to(n); diff --git a/src/io_util/into_writer.rs b/src/io_util/into_writer.rs index ac498f631a84..29677f5d611b 100644 --- a/src/io_util/into_writer.rs +++ b/src/io_util/into_writer.rs @@ -34,7 +34,7 @@ use crate::BytesSink; /// ```rust /// use opendal::io_util::into_writer; /// # use opendal::io_util::into_sink; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use futures::io; /// # use bytes::Bytes; /// # use futures::StreamExt; @@ -83,15 +83,11 @@ where } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.s) - .poll_flush(cx) - .map_err(io::Error::from) + Pin::new(&mut self.s).poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.s) - .poll_close(cx) - .map_err(io::Error::from) + Pin::new(&mut self.s).poll_close(cx) } } diff --git a/src/io_util/seekable_reader.rs b/src/io_util/seekable_reader.rs index d6589feb0d27..a00ba96450b5 100644 --- a/src/io_util/seekable_reader.rs +++ b/src/io_util/seekable_reader.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::future::Future; -use std::io; +use std::io::Result; use std::io::SeekFrom; use std::ops::RangeBounds; use std::pin::Pin; @@ -27,7 +27,6 @@ use futures::AsyncRead; use futures::AsyncSeek; use futures::TryStreamExt; -use crate::error::Result; use crate::ops::BytesRange; use crate::ops::OpRead; use crate::ops::OpStat; @@ -126,7 +125,7 @@ impl AsyncRead for SeekableReader { State::Reading(Box::new(r.map_err(std::io::Error::from).into_async_read())); self.poll_read(cx, buf) } - Err(e) => Poll::Ready(Err(io::Error::from(e))), + Err(e) => Poll::Ready(Err(e)), }, State::Reading(r) => match ready!(Pin::new(r).poll_read(cx, buf)) { Ok(n) => { @@ -147,12 +146,8 @@ impl AsyncSeek for SeekableReader { pos: SeekFrom, ) -> Poll> { if let State::Seeking(future) = &mut self.state { - match ready!(Pin::new(future).poll(cx)) { - Ok(meta) => { - self.size = Some(meta.content_length() - self.offset.unwrap_or_default()) - } - Err(e) => return Poll::Ready(Err(io::Error::from(e))), - } + let meta = ready!(Pin::new(future).poll(cx))?; + self.size = Some(meta.content_length() - self.offset.unwrap_or_default()) } let cur = self.pos as i64; diff --git a/src/io_util/sink_observer.rs b/src/io_util/sink_observer.rs index ddc762421228..d32960d2ce56 100644 --- a/src/io_util/sink_observer.rs +++ b/src/io_util/sink_observer.rs @@ -11,6 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -19,9 +22,6 @@ use bytes::Bytes; use futures::Sink; use pin_project::pin_project; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; use crate::io::BytesSinker; /// Create an observer over BytesSink. @@ -35,7 +35,7 @@ use crate::io::BytesSinker; /// use opendal::io_util::observe_sink; /// use opendal::io_util::SinkEvent; /// # use opendal::io_util::into_sink; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use futures::io; /// # use bytes::Bytes; /// # use futures::StreamExt; @@ -77,7 +77,7 @@ pub enum SinkEvent { /// # Note /// /// We only emit the error kind here so that we don't need clone the whole error. - Error(Kind), + Error(ErrorKind), } /// Observer that created via [`observe_sink`]. diff --git a/src/io_util/stream_observer.rs b/src/io_util/stream_observer.rs index 1046a3f286f1..ee2329c09f8a 100644 --- a/src/io_util/stream_observer.rs +++ b/src/io_util/stream_observer.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::ErrorKind; +use std::io::Result; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -20,8 +22,6 @@ use bytes::Bytes; use futures::Stream; use pin_project::pin_project; -use crate::error::Kind; -use crate::error::Result; use crate::BytesStreamer; /// Create an observer over BytesStream. @@ -35,7 +35,7 @@ use crate::BytesStreamer; /// use opendal::io_util::observe_stream; /// use opendal::io_util::StreamEvent; /// # use opendal::io_util::into_stream; -/// # use opendal::error::Result; +/// # use std::io::Result; /// # use futures::io; /// # use bytes::Bytes; /// # use futures::StreamExt; @@ -72,7 +72,7 @@ pub enum StreamEvent { /// # Note /// /// We only emit the error kind here so that we don't need clone the whole error. - Error(Kind), + Error(ErrorKind), } /// Observer that created via [`observe_stream`]. diff --git a/src/layer.rs b/src/layer.rs index 74affb76ad16..ea10b40977bd 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -56,12 +56,12 @@ impl Layer for Arc { #[cfg(test)] mod tests { + use std::io::Result; use std::sync::Arc; use futures::lock::Mutex; use super::*; - use crate::error::Result; use crate::ops::OpDelete; use crate::services::fs; use crate::Accessor; diff --git a/src/lib.rs b/src/lib.rs index e248147b61c1..93d899f546b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,8 @@ //! - [fs][crate::services::fs]: POSIX alike file system. //! - [memory][crate::services::memory]: In memory backend support. //! - [s3][crate::services::s3]: AWS services like S3. + +/// Private module with public types, they will be accessed via `opendal::Xxxx` mod accessor; pub use accessor::Accessor; @@ -92,10 +94,15 @@ pub use object::ObjectStreamer; mod scheme; pub use scheme::Scheme; -pub mod error; +/// Public modules, they will be accessed via `opendal::io_util::Xxxx` pub mod io_util; pub mod ops; pub mod services; +/// Private modules, internal use only. +/// +/// Please don't export any type from this module. +mod error; + #[deprecated] pub mod readers; diff --git a/src/object.rs b/src/object.rs index 0875f4f25748..f18a545a9d8e 100644 --- a/src/object.rs +++ b/src/object.rs @@ -14,6 +14,8 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use std::io::ErrorKind; +use std::io::Result; use std::ops::RangeBounds; use std::sync::Arc; use std::time::SystemTime; @@ -24,8 +26,6 @@ use bytes::BytesMut; use futures::SinkExt; use futures::StreamExt; -use crate::error::Kind; -use crate::error::Result; use crate::io::BytesRead; use crate::io::BytesSinker; use crate::io::BytesStreamer; @@ -72,7 +72,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// use bytes::{BufMut, BytesMut}; @@ -96,7 +96,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -113,7 +113,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -130,7 +130,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -158,7 +158,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -183,7 +183,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -217,7 +217,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -241,7 +241,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::TryStreamExt; /// # #[tokio::main] @@ -274,7 +274,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// # use futures::SinkExt; @@ -308,7 +308,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// # use futures::SinkExt; @@ -342,7 +342,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// # use futures::SinkExt; @@ -372,7 +372,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// # use futures::SinkExt; @@ -399,7 +399,7 @@ impl Object { /// /// ``` /// # use opendal::services::memory; - /// # use opendal::error::Result; + /// # use std::io::Result; /// # use opendal::Operator; /// # use futures::StreamExt; /// # use futures::SinkExt; @@ -466,12 +466,13 @@ impl Object { /// # use anyhow::Result; /// # use futures::io; /// # use opendal::Operator; - /// # use opendal::error::Kind; + /// use std::io::ErrorKind; + /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { /// # let op = Operator::new(memory::Backend::build().finish().await?); /// if let Err(e) = op.object("test").metadata().await { - /// if e.kind() == Kind::ObjectNotExist { + /// if e.kind() == ErrorKind::NotFound { /// println!("object not exist") /// } /// } @@ -493,7 +494,7 @@ impl Object { /// use anyhow::Result; /// use futures::io; /// use opendal::Operator; - /// use opendal::error::Kind; + /// /// /// #[tokio::main] /// async fn main() -> Result<()> { @@ -527,7 +528,7 @@ impl Object { /// use anyhow::Result; /// use futures::io; /// use opendal::Operator; - /// use opendal::error::Kind; + /// /// /// #[tokio::main] /// async fn main() -> Result<()> { @@ -542,7 +543,7 @@ impl Object { match r { Ok(_) => Ok(true), Err(err) => match err.kind() { - Kind::ObjectNotExist => Ok(false), + ErrorKind::NotFound => Ok(false), _ => Err(err), }, } diff --git a/src/operator.rs b/src/operator.rs index b3db71ea7b08..9c0a5b6f9c9e 100644 --- a/src/operator.rs +++ b/src/operator.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Result; use std::sync::Arc; -use crate::error::Result; use crate::ops::OpList; use crate::Accessor; use crate::Layer; diff --git a/src/scheme.rs b/src/scheme.rs index d2d350ce27d8..bdc6afd60883 100644 --- a/src/scheme.rs +++ b/src/scheme.rs @@ -1,3 +1,4 @@ +use std::io; // Copyright 2022 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,8 +16,8 @@ use std::str::FromStr; use anyhow::anyhow; -use super::error::Error; -use crate::error::Kind; +use crate::error::other; +use crate::error::BackendError; /// Backends that OpenDAL supports #[derive(Clone, Debug, PartialEq)] @@ -29,7 +30,7 @@ pub enum Scheme { } impl FromStr for Scheme { - type Err = Error; + type Err = io::Error; fn from_str(s: &str) -> Result { let s = s.to_lowercase(); @@ -43,11 +44,10 @@ impl FromStr for Scheme { "local" | "disk" => Ok(Scheme::Fs), "azurestorageblob" => Ok(Scheme::Azblob), - v => Err(Error::Backend { - kind: Kind::BackendNotSupported, - context: Default::default(), - source: anyhow!("{} is not supported", v), - }), + v => Err(other(BackendError::new( + Default::default(), + anyhow!("{} is not supported", v), + ))), } } } diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index 915c88425bd5..7f1300b1e530 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -16,6 +16,10 @@ use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::io; +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; use std::mem; use std::str::FromStr; use std::sync::Arc; @@ -38,9 +42,9 @@ use reqsign::services::azure::storage::Signer; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::BackendError; +use crate::error::ObjectError; use crate::io::BytesSinker; use crate::io::BytesStreamer; use crate::io_util::new_http_channel; @@ -146,11 +150,10 @@ impl Builder { // Handle endpoint, region and container name. let container = match self.container.is_empty() { false => Ok(&self.container), - true => Err(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: HashMap::from([("container".to_string(), "".to_string())]), - source: anyhow!("container is empty"), - }), + true => Err(other(BackendError::new( + HashMap::from([("container".to_string(), "".to_string())]), + anyhow!("container is empty"), + ))), }?; debug!("backend use container {}", &container); @@ -159,6 +162,11 @@ impl Builder { None => "blob.core.windows.net".to_string(), }; + let context = HashMap::from([ + ("container".to_string(), container.to_string()), + ("endpoint".to_string(), endpoint.to_string()), + ]); + let client = hyper::Client::builder().build(hyper_tls::HttpsConnector::new()); let mut signer_builder = Signer::builder(); @@ -166,7 +174,10 @@ impl Builder { signer_builder.account_name(name).account_key(key); } - let signer = signer_builder.build().await?; + let signer = signer_builder + .build() + .await + .map_err(|e| other(BackendError::new(context, e)))?; info!("backend build finished: {:?}", &self); Ok(Arc::new(Backend { @@ -250,14 +261,11 @@ impl Accessor for Backend { &p, args.offset, args.size ); - Ok(Box::new(resp.into_body().into_stream().map_err(move |e| { - Error::Object { - kind: Kind::Unexpected, - op: "read", - path: p.to_string(), - source: anyhow::Error::from(e), - } - }))) + Ok(Box::new( + resp.into_body() + .into_stream() + .map_err(move |e| other(ObjectError::new("read", &p, e))), + )) } _ => Err(parse_error_response_with_body(resp, "read", &p).await), } @@ -418,12 +426,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} get_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "read", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("read", path, e)) }) } @@ -468,12 +471,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} head_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "stat", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("stat", path, e)) }) } @@ -492,12 +490,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} delete_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "delete", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("delete", path, e)) }) } @@ -526,30 +519,27 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} list_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("list", path, e)) }) } } -fn parse_error_response_without_body(resp: Response, op: &'static str, path: &str) -> Error { +fn parse_error_response_without_body( + resp: Response, + op: &'static str, + path: &str, +) -> io::Error { let (part, _) = resp.into_parts(); let kind = match part.status { - StatusCode::NOT_FOUND => Kind::ObjectNotExist, - StatusCode::FORBIDDEN => Kind::ObjectPermissionDenied, - _ => Kind::Unexpected, + StatusCode::NOT_FOUND => ErrorKind::NotFound, + StatusCode::FORBIDDEN => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, }; - Error::Object { + io::Error::new( kind, - op, - path: path.to_string(), - source: anyhow!("response part: {:?}", part), - } + ObjectError::new(op, path, anyhow!("response part: {:?}", part)), + ) } // Read and decode whole error response. @@ -560,9 +550,9 @@ async fn parse_error_response_with_body( ) -> Error { let (part, mut body) = resp.into_parts(); let kind = match part.status { - StatusCode::NOT_FOUND => Kind::ObjectNotExist, - StatusCode::FORBIDDEN => Kind::ObjectPermissionDenied, - _ => Kind::Unexpected, + StatusCode::NOT_FOUND => ErrorKind::NotFound, + StatusCode::FORBIDDEN => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, }; // Only read 4KiB from the response to avoid broken services. @@ -578,18 +568,20 @@ async fn parse_error_response_with_body( break; } } - Err(e) => return Error::Unexpected(anyhow!("parse error response parse: {:?}", e)), + Err(e) => return other(anyhow!("parse error response parse: {:?}", e)), } } - Error::Object { + io::Error::new( kind, - op, - path: path.to_string(), - source: anyhow!( - "response part: {:?}, body: {:?}", - part, - String::from_utf8_lossy(&bs) + ObjectError::new( + op, + path, + anyhow!( + "response part: {:?}, body: {:?}", + part, + String::from_utf8_lossy(&bs) + ), ), - } + ) } diff --git a/src/services/azblob/object_stream.rs b/src/services/azblob/object_stream.rs index d139d79df6e4..b505b94e8546 100644 --- a/src/services/azblob/object_stream.rs +++ b/src/services/azblob/object_stream.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::future::Future; +use std::io::Result; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -28,9 +29,8 @@ use quick_xml::de; use serde::Deserialize; use super::Backend; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::ObjectError; use crate::Object; use crate::ObjectMode; @@ -75,25 +75,15 @@ impl futures::Stream for AzblobObjectStream { let mut resp = backend.list_blobs(&path, &next_marker).await?; if resp.status() != http::StatusCode::OK { - let e = Err(Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.clone(), - source: anyhow!("{:?}", resp), - }); + let e = other(ObjectError::new("list", &path, anyhow!("{:?}", resp))); debug!("error response: {:?}", resp); - return e; + return Err(e); } let body = resp.body_mut(); let mut bs = bytes::BytesMut::new(); while let Some(b) = body.next().await { - let b = b.map_err(|e| Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.clone(), - source: anyhow!("read body: {:?}", e), - })?; + let b = b.map_err(|e| other(ObjectError::new("list", &path, e)))?; bs.put_slice(&b) } @@ -104,12 +94,8 @@ impl futures::Stream for AzblobObjectStream { } State::Sending(fut) => { let bs = ready!(Pin::new(fut).poll(cx))?; - let output: Output = de::from_reader(bs.reader()).map_err(|e| Error::Object { - kind: Kind::Unexpected, - op: "list", - path: self.path.clone(), - source: anyhow!("deserialize list_bucket output: {:?}", e), - })?; + let output: Output = de::from_reader(bs.reader()) + .map_err(|e| other(ObjectError::new("list", &self.path, e)))?; // Try our best to check whether this list is done. // diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index b15a2e101ab7..6767d129b114 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::io::Result; use std::io::SeekFrom; use std::path::PathBuf; use std::sync::Arc; @@ -31,9 +32,9 @@ use tokio::fs; use super::error::parse_io_error; use super::object_stream::Readdir; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::BackendError; +use crate::error::ObjectError; use crate::io::BytesSinker; use crate::io::BytesStreamer; use crate::io_util::into_sink; @@ -69,11 +70,10 @@ impl Builder { None => "/".to_string(), Some(v) => { if !v.starts_with('/') { - return Err(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: HashMap::from([("root".to_string(), v.clone())]), - source: anyhow!("Root must start with /"), - }); + return Err(other(BackendError::new( + HashMap::from([("root".to_string(), v.clone())]), + anyhow!("Root must start with /"), + ))); } v.to_string() } @@ -188,7 +188,13 @@ impl Accessor for Backend { // - Is it better to check the parent dir exists before call mkdir? let parent = PathBuf::from(&path) .parent() - .ok_or_else(|| anyhow!("malformed path: {:?}", &path))? + .ok_or_else(|| { + other(ObjectError::new( + "write", + &path, + anyhow!("malformed path: {:?}", &path), + )) + })? .to_path_buf(); fs::create_dir_all(&parent).await.map_err(|e| { diff --git a/src/services/fs/error.rs b/src/services/fs/error.rs index 58350bdd274e..8cccb08a13c6 100644 --- a/src/services/fs/error.rs +++ b/src/services/fs/error.rs @@ -12,35 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::error::Error; -use crate::error::Kind; +use std::io; +use std::io::Error; + +use crate::error::ObjectError; /// Parse all path related errors. /// /// ## Notes /// /// Skip utf-8 check to allow invalid path input. -pub fn parse_io_error(err: std::io::Error, op: &'static str, path: &str) -> Error { - use std::io::ErrorKind; - - match err.kind() { - ErrorKind::NotFound => Error::Object { - kind: Kind::ObjectNotExist, - op, - path: path.to_string(), - source: anyhow::Error::from(err), - }, - ErrorKind::PermissionDenied => Error::Object { - kind: Kind::ObjectPermissionDenied, - op, - path: path.to_string(), - source: anyhow::Error::from(err), - }, - _ => Error::Object { - kind: Kind::Unexpected, - op, - path: path.to_string(), - source: anyhow::Error::from(err), - }, - } +pub fn parse_io_error(err: io::Error, op: &'static str, path: &str) -> io::Error { + Error::new(err.kind(), ObjectError::new(op, path, err)) } diff --git a/src/services/fs/object_stream.rs b/src/services/fs/object_stream.rs index 6592e902d811..6aadc73a5494 100644 --- a/src/services/fs/object_stream.rs +++ b/src/services/fs/object_stream.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Result; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -23,9 +24,8 @@ use log::error; use tokio::fs; use super::error::parse_io_error; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::ObjectError; use crate::Accessor; use crate::Object; @@ -64,12 +64,7 @@ impl futures::Stream for Readdir { Ok(Some(de)) => { let de_path = de.path(); let de_path = de_path.strip_prefix(&self.root).map_err(|e| { - let e = Error::Object { - kind: Kind::Unexpected, - op: "list", - path: de.path().to_string_lossy().to_string(), - source: anyhow::Error::from(e), - }; + let e = other(ObjectError::new("list", &de.path().to_string_lossy(), e)); error!("object {:?} path strip_prefix: {:?}", &de.path(), e); e })?; diff --git a/src/services/memory/backend.rs b/src/services/memory/backend.rs index 0729ac02f061..8a9306efaa97 100644 --- a/src/services/memory/backend.rs +++ b/src/services/memory/backend.rs @@ -13,6 +13,9 @@ // limitations under the License. use std::collections::HashMap; +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; use std::mem; use std::pin::Pin; use std::sync::Arc; @@ -28,9 +31,8 @@ use futures::stream; use futures::Sink; use minitrace::trace; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::ObjectError; use crate::io::BytesSinker; use crate::io::BytesStreamer; use crate::object::ObjectStreamer; @@ -89,34 +91,32 @@ impl Accessor for Backend { let map = self.inner.lock().expect("lock poisoned"); - let data = map.get(&path).ok_or_else(|| Error::Object { - kind: Kind::ObjectNotExist, - op: "read", - path: path.to_string(), - source: anyhow!("key not exists in map"), + let data = map.get(&path).ok_or_else(|| { + Error::new( + ErrorKind::NotFound, + ObjectError::new("read", &path, anyhow!("key not exists in map")), + ) })?; let mut data = data.clone(); if let Some(offset) = args.offset { if offset >= data.len() as u64 { - return Err(Error::Object { - kind: Kind::Unexpected, - op: "read", - path: path.to_string(), - source: anyhow!("offset out of bound {} >= {}", offset, data.len()), - }); + return Err(other(ObjectError::new( + "read", + &path, + anyhow!("offset out of bound {} >= {}", offset, data.len()), + ))); } data = data.slice(offset as usize..data.len()); }; if let Some(size) = args.size { if size > data.len() as u64 { - return Err(Error::Object { - kind: Kind::Unexpected, - op: "read", - path: path.to_string(), - source: anyhow!("size out of bound {} > {}", size, data.len()), - }); + return Err(other(ObjectError::new( + "read", + &path, + anyhow!("size out of bound {} > {}", size, data.len()), + ))); } data = data.slice(0..size as usize); }; @@ -153,11 +153,11 @@ impl Accessor for Backend { let map = self.inner.lock().expect("lock poisoned"); - let data = map.get(&path).ok_or_else(|| Error::Object { - kind: Kind::ObjectNotExist, - op: "stat", - path: path.to_string(), - source: anyhow!("key not exists in map"), + let data = map.get(&path).ok_or_else(|| { + Error::new( + ErrorKind::NotFound, + ObjectError::new("read", &path, anyhow!("key not exists in map")), + ) })?; let mut meta = Metadata::default(); @@ -232,16 +232,15 @@ impl Sink for MapSink { _cx: &mut Context<'_>, ) -> Poll> { if self.buf.len() != self.size as usize { - return Poll::Ready(Err(Error::Object { - kind: Kind::Unexpected, - op: "write", - path: self.path.clone(), - source: anyhow!( + return Poll::Ready(Err(other(ObjectError::new( + "write", + &self.path, + anyhow!( "write short, expect {} actual {}", self.size, self.buf.len() ), - })); + )))); } let buf = mem::take(&mut self.buf); diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index 35d23b24d95b..215167a9bcab 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -16,6 +16,9 @@ use std::cmp::min; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::io::Error; +use std::io::ErrorKind; +use std::io::Result; use std::mem; use std::str::FromStr; use std::sync::Arc; @@ -41,9 +44,9 @@ use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; use super::object_stream::S3ObjectStream; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::BackendError; +use crate::error::ObjectError; use crate::io::BytesSinker; use crate::io::BytesStreamer; use crate::io_util::new_http_channel; @@ -413,11 +416,10 @@ impl Builder { let req = hyper::Request::head(format!("{endpoint}/{bucket}")) .body(hyper::Body::empty()) .expect("must be valid request"); - let res = client.request(req).await.map_err(|e| Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow::Error::new(e), - })?; + let res = client + .request(req) + .await + .map_err(|e| other(BackendError::new(context.clone(), e)))?; debug!( "auto detect region got response: status {:?}, header: {:?}", @@ -433,11 +435,7 @@ impl Builder { .get("x-amz-bucket-region") .unwrap_or(&HeaderValue::from_static("us-east-1")) .to_str() - .map_err(|e| Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow::Error::new(e), - })? + .map_err(|e| other(BackendError::new(context.clone(), e)))? .to_string(); Ok((endpoint.to_string(), region)) } @@ -446,25 +444,23 @@ impl Builder { let region = res .headers() .get("x-amz-bucket-region") - .ok_or(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow!("can't detect region automatically, region is empty"), + .ok_or_else(|| { + other(BackendError::new( + context.clone(), + anyhow!("can't detect region automatically, region is empty"), + )) })? .to_str() - .map_err(|e| Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow::Error::new(e), - })? + .map_err(|e| other(BackendError::new(context.clone(), e)))? .to_string(); - let template = ENDPOINT_TEMPLATES.get(endpoint).ok_or(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow!( - "can't detect region automatically, no valid endpoint template for {}", - &endpoint - ), + let template = ENDPOINT_TEMPLATES.get(endpoint).ok_or_else(|| { + other(BackendError::new( + context.clone(), + anyhow!( + "can't detect region automatically, no valid endpoint template for {}", + &endpoint + ), + )) })?; let endpoint = template.replace("{region}", ®ion); @@ -473,14 +469,13 @@ impl Builder { } // Unexpected status code code => { - return Err(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: context.clone(), - source: anyhow!( + return Err(other(BackendError::new( + context.clone(), + anyhow!( "can't detect region automatically, unexpected response: status code {}", code ), - }); + ))) } } } @@ -507,11 +502,10 @@ impl Builder { // Handle endpoint, region and bucket name. let bucket = match self.bucket.is_empty() { false => Ok(&self.bucket), - true => Err(Error::Backend { - kind: Kind::BackendConfigurationInvalid, - context: HashMap::from([("bucket".to_string(), "".to_string())]), - source: anyhow!("bucket is empty"), - }), + true => Err(other(BackendError::new( + HashMap::from([("bucket".to_string(), "".to_string())]), + anyhow!("bucket is empty"), + ))), }?; debug!("backend use bucket {}", &bucket); @@ -536,7 +530,10 @@ impl Builder { signer_builder.secret_key(sk); } - let signer = signer_builder.build().await?; + let signer = signer_builder + .build() + .await + .map_err(|e| other(BackendError::new(context, e)))?; info!("backend build finished: {:?}", &self); Ok(Arc::new(Backend { @@ -710,14 +707,11 @@ impl Accessor for Backend { &p, args.offset, args.size ); - Ok(Box::new(resp.into_body().into_stream().map_err(move |e| { - Error::Object { - kind: Kind::Unexpected, - op: "read", - path: p.to_string(), - source: anyhow::Error::from(e), - } - }))) + Ok(Box::new( + resp.into_body() + .into_stream() + .map_err(move |e| other(ObjectError::new("read", &p, e))), + )) } _ => Err(parse_error_response_with_body(resp, "read", &p).await), } @@ -878,12 +872,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} get_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "read", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("read", path, e)) }) } @@ -925,12 +914,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} head_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "stat", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("stat", path, e)) }) } @@ -945,12 +929,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} delete_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "delete", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("delete", path, e)) }) } @@ -976,12 +955,7 @@ impl Backend { self.client.request(req).await.map_err(|e| { error!("object {} list_object: {:?}", path, e); - Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.to_string(), - source: anyhow::Error::from(e), - } + other(ObjectError::new("list", path, e)) }) } } @@ -990,17 +964,15 @@ impl Backend { fn parse_error_response_without_body(resp: Response, op: &'static str, path: &str) -> Error { let (part, _) = resp.into_parts(); let kind = match part.status { - StatusCode::NOT_FOUND => Kind::ObjectNotExist, - StatusCode::FORBIDDEN => Kind::ObjectPermissionDenied, - _ => Kind::Unexpected, + StatusCode::NOT_FOUND => ErrorKind::NotFound, + StatusCode::FORBIDDEN => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, }; - Error::Object { + Error::new( kind, - op, - path: path.to_string(), - source: anyhow!("response part: {:?}", part), - } + ObjectError::new(op, path, anyhow!("response part: {:?}", part)), + ) } // Read and decode whole error response. @@ -1011,9 +983,9 @@ async fn parse_error_response_with_body( ) -> Error { let (part, mut body) = resp.into_parts(); let kind = match part.status { - StatusCode::NOT_FOUND => Kind::ObjectNotExist, - StatusCode::FORBIDDEN => Kind::ObjectPermissionDenied, - _ => Kind::Unexpected, + StatusCode::NOT_FOUND => ErrorKind::NotFound, + StatusCode::FORBIDDEN => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, }; // Only read 4KiB from the response to avoid broken services. @@ -1029,20 +1001,28 @@ async fn parse_error_response_with_body( break; } } - Err(e) => return Error::Unexpected(anyhow!("parse error response parse: {:?}", e)), + Err(e) => { + return other(ObjectError::new( + op, + path, + anyhow!("parse error response parse: {:?}", e), + )) + } } } - Error::Object { + Error::new( kind, - op, - path: path.to_string(), - source: anyhow!( - "response part: {:?}, body: {:?}", - part, - String::from_utf8_lossy(&bs) + ObjectError::new( + op, + path, + anyhow!( + "response part: {:?}, body: {:?}", + part, + String::from_utf8_lossy(&bs) + ), ), - } + ) } #[cfg(test)] diff --git a/src/services/s3/object_stream.rs b/src/services/s3/object_stream.rs index acb88fa32da0..c131b184ac7f 100644 --- a/src/services/s3/object_stream.rs +++ b/src/services/s3/object_stream.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::future::Future; +use std::io::Result; use std::pin::Pin; use std::sync::Arc; use std::task::Context; @@ -29,9 +30,8 @@ use quick_xml::de; use serde::Deserialize; use super::Backend; -use crate::error::Error; -use crate::error::Kind; -use crate::error::Result; +use crate::error::other; +use crate::error::ObjectError; use crate::Object; use crate::ObjectMode; @@ -78,24 +78,21 @@ impl futures::Stream for S3ObjectStream { let mut resp = backend.list_objects(&path, &token).await?; if resp.status() != http::StatusCode::OK { - let e = Err(Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.clone(), - source: anyhow!("{:?}", resp), - }); + let e = other(ObjectError::new("list", &path, anyhow!("{:?}", resp))); + debug!("error response: {:?}", resp); - return e; + return Err(e); } let body = resp.body_mut(); let mut bs = bytes::BytesMut::new(); while let Some(b) = body.next().await { - let b = b.map_err(|e| Error::Object { - kind: Kind::Unexpected, - op: "list", - path: path.clone(), - source: anyhow!("read body: {:?}", e), + let b = b.map_err(|e| { + other(ObjectError::new( + "list", + &path, + anyhow!("read body: {:?}", e), + )) })?; bs.put_slice(&b) } @@ -107,11 +104,12 @@ impl futures::Stream for S3ObjectStream { } State::Sending(fut) => { let bs = ready!(Pin::new(fut).poll(cx))?; - let output: Output = de::from_reader(bs.reader()).map_err(|e| Error::Object { - kind: Kind::Unexpected, - op: "list", - path: self.path.clone(), - source: anyhow!("deserialize list_bucket output: {:?}", e), + let output: Output = de::from_reader(bs.reader()).map_err(|e| { + other(ObjectError::new( + "list", + &self.path, + anyhow!("deserialize list_bucket output: {:?}", e), + )) })?; // Try our best to check whether this list is done. diff --git a/tests/behavior/behavior.rs b/tests/behavior/behavior.rs index 362a7404575c..c00a6fbc39ce 100644 --- a/tests/behavior/behavior.rs +++ b/tests/behavior/behavior.rs @@ -20,10 +20,11 @@ //! //! For examples, we depends `write` to create a file before testing `read`. If `write` doesn't works well, we can't test `read` correctly too. +use std::io::ErrorKind; + use anyhow::Result; use futures::AsyncReadExt; use futures::StreamExt; -use opendal::error::Kind; use opendal::ObjectMode; use opendal::Operator; use rand::prelude::*; @@ -142,7 +143,7 @@ impl BehaviorTest { .await; assert!(meta.is_err()); let err = meta.unwrap_err(); - assert_eq!(err.kind(), Kind::ObjectNotExist); + assert_eq!(err.kind(), ErrorKind::NotFound); Ok(()) }