Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Codec #1692

Merged
merged 33 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ arrow-arith = "53.0.0"
arrow-array = "53.0.0"
arrow-buffer = "53.0.0"
arrow-cast = "53.0.0"
arrow-ipc = "53.0.0"
arrow-ord = "53.0.0"
arrow-schema = "53.0.0"
arrow-select = "53.0.0"
Expand Down Expand Up @@ -103,6 +102,7 @@ once_cell = "1.20.2"
parquet = "53.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
pin-project-lite = "0.2.15"
prettytable-rs = "0.10.0"
tabled = { version = "0.17.0", default-features = false }
prost = "0.13.0"
Expand Down
35 changes: 10 additions & 25 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::buffer::Buffer;
use vortex::dtype::{DType, Nullability};
use vortex::io::VortexBufReader;
use vortex::ipc::stream_reader::StreamArrayReader;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::ipc::iterator::{ArrayIteratorIPC, SyncIPCReader};
use vortex::iter::ArrayIteratorExt;
use vortex::validity::Validity;
use vortex::{Context, IntoArrayData, IntoCanonical};
use vortex::{Context, IntoArrayData, IntoArrayVariant};

fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
Expand All @@ -27,27 +23,16 @@ fn array_data_fixture() -> VarBinArray {

fn array_view_fixture() -> VarBinViewArray {
let array_data = array_data_fixture();
let mut buffer = Vec::new();

let writer = StreamArrayWriter::new(&mut buffer);
block_on(writer.write_array(array_data.into_array())).unwrap();
let buffer = array_data
.into_array()
.into_array_iterator()
.write_ipc(vec![])
.unwrap();

let buffer = Buffer::from(buffer);

let ctx = Arc::new(Context::default());
let reader = block_on(StreamArrayReader::try_new(
VortexBufReader::new(buffer),
ctx.clone(),
))
.unwrap();
let reader = block_on(reader.load_dtype()).unwrap();

let mut stream = Box::pin(reader.into_array_stream());

block_on(stream.next())
.unwrap()
SyncIPCReader::try_new(buffer.as_slice(), Arc::new(Context::default()))
.unwrap()
.into_canonical()
.into_array_data()
.unwrap()
.into_varbinview()
.unwrap()
Expand Down
15 changes: 8 additions & 7 deletions bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::path::PathBuf;

use arrow_array::RecordBatchReader;
use bzip2::read::BzDecoder;
use futures::StreamExt;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::io::TokioAdapter;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::io::{TokioAdapter, VortexWrite};
use vortex::ipc::stream::ArrayStreamIPC;
use vortex::{ArrayData, IntoArrayData};

use crate::idempotent;
Expand Down Expand Up @@ -56,11 +57,11 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
Runtime::new()
.unwrap()
.block_on(async move {
let write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
StreamArrayWriter::new(write)
.write_array(array)
.await
.unwrap();
let mut write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
let mut bytes = array.into_array_stream().into_ipc();
while let Some(buffer) = bytes.next().await {
write.write_all(buffer.unwrap()).await.unwrap();
}
Ok::<(), VortexError>(())
})
.unwrap();
Expand Down
15 changes: 13 additions & 2 deletions vortex-array/src/iter/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use vortex_error::VortexResult;
use crate::array::ChunkedArray;
use crate::iter::ArrayIterator;
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::{ArrayData, IntoArrayData};

pub trait ArrayIteratorExt: ArrayIterator {
fn into_stream(self) -> impl ArrayStream
Expand All @@ -13,11 +14,21 @@ pub trait ArrayIteratorExt: ArrayIterator {
ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
}

fn try_into_chunked(self) -> VortexResult<ChunkedArray>
/// Collect the iterator into a single `ArrayData`.
///
/// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
fn into_array_data(self) -> VortexResult<ArrayData>
where
Self: Sized,
{
let dtype = self.dtype().clone();
ChunkedArray::try_new(self.try_collect()?, dtype)
let mut chunks: Vec<ArrayData> = self.try_collect()?;
if chunks.len() == 1 {
Ok(chunks.remove(0))
} else {
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
}
}

impl<I: ArrayIterator> ArrayIteratorExt for I {}
21 changes: 14 additions & 7 deletions vortex-array/src/stream/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,28 @@ use vortex_error::VortexResult;
use crate::array::ChunkedArray;
use crate::stream::take_rows::TakeRows;
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::ArrayData;
use crate::{ArrayData, IntoArrayData};

pub trait ArrayStreamExt: ArrayStream {
fn collect_chunked(self) -> impl Future<Output = VortexResult<ChunkedArray>>
/// Collect the stream into a single `ArrayData`.
///
/// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
fn into_array_data(self) -> impl Future<Output = VortexResult<ArrayData>>
where
Self: Sized,
{
async {
async move {
let dtype = self.dtype().clone();
self.try_collect()
.await
.and_then(|chunks| ChunkedArray::try_new(chunks, dtype))
let mut chunks: Vec<ArrayData> = self.try_collect().await?;
if chunks.len() == 1 {
Ok(chunks.remove(0))
} else {
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
}
}

/// Perform a row-wise selection on the stream from an array of sorted indices.
fn take_rows(self, indices: ArrayData) -> VortexResult<impl ArrayStream>
where
Self: Sized,
Expand All @@ -32,4 +39,4 @@ pub trait ArrayStreamExt: ArrayStream {
}
}

impl<R: ArrayStream> ArrayStreamExt for R {}
impl<S: ArrayStream> ArrayStreamExt for S {}
1 change: 1 addition & 0 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl Buffer {

#[allow(clippy::same_name_method)]
/// Return a new view on the buffer, but limited to the given index range.
/// TODO(ngates): implement std::ops::Index
pub fn slice(&self, range: Range<usize>) -> Self {
match &self.0 {
Inner::Arrow(b) => Buffer(Inner::Arrow(
Expand Down
39 changes: 39 additions & 0 deletions vortex-file/src/byte_range.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::fmt::{Display, Formatter};
use std::ops::Range;

use vortex_error::VortexUnwrap;

/// A half-open range of bytes `[begin, end)` within some addressable region.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ByteRange {
    pub begin: u64,
    pub end: u64,
}

impl Display for ByteRange {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Half-open interval notation, matching the type's semantics.
        write!(f, "[{}, {})", self.begin, self.end)
    }
}

impl ByteRange {
    /// Create a new byte range covering `[begin, end)`.
    ///
    /// # Panics
    ///
    /// Panics if `begin > end`. An empty range (`begin == end`) is permitted,
    /// consistent with [`ByteRange::is_empty`]. (The previous strict `<` check
    /// made empty ranges unconstructible via `new`, leaving `is_empty` dead
    /// for any constructor-built value.)
    pub fn new(begin: u64, end: u64) -> Self {
        assert!(begin <= end, "Buffer begin must not be after its end");
        Self { begin, end }
    }

    /// Number of bytes covered by the range.
    pub fn len(&self) -> u64 {
        self.end - self.begin
    }

    /// Returns true if the range covers no bytes.
    pub fn is_empty(&self) -> bool {
        self.begin == self.end
    }

    /// Convert to a `Range<usize>` for slicing in-memory buffers.
    ///
    /// # Panics
    ///
    /// Panics (via `vortex_unwrap`) if either bound does not fit in `usize` —
    /// only possible on targets where `usize` is narrower than 64 bits.
    pub fn as_range(&self) -> Range<usize> {
        Range {
            start: self.begin.try_into().vortex_unwrap(),
            end: self.end.try_into().vortex_unwrap(),
        }
    }
}
28 changes: 0 additions & 28 deletions vortex-file/src/dtype_reader.rs

This file was deleted.

5 changes: 1 addition & 4 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,10 @@
//! If you ultimately seek Arrow arrays, [`VortexRecordBatchReader`] converts a
//! [`VortexFileArrayStream`] into a [`RecordBatchReader`](arrow_array::RecordBatchReader).

mod dtype_reader;

pub use dtype_reader::*;

mod read;
mod write;

mod byte_range;
mod pruning;
#[cfg(test)]
mod tests;
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/read/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ where
self.dispatcher
.dispatch(move || async move {
let read_messages = reader
.read_byte_ranges(messages.iter().map(|msg| msg.1.to_range()).collect())
.read_byte_ranges(messages.iter().map(|msg| msg.1.as_range()).collect())
.map(move |read_res| {
Ok(messages
.into_iter()
Expand Down
16 changes: 10 additions & 6 deletions vortex-file/src/read/layouts/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,17 @@ mod tests {
use arrow_buffer::BooleanBufferBuilder;
use bytes::Bytes;
use flatbuffers::{root, FlatBufferBuilder};
use futures_util::io::Cursor;
use futures_util::TryStreamExt;
use vortex_array::array::{ChunkedArray, PrimitiveArray};
use vortex_array::compute::FilterMask;
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_dtype::PType;
use vortex_expr::{BinaryExpr, Identity, Literal, Operator};
use vortex_flatbuffers::{footer, WriteFlatBuffer};
use vortex_ipc::messages::writer::MessageWriter;
use vortex_ipc::stream_writer::ByteRange;
use vortex_ipc::messages::{AsyncMessageWriter, EncoderMessage};

use crate::byte_range::ByteRange;
use crate::layouts::chunked::{ChunkedLayoutBuilder, ChunkedLayoutReader};
use crate::read::cache::{LazyDType, RelativeLayoutCache};
use crate::read::layouts::test_read::{filter_read_layout, read_layout, read_layout_data};
Expand All @@ -420,22 +421,25 @@ mod tests {
cache: Arc<RwLock<LayoutMessageCache>>,
scan: Scan,
) -> (ChunkedLayoutReader, ChunkedLayoutReader, Bytes, usize) {
let mut writer = MessageWriter::new(Vec::new());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This messy test code will be refactored when I get around to the layouts

let mut writer = Cursor::new(Vec::new());
let array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
let array_dtype = array.dtype().clone();
let chunked =
ChunkedArray::try_new(iter::repeat(array).take(5).collect(), array_dtype).unwrap();
let len = chunked.len();
let mut byte_offsets = vec![writer.tell()];
let mut byte_offsets = vec![writer.position()];
let mut row_offsets = vec![0];
let mut row_offset = 0;

let mut chunk_stream = chunked.array_stream();
let mut msgs = AsyncMessageWriter::new(&mut writer);
while let Some(chunk) = chunk_stream.try_next().await.unwrap() {
row_offset += chunk.len() as u64;
row_offsets.push(row_offset);
writer.write_array(chunk).await.unwrap();
byte_offsets.push(writer.tell());
msgs.write_message(EncoderMessage::Array(&chunk))
.await
.unwrap();
byte_offsets.push(msgs.as_ref().position());
}
let flat_layouts = byte_offsets
.iter()
Expand Down
Loading
Loading