Skip to content

Commit

Permalink
Make ObjectReader prefetch size configurable (#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddyxu authored Jan 29, 2023
1 parent 6e9d2ed commit f13f6f5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
23 changes: 19 additions & 4 deletions rust/src/io/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,40 @@ use object_store::path::Path;
use super::object_reader::ObjectReader;
use crate::Result;

/// [ObjectReader] for the local file system.
pub struct LocalObjectReader {
/// Handle to the opened local file, held behind an [Arc].
file: Arc<File>,

/// Preferred I/O size, in bytes.
///
/// It could be the block size for local SSD.
prefetch_size: usize,
}

/// Default prefetch size for local SSD, in bytes (4 KiB).
///
/// Used by [`LocalObjectReader::open`] when no explicit prefetch size is given.
const PREFETCH_SIZE: usize = 4096;

impl LocalObjectReader {
    /// Open a local object reader, with the default prefetch size ([`PREFETCH_SIZE`]).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Self::open_with_prefetch`].
    pub fn open(path: &Path) -> Result<Box<dyn ObjectReader>> {
        // The file system path is built once, inside `open_with_prefetch`;
        // building it here as well would only allocate an unused string.
        Self::open_with_prefetch(path, PREFETCH_SIZE)
    }

    /// Open a local object reader, with specified `prefetch` size.
    ///
    /// The file system path is formed by prefixing `/` to the `Display`
    /// form of `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened.
    pub fn open_with_prefetch(path: &Path, prefetch: usize) -> Result<Box<dyn ObjectReader>> {
        let local_path = format!("/{path}");
        Ok(Box::new(Self {
            file: File::open(local_path)?.into(),
            prefetch_size: prefetch,
        }))
    }
}

#[async_trait]
impl ObjectReader for LocalObjectReader {
fn prefetch_size(&self) -> usize {
4096
self.prefetch_size
}

/// Returns the file size.
Expand Down
16 changes: 5 additions & 11 deletions rust/src/io/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct CloudObjectReader {
// File path
pub path: Path,

_prefetch_size: usize,
prefetch_size: usize,
}

impl<'a> CloudObjectReader {
Expand All @@ -68,16 +68,15 @@ impl<'a> CloudObjectReader {
Ok(Self {
object_store: object_store.clone(),
path,
_prefetch_size: prefetch_size,
prefetch_size,
})
}
}

#[async_trait]
impl ObjectReader for CloudObjectReader {
fn prefetch_size(&self) -> usize {
// 32KB as default.
32 * 1024
self.prefetch_size
}

/// Object/File Size.
Expand Down Expand Up @@ -140,8 +139,7 @@ pub(crate) async fn read_fixed_stride_array(
) -> Result<ArrayRef> {
if !data_type.is_fixed_stride() {
return Err(Error::Schema(format!(
"{} is not a fixed stride type",
data_type
"{data_type} is not a fixed stride type"
)));
}
// TODO: support more than plain encoding here.
Expand All @@ -166,11 +164,7 @@ pub(crate) async fn read_binary_array(
LargeBinary => Box::new(BinaryDecoder::<LargeBinaryType>::new(
reader, position, length,
)),
_ => {
return Err(Error::IO(
format!("Unsupported binary type: {}", data_type,),
))
}
_ => return Err(Error::IO(format!("Unsupported binary type: {data_type}"))),
};
let fut = decoder.as_ref().get(params.into());
fut.await
Expand Down

0 comments on commit f13f6f5

Please sign in to comment.