From 41e7616c49a2a367727615182e50d211160176d1 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Tue, 6 Aug 2024 01:29:55 +0800
Subject: [PATCH] feat: Add opendal native support (#117)

* feat: Add opendal native support

Signed-off-by: Xuanwo

* Format toml

Signed-off-by: Xuanwo

* Split features

Signed-off-by: Xuanwo

---------

Signed-off-by: Xuanwo
---
 Cargo.toml        |  6 ++++
 src/reader/mod.rs | 73 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 79 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index 4a962343..988b42dc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,9 @@ tokio = { version = "1.28", optional = true, features = [
 anyhow = { version = "1.0", optional = true }
 clap = { version = "4.5.4", features = ["derive"], optional = true }
 
+# opendal
+opendal = { version = "0.48", optional = true, default-features = false }
+
 # datafusion support
 datafusion = { version = "39.0.0", optional = true }
 datafusion-expr = { version = "39.0.0", optional = true }
@@ -52,6 +55,7 @@ object_store = { version = "0.10.1", optional = true }
 [dev-dependencies]
 arrow-json = "52.0.0"
 criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
+opendal = { version = "0.48", default-features = false, features = ["services-memory"] }
 pretty_assertions = "1.3.0"
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 
@@ -61,6 +65,8 @@ default = ["datafusion"]
 async = ["async-trait", "futures", "futures-util", "tokio"]
 cli = ["anyhow", "clap"]
 datafusion = ["async", "dep:datafusion", "datafusion-expr", "datafusion-physical-expr", "object_store"]
+# Enable opendal support.
+opendal = ["dep:opendal"]
 
 [[bench]]
 name = "arrow_reader"
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index ff451ac7..b9ccea4e 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -105,3 +105,76 @@ mod async_chunk_reader {
 
 #[cfg(feature = "async")]
 pub use async_chunk_reader::AsyncChunkReader;
+
+#[cfg(all(feature = "async", feature = "opendal"))]
+mod async_opendal_reader {
+    use crate::reader::AsyncChunkReader;
+    use bytes::Bytes;
+    use futures_util::future::BoxFuture;
+    use opendal::Operator;
+    use std::sync::Arc;
+
+    /// AsyncOpendalReader provides native support for [`opendal`]
+    ///
+    /// ```
+    /// use opendal::Operator;
+    /// use std::io::Result;
+    /// use orc_rust::reader::AsyncOpendalReader;
+    /// use orc_rust::reader::AsyncChunkReader;
+    /// use opendal::services::MemoryConfig;
+    ///
+    /// # async fn test() -> Result<()> {
+    /// let op = Operator::from_config(MemoryConfig::default())?.finish();
+    /// op.write("test", "Hello, world!").await?;
+    ///
+    /// let mut reader = AsyncOpendalReader::new(op, "test");
+    /// let len = reader.len().await?;
+    /// let data = reader.get_bytes(0, len).await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub struct AsyncOpendalReader {
+        op: Operator,
+        path: Arc<String>,
+    }
+
+    impl AsyncOpendalReader {
+        /// Create a new async opendal reader.
+        pub fn new(op: Operator, path: &str) -> Self {
+            Self {
+                op,
+                path: Arc::new(path.to_string()),
+            }
+        }
+    }
+
+    impl AsyncChunkReader for AsyncOpendalReader {
+        fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
+            let path = self.path.clone();
+            Box::pin(async move {
+                let meta = self.op.stat(&path).await?;
+                Ok(meta.content_length())
+            })
+        }
+
+        fn get_bytes(
+            &mut self,
+            offset_from_start: u64,
+            length: u64,
+        ) -> BoxFuture<'_, std::io::Result<Bytes>> {
+            let path = self.path.clone();
+
+            Box::pin(async move {
+                let reader = self
+                    .op
+                    .read_with(&path)
+                    .range(offset_from_start..offset_from_start + length)
+                    .await?;
+                Ok(reader.to_bytes())
+            })
+        }
+    }
+}
+
+#[cfg(all(feature = "async", feature = "opendal"))]
+pub use async_opendal_reader::AsyncOpendalReader;