Skip to content

Commit

Permalink
chore: cleanup in vortex-ipc & vortex-file (#1553)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Dec 5, 2024
1 parent 1c416e8 commit cc055d4
Show file tree
Hide file tree
Showing 12 changed files with 181 additions and 148 deletions.
2 changes: 1 addition & 1 deletion pyvortex/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn read_array_from_reader<T: VortexReadAt + Unpin + 'static>(

pub async fn read_dtype_from_reader<T: VortexReadAt + Unpin>(reader: T) -> VortexResult<DType> {
let initial_read = read_initial_bytes(&reader, reader.size().await?).await?;
initial_read.lazy_dtype()?.value().cloned()
initial_read.lazy_dtype().value().cloned()
}

fn projection_from_python(columns: Option<Vec<Bound<PyAny>>>) -> PyResult<Projection> {
Expand Down
15 changes: 6 additions & 9 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl FileFormat for VortexFormat {
for o in objects {
let os_read_at = ObjectStoreReadAt::new(store.clone(), o.location.clone());
let initial_read = read_initial_bytes(&os_read_at, o.size as u64).await?;
let lazy_dtype = initial_read.lazy_dtype()?;
let lazy_dtype = initial_read.lazy_dtype();
let s = infer_schema(lazy_dtype.value()?)?;
file_schemas.push(s);
}
Expand All @@ -95,19 +95,16 @@ impl FileFormat for VortexFormat {
) -> DFResult<Statistics> {
let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
let layout = initial_read.fb_layout()?;
let dtype = initial_read.lazy_dtype().map_err(|e| {
DataFusionError::External(Box::new(
e.with_context("Failed to fetch dtype from initial read"),
))
})?;
let layout = initial_read.fb_layout();
let row_count = layout.row_count();

let layout_deserializer =
LayoutDeserializer::new(self.context.clone(), LayoutContext::default().into());
let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::new()));
let relative_message_cache =
RelativeLayoutCache::new(layout_message_cache.clone(), dtype.into());
let relative_message_cache = RelativeLayoutCache::new(
layout_message_cache.clone(),
initial_read.lazy_dtype().into(),
);

let root_layout = read_layout_from_initial(
&initial_read,
Expand Down
101 changes: 0 additions & 101 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,104 +193,3 @@ mod forever_constant {
pub use forever_constant::*;
pub use read::*;
pub use write::*;

// Round-trip tests: arrays are serialized with `StreamArrayWriter`, read back with
// `StreamArrayReader`, and rows are then selected from the resulting stream with `take_rows`.
#[cfg(test)]
// The tests below return `VortexResult<()>` while also using `unwrap`/`assert_eq!`.
#[allow(clippy::panic_in_result_fn)]
mod test {
use std::sync::Arc;

use bytes::Bytes;
use futures_executor::block_on;
use futures_util::{pin_mut, StreamExt, TryStreamExt};
use itertools::Itertools;
use vortex_array::array::{ChunkedArray, PrimitiveArray, PrimitiveEncoding};
use vortex_array::encoding::EncodingVTable;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::{ArrayDType, Context, IntoArrayData, IntoArrayVariant};
use vortex_buffer::Buffer;
use vortex_error::VortexResult;
use vortex_io::VortexBufReader;
use vortex_ipc::stream_reader::StreamArrayReader;
use vortex_ipc::stream_writer::StreamArrayWriter;

/// Serializes `array` into an in-memory byte vector using `StreamArrayWriter`.
///
/// Blocks the current thread on the async write and unwraps the result, so a write
/// failure panics.
fn write_ipc<A: IntoArrayData>(array: A) -> Vec<u8> {
block_on(async {
StreamArrayWriter::new(vec![])
.write_array(array.into_array())
.await
.unwrap()
.into_inner()
})
}

/// Writes 3,000,000 `i32` values, reads them back as a stream, and takes rows 1, 2 and 10.
///
/// Only checks that the first chunk of the take stream is produced without error; the
/// chunk's contents are not inspected.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_empty_index() -> VortexResult<()> {
let data = PrimitiveArray::from((0i32..3_000_000).collect_vec());
let buffer = write_ipc(data);

let indices = PrimitiveArray::from(vec![1, 2, 10]).into_array();

let ctx = Arc::new(Context::default());
let stream_reader =
StreamArrayReader::try_new(VortexBufReader::new(Bytes::from(buffer)), ctx)
.await
.unwrap()
.load_dtype()
.await
.unwrap();
let reader = stream_reader.into_array_stream();

let result_iter = reader.take_rows(indices)?;
pin_mut!(result_iter);

// The value is intentionally unused: reaching this point without a panic is the test.
let _result = block_on(async { result_iter.next().await.unwrap().unwrap() });
Ok(())
}

/// Writes a two-chunk array, reads it back, and checks that `take_rows` yields the expected
/// values per chunk.
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_write_read_chunked() -> VortexResult<()> {
// Indices 10..=2_999_999 fall in the first 3,000,000-row chunk; 3_000_000 is the first row
// of the second chunk. 2_999_999 is deliberately requested twice.
let indices = PrimitiveArray::from(vec![
10u32, 11, 12, 13, 100_000, 2_999_999, 2_999_999, 3_000_000,
])
.into_array();

// NB: the order is reversed here to ensure we aren't grabbing indexes instead of values
let data = PrimitiveArray::from((0i32..3_000_000).rev().collect_vec()).into_array();
let data2 =
PrimitiveArray::from((3_000_000i32..6_000_000).rev().collect_vec()).into_array();
let chunked = ChunkedArray::try_new(vec![data.clone(), data2], data.dtype().clone())?;
let buffer = write_ipc(chunked);
let buffer = Buffer::from(buffer);

let ctx = Arc::new(Context::default());
let stream_reader = StreamArrayReader::try_new(VortexBufReader::new(buffer), ctx)
.await
.unwrap()
.load_dtype()
.await
.unwrap();

let take_iter = stream_reader.into_array_stream().take_rows(indices)?;
pin_mut!(take_iter);

let next = block_on(async { take_iter.try_next().await })?.expect("Expected a chunk");
assert_eq!(next.encoding().id(), PrimitiveEncoding.id());

// First chunk holds 2_999_999 down to 0, so row i has value 2_999_999 - i.
assert_eq!(
next.into_primitive().unwrap().maybe_null_slice::<i32>(),
vec![2999989, 2999988, 2999987, 2999986, 2899999, 0, 0]
);
// Second chunk holds 5_999_999 down to 3_000_000, so its first row (global row 3_000_000)
// has value 5_999_999.
assert_eq!(
block_on(async { take_iter.try_next().await })?
.expect("Expected a chunk")
.into_primitive()
.unwrap()
.maybe_null_slice::<i32>(),
vec![5999999]
);

Ok(())
}
}
34 changes: 15 additions & 19 deletions vortex-file/src/read/builder/initial_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,36 @@ pub struct InitialRead {
}

impl InitialRead {
pub fn fb_postscript(&self) -> VortexResult<footer::Postscript> {
Ok(unsafe {
pub fn fb_postscript(&self) -> footer::Postscript {
unsafe {
root_unchecked::<footer::Postscript>(&self.buf[self.fb_postscript_byte_range.clone()])
})
}
}

/// The bytes of the `Layout` flatbuffer.
pub fn fb_layout_byte_range(&self) -> VortexResult<Range<usize>> {
let footer = self.fb_postscript()?;
pub fn fb_layout_byte_range(&self) -> Range<usize> {
let footer = self.fb_postscript();
let layout_start = (footer.layout_offset() - self.initial_read_offset) as usize;
let layout_end = self.fb_postscript_byte_range.start;
Ok(layout_start..layout_end)
layout_start..layout_end
}

/// The `Layout` flatbuffer.
pub fn fb_layout(&self) -> VortexResult<footer::Layout> {
Ok(unsafe { root_unchecked::<footer::Layout>(&self.buf[self.fb_layout_byte_range()?]) })
pub fn fb_layout(&self) -> footer::Layout {
unsafe { root_unchecked::<footer::Layout>(&self.buf[self.fb_layout_byte_range()]) }
}

/// The bytes of the `Schema` flatbuffer.
pub fn fb_schema_byte_range(&self) -> VortexResult<Range<usize>> {
let footer = self.fb_postscript()?;
pub fn fb_schema_byte_range(&self) -> Range<usize> {
let footer = self.fb_postscript();
let schema_start = (footer.schema_offset() - self.initial_read_offset) as usize;
let schema_end = (footer.layout_offset() - self.initial_read_offset) as usize;
Ok(schema_start..schema_end)
schema_start..schema_end
}

pub fn lazy_dtype(&self) -> VortexResult<LazyDType> {
pub fn lazy_dtype(&self) -> LazyDType {
// we validated the schema bytes at construction time
unsafe {
Ok(LazyDType::from_schema_bytes(
self.buf.slice(self.fb_schema_byte_range()?),
))
}
unsafe { LazyDType::from_schema_bytes(self.buf.slice(self.fb_schema_byte_range())) }
}
}

Expand All @@ -66,8 +62,8 @@ pub fn read_layout_from_initial(
scan: Scan,
message_cache: RelativeLayoutCache,
) -> VortexResult<Box<dyn LayoutReader>> {
let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range()?);
let fb_loc = initial_read.fb_layout()?._tab.loc();
let layout_bytes = initial_read.buf.slice(initial_read.fb_layout_byte_range());
let fb_loc = initial_read.fb_layout()._tab.loc();
layout_serde.read_layout(layout_bytes, fb_loc, scan, message_cache)
}

Expand Down
29 changes: 14 additions & 15 deletions vortex-file/src/read/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub(crate) mod initial_read;
pub struct VortexReadBuilder<R> {
read_at: R,
layout_serde: LayoutDeserializer,
projection: Option<Projection>,
size: Option<u64>,
projection: Projection,
file_size: Option<u64>,
row_mask: Option<ArrayData>,
row_filter: Option<RowFilter>,
io_dispatcher: Option<Arc<IoDispatcher>>,
Expand All @@ -76,21 +76,21 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
Self {
read_at,
layout_serde,
projection: None,
size: None,
projection: Projection::default(),
file_size: None,
row_mask: None,
row_filter: None,
io_dispatcher: None,
}
}

pub fn with_size(mut self, size: u64) -> Self {
self.size = Some(size);
pub fn with_file_size(mut self, size: u64) -> Self {
self.file_size = Some(size);
self
}

pub fn with_projection(mut self, projection: Projection) -> Self {
self.projection = Some(projection);
self.projection = projection;
self
}

Expand All @@ -116,15 +116,14 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {

pub async fn build(self) -> VortexResult<VortexFileArrayStream<R>> {
// we do a large enough initial read to get footer, layout, and schema
let initial_read = read_initial_bytes(&self.read_at, self.size().await?).await?;
let initial_read = read_initial_bytes(&self.read_at, self.file_size().await?).await?;

let layout = initial_read.fb_layout()?;
let layout = initial_read.fb_layout();

let row_count = layout.row_count();
let read_projection = self.projection.unwrap_or_default();
let lazy_dtype = Arc::new(initial_read.lazy_dtype()?);
let lazy_dtype = Arc::new(initial_read.lazy_dtype());

let projected_dtype = match read_projection {
let projected_dtype = match self.projection {
Projection::All => lazy_dtype.clone(),
Projection::Flat(ref fields) => lazy_dtype.project(fields)?,
};
Expand All @@ -133,7 +132,7 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
let layout_reader = read_layout_from_initial(
&initial_read,
&self.layout_serde,
Scan::new(match read_projection {
Scan::new(match self.projection {
Projection::All => None,
Projection::Flat(p) => Some(Arc::new(Select::include(p))),
}),
Expand Down Expand Up @@ -179,8 +178,8 @@ impl<R: VortexReadAt + Unpin> VortexReadBuilder<R> {
)
}

async fn size(&self) -> VortexResult<u64> {
Ok(match self.size {
async fn file_size(&self) -> VortexResult<u64> {
Ok(match self.file_size {
Some(s) => s,
None => self.read_at.size().await?,
})
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/read/layouts/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ mod tests {
.unwrap();
let layout_serde = LayoutDeserializer::default();

let dtype = Arc::new(initial_read.lazy_dtype().unwrap());
let dtype = Arc::new(initial_read.lazy_dtype());
(
read_layout_from_initial(
&initial_read,
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod test {
let initial_read = read_initial_bytes(&written_bytes, n_bytes as u64)
.await
.unwrap();
let lazy_dtype = Arc::new(initial_read.lazy_dtype().unwrap());
let lazy_dtype = Arc::new(initial_read.lazy_dtype());
let layout_deserializer = LayoutDeserializer::default();
let layout_message_cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
let layout_reader = read_layout_from_initial(
Expand Down
2 changes: 2 additions & 0 deletions vortex-file/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod layouts;
mod mask;
pub mod metadata;
pub mod projection;
mod reader;
mod recordbatchreader;
mod splits;
mod stream;
Expand All @@ -33,6 +34,7 @@ use vortex_ipc::stream_writer::ByteRange;
pub use crate::read::mask::RowMask;

// Recommended read-size according to the AWS performance guide
// FIXME(ngates): this is dumb
pub const INITIAL_READ_SIZE: usize = 8 * 1024 * 1024;

/// Operation to apply to data returned by the layout
Expand Down
33 changes: 33 additions & 0 deletions vortex-file/src/read/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#![allow(dead_code)]
#![allow(unused_variables)]
use std::ops::Range;
use std::sync::{Arc, RwLock};

use vortex_array::ArrayData;
use vortex_error::VortexResult;
use vortex_io::VortexReadAt;

use crate::{InitialRead, LayoutMessageCache, VortexFileArrayStream};

/// Reader over a Vortex file, generic over the underlying source `R`.
///
/// NOTE(review): this looks like scaffolding. The file carries `#![allow(dead_code)]` and the
/// read methods are still `todo!()`; confirm before relying on it.
pub struct VortexFileArrayReader<R> {
/// The underlying source; the impl block requires `R: VortexReadAt`.
read: R,
/// The initial read of the file; `row_count` obtains the layout flatbuffer from it.
initial: InitialRead,

/// Layout message cache behind an `Arc<RwLock<..>>`.
/// NOTE(review): presumably shared with the layout readers; confirm against callers.
message_cache: Arc<RwLock<LayoutMessageCache>>,
}

impl<R: VortexReadAt> VortexFileArrayReader<R> {
/// Returns the row count recorded in the layout flatbuffer of the initial read.
///
/// The value is converted with an `as usize` cast.
/// NOTE(review): `as` truncates silently if `usize` is narrower than the stored integer
/// type; confirm the type returned by `row_count()` on the layout.
pub fn row_count(&self) -> usize {
self.initial.fb_layout().row_count() as usize
}

/// Read a single row range from the Vortex file.
///
/// # Panics
///
/// Not yet implemented: always panics via `todo!()`.
pub async fn read_range(&self, _row_range: Range<usize>) -> VortexResult<ArrayData> {
todo!()
}

/// Stream the chunks of the Vortex file.
///
/// # Panics
///
/// Not yet implemented: always panics via `todo!()`.
pub fn into_stream(self) -> VortexFileArrayStream<R> {
todo!()
}
}
5 changes: 5 additions & 0 deletions vortex-file/src/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
if let Some(ref fr) = filter_reader {
fr.add_splits(0, &mut reader_splits)?;
}

let mut split_iterator = FixedSplitIterator::new(row_count, row_mask);
split_iterator.additional_splits(&mut reader_splits)?;

// Set up a stream of RowMasks that result from applying a filter expression over the file.
let mask_iterator = if let Some(fr) = filter_reader {
Box::new(BufferedLayoutReader::new(
input.clone(),
Expand All @@ -68,6 +70,8 @@ impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
Box::new(split_iterator) as _
};

// Set up a stream of ArrayData results produced by applying the filter and projection
// expressions over the file.
let array_reader = BufferedLayoutReader::new(
input,
dispatcher,
Expand All @@ -84,6 +88,7 @@ impl<R: VortexReadAt + Unpin> VortexFileArrayStream<R> {
}

/// Returns a reference to the `DType` of this stream.
///
/// NOTE(review): `vortex_unwrap` presumably panics if `self.dtype.value()` yields an error;
/// confirm, and see the FIXME below.
pub fn dtype(&self) -> &DType {
// FIXME(ngates): why is this allowed to unwrap?
self.dtype.value().vortex_unwrap()
}

Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn test_splits() {
.unwrap();
let layout_serde = LayoutDeserializer::default();

let dtype = Arc::new(initial_read.lazy_dtype().unwrap());
let dtype = Arc::new(initial_read.lazy_dtype());
let cache = Arc::new(RwLock::new(LayoutMessageCache::new()));

let layout_reader = read_layout_from_initial(
Expand Down
Loading

0 comments on commit cc055d4

Please sign in to comment.