diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs index 257ecbf0bf8..40ae15d1bf8 100644 --- a/core/src/layers/capability_check.rs +++ b/core/src/layers/capability_check.rs @@ -15,584 +15,74 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; - +use crate::layers::correctness_check::new_unsupported_args_error; use crate::raw::{ - Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, - OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, - RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, TwoWays, + Access, AccessorInfo, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpStat, OpWrite, + Operation, RpDelete, RpList, RpRead, RpStat, RpWrite, }; -use crate::{Error, ErrorKind}; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; -/// Add a capability check layer for every operation -/// -/// Before performing any operations, we will first check -/// the operation against capability of the underlying service. If the -/// operation is not supported, an error will be returned directly. +/// Add an extra capability check layer for every operation /// -/// # Notes +/// Similar to `CorrectnessChecker`, Before performing any operations, this layer will first verify +/// its arguments against the capability of the underlying service. If the arguments is not supported, +/// an error will be returned directly. /// -/// Currently, we have two types of capability checkers: `DefaultCapabilityChecker` and `CorrectnessCapabilityChecker` +/// Notes /// -/// ## DefaultCapabilityChecker +/// There are two main differences between this checker with the `CorrectnessChecker`: +/// 1. This checker provides additional checks for capabilities like write_with_content_type and +/// list_with_version, among others. These capabilities do not affect data integrity, even if +/// the underlying storage services do not support them. /// -/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. -/// in `DefaultCapabilityChecker`, we'll verify whether the operation itself is supported by underlying service. -/// -/// for example, when calling `list()`, if `list` is not supported by the underlying service, an `Unsupported` error -/// is returned. -/// -/// ## CorrectnessCapabilityChecker -/// -/// this checker ensures that critical arguments, which might affect the correctness of the call, are -/// supported by the underlying service. -/// -/// for example, when calling `read()` with a specified version, but `read_with_version` is not supported by -/// the underlying service, an `Unsupported` error is returned. without this check, incorrect or undesired data -/// may be retrieved. +/// 2. OpenDAL doesn't apply this checker by default. Users can enable this layer if they want to +/// enforce stricter requirements. /// /// # examples /// /// ```no_run -/// # use opendal::layers::CapabilityCheckLayer; +/// # use opendal::layers::CapabilityChecker; /// # use opendal::services; /// # use opendal::Operator; /// # use opendal::Result; /// # use opendal::Scheme; /// /// # fn main() -> Result<()> { -/// use opendal::layers::CapabilityCheckLayer; +/// use opendal::layers::CapabilityChecker; /// let _ = Operator::new(services::Memory::default())? -/// .layer(CapabilityCheckLayer::with_correctness_checker()) +/// .layer(CapabilityChecker) /// .finish(); /// Ok(()) /// # } /// ``` #[derive(Default)] -pub struct CapabilityCheckLayer { - check_correctness: bool, -} +pub struct CapabilityChecker; -impl CapabilityCheckLayer { - /// Create a `CapabilityLayer` with default settings - pub fn new() -> Self { - Self::default() - } - - /// Create a `CapabilityLayer` with correctness checker - pub fn with_correctness_checker() -> Self { - CapabilityCheckLayer { - check_correctness: true, - } - } - - fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { - let scheme = info.scheme(); - let op = op.into(); - - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op}"), - ) - .with_operation(op) - } - - fn new_unsupported_args_error( - info: &AccessorInfo, - op: impl Into<&'static str>, - args: &str, - ) -> Error { - let scheme = info.scheme(); - let op = op.into(); - - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op} with args {args}"), - ) - .with_operation(op) - } -} - -impl Layer for CapabilityCheckLayer { - type LayeredAccess = - TwoWays, CorrectnessCapabilityCheckAccessor>; +impl Layer for CapabilityChecker { + type LayeredAccess = CapabilityAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - if !self.check_correctness { - TwoWays::One(DefaultCapabilityCheckAccessor { - info: inner.info(), - inner, - }) - } else { - TwoWays::Two(CorrectnessCapabilityCheckAccessor { - info: inner.info(), - inner, - }) - } - } -} - -impl Debug - for TwoWays, CorrectnessCapabilityCheckAccessor> -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.inner().fmt(f) - } -} - -impl LayeredAccess - for TwoWays, CorrectnessCapabilityCheckAccessor> -{ - type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - match self { - TwoWays::One(v) => v.inner(), - TwoWays::Two(v) => v.inner(), - } - } - - fn info(&self) -> Arc { - match self { - TwoWays::One(v) => LayeredAccess::info(v), - TwoWays::Two(v) => LayeredAccess::info(v), - } - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::create_dir(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::create_dir(v, path, args).await, - } - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - match self { - TwoWays::One(v) => LayeredAccess::read(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::read(v, path, args).await, - } - } - - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - match self { - TwoWays::One(v) => LayeredAccess::write(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::write(v, path, args).await, - } - } - - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::copy(v, from, to, args).await, - TwoWays::Two(v) => LayeredAccess::copy(v, from, to, args).await, - } - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::rename(v, from, to, args).await, - TwoWays::Two(v) => LayeredAccess::rename(v, from, to, args).await, - } - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::stat(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::stat(v, path, args).await, - } - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::delete(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::delete(v, path, args).await, - } - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { - match self { - TwoWays::One(v) => LayeredAccess::list(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::list(v, path, args).await, - } - } - - async fn batch(&self, args: OpBatch) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::batch(v, args).await, - TwoWays::Two(v) => LayeredAccess::batch(v, args).await, - } - } - - async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::presign(v, path, args).await, - TwoWays::Two(v) => LayeredAccess::presign(v, path, args).await, - } - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_create_dir(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_create_dir(v, path, args), - } - } - - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_read(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_read(v, path, args), - } - } - - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_write(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_write(v, path, args), - } - } - - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_copy(v, from, to, args), - TwoWays::Two(v) => LayeredAccess::blocking_copy(v, from, to, args), - } - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_rename(v, from, to, args), - TwoWays::Two(v) => LayeredAccess::blocking_rename(v, from, to, args), - } - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_stat(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_stat(v, path, args), - } - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { - match self { - TwoWays::One(v) => LayeredAccess::blocking_delete(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_delete(v, path, args), - } - } - - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingLister)> { - match self { - TwoWays::One(v) => LayeredAccess::blocking_list(v, path, args), - TwoWays::Two(v) => LayeredAccess::blocking_list(v, path, args), - } - } -} - -pub struct DefaultCapabilityCheckAccessor { - info: Arc, - inner: A, -} - -impl Debug for DefaultCapabilityCheckAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DefaultCapabilityCheckAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl LayeredAccess for DefaultCapabilityCheckAccessor { - type Inner = A; - type Reader = A::Reader; - type BlockingReader = A::BlockingReader; - type Writer = A::Writer; - type BlockingWriter = A::BlockingWriter; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - fn info(&self) -> Arc { - self.info.clone() - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.create_dir { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::CreateDir, - )); - } - - self.inner.create_dir(path, args).await - } - - async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Read, - )); - } - - self.inner.read(path, args).await - } - - async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { - let capability = self.info.full_capability(); - if !capability.write { - return Err(CapabilityCheckLayer::new_unsupported_error( - &self.info, - Operation::Write, - )); - } - if args.append() && !capability.write_can_append { - return Err(CapabilityCheckLayer::new_unsupported_args_error( - &self.info, - Operation::Write, - "append", - )); - } - - self.inner.write(path, args).await - } - - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Copy, - )); - } - - self.inner.copy(from, to, args).await - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Rename, - )); - } - - self.inner.rename(from, to, args).await - } - - async fn stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Stat, - )); - } - - self.inner.stat(path, args).await - } - - async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Delete, - )); - } - - self.inner.delete(path, args).await - } - - async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { - let capability = self.info.full_capability(); - if !capability.list { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::List, - )); - } - - self.inner.list(path, args).await - } - - async fn batch(&self, args: OpBatch) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.batch { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Batch, - )); - } - - self.inner.batch(args).await - } - - async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.presign { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::Presign, - )); - } - - self.inner.presign(path, args).await - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.create_dir || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingCreateDir, - )); - } - - self.inner.blocking_create_dir(path, args) - } - - fn blocking_read( - &self, - path: &str, - args: OpRead, - ) -> crate::Result<(RpRead, Self::BlockingReader)> { - let capability = self.info.full_capability(); - if !capability.read || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingRead, - )); - } - - self.inner.blocking_read(path, args) - } - - fn blocking_write( - &self, - path: &str, - args: OpWrite, - ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { - let capability = self.info.full_capability(); - if !capability.write || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - &self.info, - Operation::BlockingWrite, - )); - } - - if args.append() && !capability.write_can_append { - return Err(CapabilityCheckLayer::new_unsupported_args_error( - &self.info, - Operation::BlockingWrite, - "append", - )); - } - - self.inner.blocking_write(path, args) - } - - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.copy || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingCopy, - )); - } - - self.inner().blocking_copy(from, to, args) - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.rename || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingRename, - )); + CapabilityAccessor { + info: inner.info(), + inner, } - - self.inner().blocking_rename(from, to, args) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.stat || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingStat, - )); - } - - self.inner.blocking_stat(path, args) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { - let capability = self.info.full_capability(); - if !capability.delete || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingDelete, - )); - } - - self.inner().blocking_delete(path, args) - } - - fn blocking_list( - &self, - path: &str, - args: OpList, - ) -> crate::Result<(RpList, Self::BlockingLister)> { - let capability = self.info.full_capability(); - if !capability.list || !capability.blocking { - return Err(CapabilityCheckLayer::new_unsupported_error( - self.info.as_ref(), - Operation::BlockingList, - )); - } - - self.inner.blocking_list(path, args) } } - -pub struct CorrectnessCapabilityCheckAccessor { +pub struct CapabilityAccessor { info: Arc, inner: A, } -impl Debug for CorrectnessCapabilityCheckAccessor { +impl Debug for CapabilityAccessor { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CorrectnessCapabilityCheckAccessor") + f.debug_struct("CapabilityCheckAccessor") .field("inner", &self.inner) .finish_non_exhaustive() } } -impl LayeredAccess for CorrectnessCapabilityCheckAccessor { +impl LayeredAccess for CapabilityAccessor { type Inner = A; type Reader = A::Reader; type BlockingReader = A::BlockingReader; @@ -608,7 +98,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { let capability = self.info.full_capability(); if !capability.read_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Read, "version", @@ -620,11 +110,11 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { let capability = self.info.full_capability(); - if !capability.write_with_if_none_match && args.if_none_match().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + if !capability.write_with_content_type && args.content_type().is_some() { + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Write, - "if_none_match", + "content_type", )); } @@ -634,7 +124,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn stat(&self, path: &str, args: OpStat) -> crate::Result { let capability = self.info.full_capability(); if !capability.stat_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Stat, "version", @@ -647,7 +137,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { let capability = self.info.full_capability(); if !capability.delete_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::Delete, "version", @@ -660,7 +150,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::List, "version", @@ -677,7 +167,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { ) -> crate::Result<(RpRead, Self::BlockingReader)> { let capability = self.info.full_capability(); if !capability.read_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingRead, "version", @@ -693,11 +183,11 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { args: OpWrite, ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { let capability = self.info.full_capability(); - if !capability.write_with_if_none_match && args.if_none_match().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + if !capability.write_with_content_type && args.content_type().is_some() { + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingWrite, - "if_none_match", + "content_type", )); } @@ -707,7 +197,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { let capability = self.info.full_capability(); if !capability.stat_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingStat, "version", @@ -720,7 +210,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { let capability = self.info.full_capability(); if !capability.delete_with_version && args.version().is_some() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingDelete, "version", @@ -737,7 +227,7 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { ) -> crate::Result<(RpList, Self::BlockingLister)> { let capability = self.info.full_capability(); if !capability.list_with_version && args.version() { - return Err(CapabilityCheckLayer::new_unsupported_args_error( + return Err(new_unsupported_args_error( self.info.as_ref(), Operation::BlockingList, "version", @@ -750,11 +240,12 @@ impl LayeredAccess for CorrectnessCapabilityCheckAccessor { #[cfg(test)] mod tests { - use std::time::Duration; - use super::*; - use crate::raw::{oio, PresignedRequest}; - use crate::{Capability, EntryMode, Metadata, Operator}; + use crate::raw::{ + oio, OpCopy, OpCreateDir, OpPresign, OpRename, PresignedRequest, RpCopy, RpCreateDir, + RpPresign, RpRename, + }; + use crate::{Capability, EntryMode, ErrorKind, Metadata, Operator}; use http::HeaderMap; use http::Method as HttpMethod; @@ -822,144 +313,100 @@ mod tests { fn new_test_operator(capability: Capability) -> Operator { let srv = MockService { capability }; - Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer::new()) + Operator::from_inner(Arc::new(srv)).layer(CapabilityChecker) } #[tokio::test] - async fn test_read() { - let op = new_test_operator(Capability::default()); - let res = op.read("path").await; + async fn test_read_with() { + let op = new_test_operator(Capability { + read: true, + ..Default::default() + }); + let res = op.read_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { read: true, - stat: true, + read_with_version: true, ..Default::default() }); - let res = op.read("path").await; + let res = op.read_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_stat() { - let op = new_test_operator(Capability::default()); - let res = op.stat("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_stat_with() { let op = new_test_operator(Capability { stat: true, ..Default::default() }); - let res = op.stat("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer() { - let op = new_test_operator(Capability::default()); - let bs: Vec = vec![]; - let res = op.write("path", bs).await; + let res = op.stat_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - write: true, + stat: true, + stat_with_version: true, ..Default::default() }); - let res = op.writer("path").await; + let res = op.stat_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_create_dir() { - let op = new_test_operator(Capability::default()); - let res = op.create_dir("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_writer_with() { let op = new_test_operator(Capability { - create_dir: true, + write: true, ..Default::default() }); - let res = op.create_dir("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_delete() { - let op = new_test_operator(Capability::default()); - let res = op.delete("path").await; + let res = op.writer_with("path").content_type("type").await; assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - delete: true, + write: true, + write_with_content_type: true, ..Default::default() }); - let res = op.delete("path").await; - assert!(res.is_ok()) + let res = op.writer_with("path").content_type("type").await; + assert!(res.is_ok()); } #[tokio::test] - async fn test_copy() { - let op = new_test_operator(Capability::default()); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_delete_with() { let op = new_test_operator(Capability { - copy: true, + delete: true, ..Default::default() }); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_rename() { - let op = new_test_operator(Capability::default()); - let res = op.rename("path_a", "path_b").await; + let res = op.delete_with("path").version("version").await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - rename: true, + delete: true, + delete_with_version: true, ..Default::default() }); - let res = op.rename("path_a", "path_b").await; + let res = op.delete_with("path").version("version").await; assert!(res.is_ok()) } #[tokio::test] - async fn test_list() { - let op = new_test_operator(Capability::default()); - let res = op.list("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - + async fn test_list_with() { let op = new_test_operator(Capability { list: true, - list_with_recursive: true, ..Default::default() }); - let res = op.list("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_presign() { - let op = new_test_operator(Capability::default()); - let res = op.presign_read("path", Duration::from_secs(1)).await; + let res = op.list_with("path/").version(true).await; assert!(res.is_err()); assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); let op = new_test_operator(Capability { - presign: true, + list: true, + list_with_version: true, ..Default::default() }); - let res = op.presign_read("path", Duration::from_secs(1)).await; + let res = op.lister_with("path/").version(true).await; assert!(res.is_ok()) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 1136a7eaef9..98290ba8678 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -136,7 +136,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with capability check, we cannot reach here") + unreachable!("with correctness check, we cannot reach here") } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -151,7 +151,7 @@ impl CompleteAccessor { return Ok(RpCreateDir::default()); } - unreachable!("with capability check, we cannot reach here") + unreachable!("with correctness check, we cannot reach here") } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { diff --git a/core/src/layers/correctness_check.rs b/core/src/layers/correctness_check.rs new file mode 100644 index 00000000000..15eea6022de --- /dev/null +++ b/core/src/layers/correctness_check.rs @@ -0,0 +1,602 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::raw::{ + Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, + OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, + RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, +}; +use crate::{Error, ErrorKind}; + +/// Add a correctness capability check layer for every operation +/// +/// Before performing any operations, we will first verify the operation and its critical arguments +/// against the capability of the underlying service. If the operation or arguments is not supported, +/// an error will be returned directly. +/// +/// # Notes +/// +/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. +/// this checker ensures the operation and its critical arguments, which might affect the correctness of +/// the call, are supported by the underlying service. +/// +/// for example, when calling `write_with_append`, but `append` is not supported by the underlying +/// service, an `Unsupported` error is returned. without this check, undesired data may be written. +#[derive(Default)] +pub struct CorrectnessChecker; + +impl Layer for CorrectnessChecker { + type LayeredAccess = CorrectnessAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + CorrectnessAccessor { + info: inner.info(), + inner, + } + } +} + +pub(crate) fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op}"), + ) + .with_operation(op) +} + +pub(crate) fn new_unsupported_args_error( + info: &AccessorInfo, + op: impl Into<&'static str>, + args: &str, +) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op} with args {args}"), + ) + .with_operation(op) +} + +pub struct CorrectnessAccessor { + info: Arc, + inner: A, +} + +impl Debug for CorrectnessAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CorrectnessCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for CorrectnessAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn info(&self) -> Arc { + self.info.clone() + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::CreateDir, + )); + } + + self.inner.create_dir(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Read)); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write { + return Err(new_unsupported_error(&self.info, Operation::Write)); + } + if args.append() && !capability.write_can_append { + return Err(new_unsupported_args_error( + &self.info, + Operation::Write, + "append", + )); + } + if args.if_not_exists() && !capability.write_with_if_not_exists { + return Err(new_unsupported_args_error( + &self.info, + Operation::Write, + "if_not_exists", + )); + } + if args.if_none_match().is_some() && !capability.write_with_if_none_match { + return Err(new_unsupported_args_error( + self.info.as_ref(), + Operation::Write, + "if_none_match", + )); + } + + self.inner.write(path, args).await + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Copy)); + } + + self.inner.copy(from, to, args).await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Rename)); + } + + self.inner.rename(from, to, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Stat)); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Delete)); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list { + return Err(new_unsupported_error(self.info.as_ref(), Operation::List)); + } + + self.inner.list(path, args).await + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.batch { + return Err(new_unsupported_error(self.info.as_ref(), Operation::Batch)); + } + + self.inner.batch(args).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.presign { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::Presign, + )); + } + + self.inner.presign(path, args).await + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCreateDir, + )); + } + + self.inner.blocking_create_dir(path, args) + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRead, + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write || !capability.blocking { + return Err(new_unsupported_error(&self.info, Operation::BlockingWrite)); + } + if args.append() && !capability.write_can_append { + return Err(new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "append", + )); + } + if args.if_not_exists() && !capability.write_with_if_not_exists { + return Err(new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "if_not_exists", + )); + } + if args.if_none_match().is_some() && !capability.write_with_if_none_match { + return Err(new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingWrite, + "if_none_match", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCopy, + )); + } + + self.inner().blocking_copy(from, to, args) + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRename, + )); + } + + self.inner().blocking_rename(from, to, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingStat, + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingDelete, + )); + } + + self.inner().blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list || !capability.blocking { + return Err(new_unsupported_error( + self.info.as_ref(), + Operation::BlockingList, + )); + } + + self.inner.blocking_list(path, args) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::raw::{oio, PresignedRequest}; + use crate::{Capability, EntryMode, Metadata, Operator}; + use http::HeaderMap; + use http::Method as HttpMethod; + + #[derive(Debug)] + struct MockService { + capability: Capability, + } + + impl Access for MockService { + type Reader = oio::Reader; + type Writer = oio::Writer; + type Lister = oio::Lister; + type BlockingReader = oio::BlockingReader; + type BlockingWriter = oio::BlockingWriter; + type BlockingLister = oio::BlockingLister; + + fn info(&self) -> Arc { + let mut info = AccessorInfo::default(); + info.set_native_capability(self.capability); + + info.into() + } + + async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { + Ok(RpCreateDir {}) + } + + async fn stat(&self, _: &str, _: OpStat) -> crate::Result { + Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) + } + + async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) + } + + async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + Ok((RpWrite::new(), Box::new(()))) + } + + async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { + Ok(RpDelete {}) + } + + async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { + Ok((RpList {}, Box::new(()))) + } + + async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { + Ok(RpCopy {}) + } + + async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { + Ok(RpRename {}) + } + + async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { + Ok(RpPresign::new(PresignedRequest::new( + HttpMethod::POST, + "https://example.com/presign".parse().expect("should parse"), + HeaderMap::new(), + ))) + } + } + + fn new_test_operator(capability: Capability) -> Operator { + let srv = MockService { capability }; + + Operator::from_inner(Arc::new(srv)).layer(CorrectnessChecker) + } + + #[tokio::test] + async fn test_read() { + let op = new_test_operator(Capability::default()); + let res = op.read("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + read: true, + stat: true, + ..Default::default() + }); + let res = op.read("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_stat() { + let op = new_test_operator(Capability::default()); + let res = op.stat("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + stat: true, + ..Default::default() + }); + let res = op.stat("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_writer() { + let op = new_test_operator(Capability::default()); + let bs: Vec = vec![]; + let res = op.write("path", bs).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.writer("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_write_with() { + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.write_with("path", "".as_bytes()).append(true).await; + assert!(res.is_err()); + + let res = op + .write_with("path", "".as_bytes()) + .if_not_exists(true) + .await; + assert!(res.is_err()); + + let res = op + .write_with("path", "".as_bytes()) + .if_none_match("etag") + .await; + assert!(res.is_err()); + + let op = new_test_operator(Capability { + write: true, + write_can_append: true, + write_with_if_not_exists: true, + write_with_if_none_match: true, + ..Default::default() + }); + let res = op.writer_with("path").append(true).await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_create_dir() { + let op = new_test_operator(Capability::default()); + let res = op.create_dir("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + create_dir: true, + ..Default::default() + }); + let res = op.create_dir("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_delete() { + let op = new_test_operator(Capability::default()); + let res = op.delete("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + delete: true, + ..Default::default() + }); + let res = op.delete("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_copy() { + let op = new_test_operator(Capability::default()); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + copy: true, + ..Default::default() + }); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_rename() { + let op = new_test_operator(Capability::default()); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + rename: true, + ..Default::default() + }); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_list() { + let op = new_test_operator(Capability::default()); + let res = op.list("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + list: true, + list_with_recursive: true, + ..Default::default() + }); + let res = op.list("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_presign() { + let op = new_test_operator(Capability::default()); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + presign: true, + ..Default::default() + }); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_ok()) + } +} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index cf672d59032..c46b4ec7dfb 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -114,5 +114,7 @@ pub use self::dtrace::DtraceLayer; pub mod observe; +mod correctness_check; +pub(crate) use correctness_check::CorrectnessChecker; mod capability_check; -pub use capability_check::CapabilityCheckLayer; +pub use capability_check::CapabilityChecker; diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 2c6c33fc0b7..f8349ae2c26 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -472,7 +472,7 @@ impl OperatorBuilder { OperatorBuilder { accessor } .layer(ErrorContextLayer) .layer(CompleteLayer) - .layer(CapabilityCheckLayer::default()) + .layer(CorrectnessChecker) } /// Create a new layer with static dispatch.