diff --git a/core/src/layers/capability_check.rs b/core/src/layers/capability_check.rs new file mode 100644 index 000000000000..257ecbf0bf87 --- /dev/null +++ b/core/src/layers/capability_check.rs @@ -0,0 +1,965 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use crate::raw::{ + Access, AccessorInfo, Layer, LayeredAccess, OpBatch, OpCopy, OpCreateDir, OpDelete, OpList, + OpPresign, OpRead, OpRename, OpStat, OpWrite, Operation, RpBatch, RpCopy, RpCreateDir, + RpDelete, RpList, RpPresign, RpRead, RpRename, RpStat, RpWrite, TwoWays, +}; +use crate::{Error, ErrorKind}; + +/// Add a capability check layer for every operation +/// +/// Before performing any operations, we will first check +/// the operation against capability of the underlying service. If the +/// operation is not supported, an error will be returned directly. +/// +/// # Notes +/// +/// Currently, we have two types of capability checkers: `DefaultCapabilityChecker` and `CorrectnessCapabilityChecker` +/// +/// ## DefaultCapabilityChecker +/// +/// OpenDAL applies this checker to every accessor by default, so users don't need to invoke it manually. +/// in `DefaultCapabilityChecker`, we'll verify whether the operation itself is supported by underlying service. +/// +/// for example, when calling `list()`, if `list` is not supported by the underlying service, an `Unsupported` error +/// is returned. +/// +/// ## CorrectnessCapabilityChecker +/// +/// this checker ensures that critical arguments, which might affect the correctness of the call, are +/// supported by the underlying service. +/// +/// for example, when calling `read()` with a specified version, but `read_with_version` is not supported by +/// the underlying service, an `Unsupported` error is returned. without this check, incorrect or undesired data +/// may be retrieved. +/// +/// # examples +/// +/// ```no_run +/// # use opendal::layers::CapabilityCheckLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// # use opendal::Scheme; +/// +/// # fn main() -> Result<()> { +/// use opendal::layers::CapabilityCheckLayer; +/// let _ = Operator::new(services::Memory::default())? +/// .layer(CapabilityCheckLayer::with_correctness_checker()) +/// .finish(); +/// Ok(()) +/// # } +/// ``` +#[derive(Default)] +pub struct CapabilityCheckLayer { + check_correctness: bool, +} + +impl CapabilityCheckLayer { + /// Create a `CapabilityLayer` with default settings + pub fn new() -> Self { + Self::default() + } + + /// Create a `CapabilityLayer` with correctness checker + pub fn with_correctness_checker() -> Self { + CapabilityCheckLayer { + check_correctness: true, + } + } + + fn new_unsupported_error(info: &AccessorInfo, op: impl Into<&'static str>) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op}"), + ) + .with_operation(op) + } + + fn new_unsupported_args_error( + info: &AccessorInfo, + op: impl Into<&'static str>, + args: &str, + ) -> Error { + let scheme = info.scheme(); + let op = op.into(); + + Error::new( + ErrorKind::Unsupported, + format!("service {scheme} doesn't support operation {op} with args {args}"), + ) + .with_operation(op) + } +} + +impl Layer for CapabilityCheckLayer { + type LayeredAccess = + TwoWays, CorrectnessCapabilityCheckAccessor>; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + if !self.check_correctness { + TwoWays::One(DefaultCapabilityCheckAccessor { + info: inner.info(), + inner, + }) + } else { + TwoWays::Two(CorrectnessCapabilityCheckAccessor { + info: inner.info(), + inner, + }) + } + } +} + +impl Debug + for TwoWays, CorrectnessCapabilityCheckAccessor> +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.inner().fmt(f) + } +} + +impl LayeredAccess + for TwoWays, CorrectnessCapabilityCheckAccessor> +{ + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + match self { + TwoWays::One(v) => v.inner(), + TwoWays::Two(v) => v.inner(), + } + } + + fn info(&self) -> Arc { + match self { + TwoWays::One(v) => LayeredAccess::info(v), + TwoWays::Two(v) => LayeredAccess::info(v), + } + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::create_dir(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::create_dir(v, path, args).await, + } + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + match self { + TwoWays::One(v) => LayeredAccess::read(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::read(v, path, args).await, + } + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + match self { + TwoWays::One(v) => LayeredAccess::write(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::write(v, path, args).await, + } + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::copy(v, from, to, args).await, + TwoWays::Two(v) => LayeredAccess::copy(v, from, to, args).await, + } + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::rename(v, from, to, args).await, + TwoWays::Two(v) => LayeredAccess::rename(v, from, to, args).await, + } + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::stat(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::stat(v, path, args).await, + } + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::delete(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::delete(v, path, args).await, + } + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + match self { + TwoWays::One(v) => LayeredAccess::list(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::list(v, path, args).await, + } + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::batch(v, args).await, + TwoWays::Two(v) => LayeredAccess::batch(v, args).await, + } + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::presign(v, path, args).await, + TwoWays::Two(v) => LayeredAccess::presign(v, path, args).await, + } + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_create_dir(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_create_dir(v, path, args), + } + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_read(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_read(v, path, args), + } + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_write(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_write(v, path, args), + } + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_copy(v, from, to, args), + TwoWays::Two(v) => LayeredAccess::blocking_copy(v, from, to, args), + } + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_rename(v, from, to, args), + TwoWays::Two(v) => LayeredAccess::blocking_rename(v, from, to, args), + } + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_stat(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_stat(v, path, args), + } + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + match self { + TwoWays::One(v) => LayeredAccess::blocking_delete(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_delete(v, path, args), + } + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + match self { + TwoWays::One(v) => LayeredAccess::blocking_list(v, path, args), + TwoWays::Two(v) => LayeredAccess::blocking_list(v, path, args), + } + } +} + +pub struct DefaultCapabilityCheckAccessor { + info: Arc, + inner: A, +} + +impl Debug for DefaultCapabilityCheckAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultCapabilityCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for DefaultCapabilityCheckAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + fn info(&self) -> Arc { + self.info.clone() + } + + async fn create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::CreateDir, + )); + } + + self.inner.create_dir(path, args).await + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Read, + )); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write { + return Err(CapabilityCheckLayer::new_unsupported_error( + &self.info, + Operation::Write, + )); + } + if args.append() && !capability.write_can_append { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + &self.info, + Operation::Write, + "append", + )); + } + + self.inner.write(path, args).await + } + + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Copy, + )); + } + + self.inner.copy(from, to, args).await + } + + async fn rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Rename, + )); + } + + self.inner.rename(from, to, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Stat, + )); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Delete, + )); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::List, + )); + } + + self.inner.list(path, args).await + } + + async fn batch(&self, args: OpBatch) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.batch { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Batch, + )); + } + + self.inner.batch(args).await + } + + async fn presign(&self, path: &str, args: OpPresign) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.presign { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::Presign, + )); + } + + self.inner.presign(path, args).await + } + + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.create_dir || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCreateDir, + )); + } + + self.inner.blocking_create_dir(path, args) + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRead, + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + &self.info, + Operation::BlockingWrite, + )); + } + + if args.append() && !capability.write_can_append { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + &self.info, + Operation::BlockingWrite, + "append", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.copy || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingCopy, + )); + } + + self.inner().blocking_copy(from, to, args) + } + + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.rename || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingRename, + )); + } + + self.inner().blocking_rename(from, to, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingStat, + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingDelete, + )); + } + + self.inner().blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list || !capability.blocking { + return Err(CapabilityCheckLayer::new_unsupported_error( + self.info.as_ref(), + Operation::BlockingList, + )); + } + + self.inner.blocking_list(path, args) + } +} + +pub struct CorrectnessCapabilityCheckAccessor { + info: Arc, + inner: A, +} + +impl Debug for CorrectnessCapabilityCheckAccessor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CorrectnessCapabilityCheckAccessor") + .field("inner", &self.inner) + .finish_non_exhaustive() + } +} + +impl LayeredAccess for CorrectnessCapabilityCheckAccessor { + type Inner = A; + type Reader = A::Reader; + type BlockingReader = A::BlockingReader; + type Writer = A::Writer; + type BlockingWriter = A::BlockingWriter; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + let capability = self.info.full_capability(); + if !capability.read_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Read, + "version", + )); + } + + self.inner.read(path, args).await + } + + async fn write(&self, path: &str, args: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + let capability = self.info.full_capability(); + if !capability.write_with_if_none_match && args.if_none_match().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Write, + "if_none_match", + )); + } + + self.inner.write(path, args).await + } + + async fn stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Stat, + "version", + )); + } + + self.inner.stat(path, args).await + } + + async fn delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::Delete, + "version", + )); + } + + self.inner.delete(path, args).await + } + + async fn list(&self, path: &str, args: OpList) -> crate::Result<(RpList, Self::Lister)> { + let capability = self.info.full_capability(); + if !capability.list_with_version && args.version() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::List, + "version", + )); + } + + self.inner.list(path, args).await + } + + fn blocking_read( + &self, + path: &str, + args: OpRead, + ) -> crate::Result<(RpRead, Self::BlockingReader)> { + let capability = self.info.full_capability(); + if !capability.read_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingRead, + "version", + )); + } + + self.inner.blocking_read(path, args) + } + + fn blocking_write( + &self, + path: &str, + args: OpWrite, + ) -> crate::Result<(RpWrite, Self::BlockingWriter)> { + let capability = self.info.full_capability(); + if !capability.write_with_if_none_match && args.if_none_match().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingWrite, + "if_none_match", + )); + } + + self.inner.blocking_write(path, args) + } + + fn blocking_stat(&self, path: &str, args: OpStat) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.stat_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingStat, + "version", + )); + } + + self.inner.blocking_stat(path, args) + } + + fn blocking_delete(&self, path: &str, args: OpDelete) -> crate::Result { + let capability = self.info.full_capability(); + if !capability.delete_with_version && args.version().is_some() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingDelete, + "version", + )); + } + + self.inner.blocking_delete(path, args) + } + + fn blocking_list( + &self, + path: &str, + args: OpList, + ) -> crate::Result<(RpList, Self::BlockingLister)> { + let capability = self.info.full_capability(); + if !capability.list_with_version && args.version() { + return Err(CapabilityCheckLayer::new_unsupported_args_error( + self.info.as_ref(), + Operation::BlockingList, + "version", + )); + } + + self.inner.blocking_list(path, args) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::raw::{oio, PresignedRequest}; + use crate::{Capability, EntryMode, Metadata, Operator}; + use http::HeaderMap; + use http::Method as HttpMethod; + + #[derive(Debug)] + struct MockService { + capability: Capability, + } + + impl Access for MockService { + type Reader = oio::Reader; + type Writer = oio::Writer; + type Lister = oio::Lister; + type BlockingReader = oio::BlockingReader; + type BlockingWriter = oio::BlockingWriter; + type BlockingLister = oio::BlockingLister; + + fn info(&self) -> Arc { + let mut info = AccessorInfo::default(); + info.set_native_capability(self.capability); + + info.into() + } + + async fn create_dir(&self, _: &str, _: OpCreateDir) -> crate::Result { + Ok(RpCreateDir {}) + } + + async fn stat(&self, _: &str, _: OpStat) -> crate::Result { + Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) + } + + async fn read(&self, _: &str, _: OpRead) -> crate::Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) + } + + async fn write(&self, _: &str, _: OpWrite) -> crate::Result<(RpWrite, Self::Writer)> { + Ok((RpWrite::new(), Box::new(()))) + } + + async fn delete(&self, _: &str, _: OpDelete) -> crate::Result { + Ok(RpDelete {}) + } + + async fn list(&self, _: &str, _: OpList) -> crate::Result<(RpList, Self::Lister)> { + Ok((RpList {}, Box::new(()))) + } + + async fn copy(&self, _: &str, _: &str, _: OpCopy) -> crate::Result { + Ok(RpCopy {}) + } + + async fn rename(&self, _: &str, _: &str, _: OpRename) -> crate::Result { + Ok(RpRename {}) + } + + async fn presign(&self, _: &str, _: OpPresign) -> crate::Result { + Ok(RpPresign::new(PresignedRequest::new( + HttpMethod::POST, + "https://example.com/presign".parse().expect("should parse"), + HeaderMap::new(), + ))) + } + } + + fn new_test_operator(capability: Capability) -> Operator { + let srv = MockService { capability }; + + Operator::from_inner(Arc::new(srv)).layer(CapabilityCheckLayer::new()) + } + + #[tokio::test] + async fn test_read() { + let op = new_test_operator(Capability::default()); + let res = op.read("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + read: true, + stat: true, + ..Default::default() + }); + let res = op.read("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_stat() { + let op = new_test_operator(Capability::default()); + let res = op.stat("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + stat: true, + ..Default::default() + }); + let res = op.stat("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_writer() { + let op = new_test_operator(Capability::default()); + let bs: Vec = vec![]; + let res = op.write("path", bs).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + write: true, + ..Default::default() + }); + let res = op.writer("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_create_dir() { + let op = new_test_operator(Capability::default()); + let res = op.create_dir("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + create_dir: true, + ..Default::default() + }); + let res = op.create_dir("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_delete() { + let op = new_test_operator(Capability::default()); + let res = op.delete("path").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + delete: true, + ..Default::default() + }); + let res = op.delete("path").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_copy() { + let op = new_test_operator(Capability::default()); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + copy: true, + ..Default::default() + }); + let res = op.copy("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_rename() { + let op = new_test_operator(Capability::default()); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + rename: true, + ..Default::default() + }); + let res = op.rename("path_a", "path_b").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_list() { + let op = new_test_operator(Capability::default()); + let res = op.list("path/").await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + list: true, + list_with_recursive: true, + ..Default::default() + }); + let res = op.list("path/").await; + assert!(res.is_ok()) + } + + #[tokio::test] + async fn test_presign() { + let op = new_test_operator(Capability::default()); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); + + let op = new_test_operator(Capability { + presign: true, + ..Default::default() + }); + let res = op.presign_read("path", Duration::from_secs(1)).await; + assert!(res.is_ok()) + } +} diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 46f9e4ccc0c1..1136a7eaef96 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -98,11 +98,6 @@ use crate::*; /// - If support `list_with_recursive`, return directly. /// - if not, wrap with [`FlatLister`]. /// -/// ## Capability Check -/// -/// Before performing any operations, `CompleteLayer` will first check -/// the operation against capability of the underlying service. If the -/// operation is not supported, an error will be returned directly. pub struct CompleteLayer; impl Layer for CompleteLayer { @@ -129,28 +124,19 @@ impl Debug for CompleteAccessor { } impl CompleteAccessor { - fn new_unsupported_error(&self, op: impl Into<&'static str>) -> Error { - let scheme = self.info.scheme(); - let op = op.into(); - Error::new( - ErrorKind::Unsupported, - format!("service {scheme} doesn't support operation {op}"), - ) - .with_operation(op) - } - async fn complete_create_dir(&self, path: &str, args: OpCreateDir) -> Result { let capability = self.info.full_capability(); if capability.create_dir { return self.inner().create_dir(path, args).await; } + if capability.write_can_empty && capability.list { let (_, mut w) = self.inner.write(path, OpWrite::default()).await?; oio::Write::close(&mut w).await?; return Ok(RpCreateDir::default()); } - Err(self.new_unsupported_error(Operation::CreateDir)) + unreachable!("with capability check, we cannot reach here") } fn complete_blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { @@ -158,20 +144,18 @@ impl CompleteAccessor { if capability.create_dir && capability.blocking { return self.inner().blocking_create_dir(path, args); } + if capability.write_can_empty && capability.list && capability.blocking { let (_, mut w) = self.inner.blocking_write(path, OpWrite::default())?; oio::BlockingWrite::close(&mut w)?; return Ok(RpCreateDir::default()); } - Err(self.new_unsupported_error(Operation::BlockingCreateDir)) + unreachable!("with capability check, we cannot reach here") } async fn complete_stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.stat { - return Err(self.new_unsupported_error(Operation::Stat)); - } if path == "/" { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); @@ -214,9 +198,6 @@ impl CompleteAccessor { fn complete_blocking_stat(&self, path: &str, args: OpStat) -> Result { let capability = self.info.full_capability(); - if !capability.stat { - return Err(self.new_unsupported_error(Operation::Stat)); - } if path == "/" { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); @@ -262,9 +243,6 @@ impl CompleteAccessor { args: OpList, ) -> Result<(RpList, CompleteLister)> { let cap = self.info.full_capability(); - if !cap.list { - return Err(self.new_unsupported_error(Operation::List)); - } let recursive = args.recursive(); @@ -310,9 +288,6 @@ impl CompleteAccessor { args: OpList, ) -> Result<(RpList, CompleteLister)> { let cap = self.info.full_capability(); - if !cap.list { - return Err(self.new_unsupported_error(Operation::BlockingList)); - } let recursive = args.recursive(); @@ -381,11 +356,6 @@ impl LayeredAccess for CompleteAccessor { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let capability = self.info.full_capability(); - if !capability.read { - return Err(self.new_unsupported_error(Operation::Read)); - } - let size = args.range().size(); self.inner .read(path, args) @@ -394,93 +364,24 @@ impl LayeredAccess for CompleteAccessor { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let capability = self.info.full_capability(); - if !capability.write { - return Err(self.new_unsupported_error(Operation::Write)); - } - if args.append() && !capability.write_can_append { - return Err(Error::new( - ErrorKind::Unsupported, - format!( - "service {} doesn't support operation write with append", - self.info.scheme() - ), - )); - } - let (rp, w) = self.inner.write(path, args.clone()).await?; let w = CompleteWriter::new(w); Ok((rp, w)) } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - let capability = self.info.full_capability(); - if !capability.copy { - return Err(self.new_unsupported_error(Operation::Copy)); - } - - self.inner().copy(from, to, args).await - } - - async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { - let capability = self.info.full_capability(); - if !capability.rename { - return Err(self.new_unsupported_error(Operation::Rename)); - } - - self.inner().rename(from, to, args).await - } - async fn stat(&self, path: &str, args: OpStat) -> Result { self.complete_stat(path, args).await } - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let capability = self.info.full_capability(); - if !capability.delete { - return Err(self.new_unsupported_error(Operation::Delete)); - } - - self.inner().delete(path, args).await - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let capability = self.info.full_capability(); - if !capability.list { - return Err(self.new_unsupported_error(Operation::List)); - } - self.complete_list(path, args).await } - async fn batch(&self, args: OpBatch) -> Result { - let capability = self.info.full_capability(); - if !capability.batch { - return Err(self.new_unsupported_error(Operation::Batch)); - } - - self.inner().batch(args).await - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - let capability = self.info.full_capability(); - if !capability.presign { - return Err(self.new_unsupported_error(Operation::Presign)); - } - - self.inner.presign(path, args).await - } - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { self.complete_blocking_create_dir(path, args) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let capability = self.info.full_capability(); - if !capability.read || !capability.blocking { - return Err(self.new_unsupported_error(Operation::Read)); - } - let size = args.range().size(); self.inner .blocking_read(path, args) @@ -488,63 +389,16 @@ impl LayeredAccess for CompleteAccessor { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let capability = self.info.full_capability(); - if !capability.write || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingWrite)); - } - - if args.append() && !capability.write_can_append { - return Err(Error::new( - ErrorKind::Unsupported, - format!( - "service {} doesn't support operation write with append", - self.info.scheme() - ), - )); - } - self.inner .blocking_write(path, args) .map(|(rp, w)| (rp, CompleteWriter::new(w))) } - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - let capability = self.info.full_capability(); - if !capability.copy || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingCopy)); - } - - self.inner().blocking_copy(from, to, args) - } - - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { - let capability = self.info.full_capability(); - if !capability.rename || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingRename)); - } - - self.inner().blocking_rename(from, to, args) - } - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { self.complete_blocking_stat(path, args) } - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let capability = self.info.full_capability(); - if !capability.delete || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingDelete)); - } - - self.inner().blocking_delete(path, args) - } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let capability = self.info.full_capability(); - if !capability.list || !capability.blocking { - return Err(self.new_unsupported_error(Operation::BlockingList)); - } - self.complete_blocking_list(path, args) } } @@ -694,218 +548,3 @@ where Ok(()) } } - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use http::HeaderMap; - use http::Method as HttpMethod; - - use super::*; - - #[derive(Debug)] - struct MockService { - capability: Capability, - } - - impl Access for MockService { - type Reader = oio::Reader; - type Writer = oio::Writer; - type Lister = oio::Lister; - type BlockingReader = oio::BlockingReader; - type BlockingWriter = oio::BlockingWriter; - type BlockingLister = oio::BlockingLister; - - fn info(&self) -> Arc { - let mut info = AccessorInfo::default(); - info.set_native_capability(self.capability); - - info.into() - } - - async fn create_dir(&self, _: &str, _: OpCreateDir) -> Result { - Ok(RpCreateDir {}) - } - - async fn stat(&self, _: &str, _: OpStat) -> Result { - Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) - } - - async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { - Ok((RpRead::new(), Box::new(bytes::Bytes::new()))) - } - - async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok((RpWrite::new(), Box::new(()))) - } - - async fn delete(&self, _: &str, _: OpDelete) -> Result { - Ok(RpDelete {}) - } - - async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - Ok((RpList {}, Box::new(()))) - } - - async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result { - Ok(RpCopy {}) - } - - async fn rename(&self, _: &str, _: &str, _: OpRename) -> Result { - Ok(RpRename {}) - } - - async fn presign(&self, _: &str, _: OpPresign) -> Result { - Ok(RpPresign::new(PresignedRequest::new( - HttpMethod::POST, - "https://example.com/presign".parse().expect("should parse"), - HeaderMap::new(), - ))) - } - } - - fn new_test_operator(capability: Capability) -> Operator { - let srv = MockService { capability }; - - Operator::from_inner(Arc::new(srv)).layer(CompleteLayer) - } - - #[tokio::test] - async fn test_read() { - let op = new_test_operator(Capability::default()); - let res = op.read("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - read: true, - stat: true, - ..Default::default() - }); - let res = op.read("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_stat() { - let op = new_test_operator(Capability::default()); - let res = op.stat("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - stat: true, - ..Default::default() - }); - let res = op.stat("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_writer() { - let op = new_test_operator(Capability::default()); - let bs: Vec = vec![]; - let res = op.write("path", bs).await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - write: true, - ..Default::default() - }); - let res = op.writer("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_create_dir() { - let op = new_test_operator(Capability::default()); - let res = op.create_dir("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - create_dir: true, - ..Default::default() - }); - let res = op.create_dir("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_delete() { - let op = new_test_operator(Capability::default()); - let res = op.delete("path").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - delete: true, - ..Default::default() - }); - let res = op.delete("path").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_copy() { - let op = new_test_operator(Capability::default()); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - copy: true, - ..Default::default() - }); - let res = op.copy("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_rename() { - let op = new_test_operator(Capability::default()); - let res = op.rename("path_a", "path_b").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - rename: true, - ..Default::default() - }); - let res = op.rename("path_a", "path_b").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_list() { - let op = new_test_operator(Capability::default()); - let res = op.list("path/").await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - list: true, - list_with_recursive: true, - ..Default::default() - }); - let res = op.list("path/").await; - assert!(res.is_ok()) - } - - #[tokio::test] - async fn test_presign() { - let op = new_test_operator(Capability::default()); - let res = op.presign_read("path", Duration::from_secs(1)).await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported); - - let op = new_test_operator(Capability { - presign: true, - ..Default::default() - }); - let res = op.presign_read("path", Duration::from_secs(1)).await; - assert!(res.is_ok()) - } -} diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index c43db2331aff..cf672d59032a 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -113,3 +113,6 @@ mod dtrace; pub use self::dtrace::DtraceLayer; pub mod observe; + +mod capability_check; +pub use capability_check::CapabilityCheckLayer; diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 2bda1292b03b..41f6e0ab5c35 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -107,9 +107,9 @@ use crate::*; /// This might introduce a bit overhead for IO operations, but it's the only way to implement /// timeout correctly. We used to implement timeout layer in zero cost way that only stores /// a [`std::time::Instant`] and check the timeout by comparing the instant with current time. -/// However, it doesn't works for all cases. +/// However, it doesn't work for all cases. /// -/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emit. The runtime +/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime /// will never poll our future again. From the application side, this future is hanging forever /// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. #[derive(Clone)] diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index 888dc4fa18ed..e9b6e1f6e22a 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -76,7 +76,7 @@ pub trait Access: Send + Sync + Debug + Unpin + 'static { /// This function is required to be implemented. /// /// By returning AccessorInfo, underlying services can declare - /// some useful information about it self. + /// some useful information about itself. /// /// - scheme: declare the scheme of backend. /// - capabilities: declare the capabilities of current backend. diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 95dfa1c17a1e..2c6c33fc0b77 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -472,6 +472,7 @@ impl OperatorBuilder { OperatorBuilder { accessor } .layer(ErrorContextLayer) .layer(CompleteLayer) + .layer(CapabilityCheckLayer::default()) } /// Create a new layer with static dispatch.