Skip to content

Commit

Permalink
feat: Add opendal native support (#117)
Browse files Browse the repository at this point in the history
* feat: Add opendal native support

Signed-off-by: Xuanwo <[email protected]>

* Format toml

Signed-off-by: Xuanwo <[email protected]>

* Split features

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Aug 5, 2024
1 parent f819f41 commit 41e7616
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ tokio = { version = "1.28", optional = true, features = [
anyhow = { version = "1.0", optional = true }
clap = { version = "4.5.4", features = ["derive"], optional = true }

# opendal
opendal = { version = "0.48", optional = true, default-features = false }

# datafusion support
datafusion = { version = "39.0.0", optional = true }
datafusion-expr = { version = "39.0.0", optional = true }
Expand All @@ -52,6 +55,7 @@ object_store = { version = "0.10.1", optional = true }
[dev-dependencies]
arrow-json = "52.0.0"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
opendal = { version = "0.48", default-features = false, features = ["services-memory"] }
pretty_assertions = "1.3.0"
serde_json = { version = "1.0", default-features = false, features = ["std"] }

Expand All @@ -61,6 +65,8 @@ default = ["datafusion"]
async = ["async-trait", "futures", "futures-util", "tokio"]
cli = ["anyhow", "clap"]
datafusion = ["async", "dep:datafusion", "datafusion-expr", "datafusion-physical-expr", "object_store"]
# Enable opendal support.
opendal = ["dep:opendal"]

[[bench]]
name = "arrow_reader"
Expand Down
73 changes: 73 additions & 0 deletions src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,76 @@ mod async_chunk_reader {

#[cfg(feature = "async")]
pub use async_chunk_reader::AsyncChunkReader;

#[cfg(all(feature = "async", feature = "opendal"))]
mod async_opendal_reader {
use crate::reader::AsyncChunkReader;
use bytes::Bytes;
use futures_util::future::BoxFuture;
use opendal::Operator;
use std::sync::Arc;

/// AsyncOpendalReader provides native support for [`opendal`]
///
/// ```
/// use opendal::Operator;
/// use std::io::Result;
/// use orc_rust::reader::AsyncOpendalReader;
/// use orc_rust::reader::AsyncChunkReader;
/// use opendal::services::MemoryConfig;
///
/// # async fn test() -> Result<()> {
/// let op = Operator::from_config(MemoryConfig::default())?.finish();
/// op.write("test", "Hello, world!").await?;
///
/// let mut reader = AsyncOpendalReader::new(op, "test");
/// let len = reader.len().await?;
/// let data = reader.get_bytes(0, len).await?;
/// # Ok(())
/// # }
/// ```
pub struct AsyncOpendalReader {
op: Operator,
path: Arc<String>,
}

impl AsyncOpendalReader {
/// Create a new async opendal reader.
pub fn new(op: Operator, path: &str) -> Self {
Self {
op,
path: Arc::new(path.to_string()),
}
}
}

impl AsyncChunkReader for AsyncOpendalReader {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
let path = self.path.clone();
Box::pin(async move {
let meta = self.op.stat(&path).await?;
Ok(meta.content_length())
})
}

fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
let path = self.path.clone();

Box::pin(async move {
let reader = self
.op
.read_with(&path)
.range(offset_from_start..offset_from_start + length)
.await?;
Ok(reader.to_bytes())
})
}
}
}

#[cfg(all(feature = "async", feature = "opendal"))]
pub use async_opendal_reader::AsyncOpendalReader;

0 comments on commit 41e7616

Please sign in to comment.