Skip to content

Commit

Permalink
Message Codec (#1692)
Browse files Browse the repository at this point in the history
This PR extracts IO as much as possible from message reading/writing.

* A `MessageEncoder` takes a `Buffer`, `DType`, or `ArrayData` and
produces a `Vec<Buffer>`.
* A `MessageDecoder` takes a `BytesMut` and either returns a message, or
the number of additional bytes it needs.
* `(A)SyncMessageReader` wraps a `(Async)Read` trait to produce
messages; `(A)SyncMessageWriter` does the reverse.
* `(A)SyncIPCReader` provides a way of turning a Read into an
`Array(Iterator|Stream)`
* `Array(Iterator|Stream)IPC` provides a way to turn an
`Array(Iterator|Stream)` into IPC bytes.
  • Loading branch information
gatesn authored Dec 18, 2024
1 parent f0d1761 commit 46ec5a5
Show file tree
Hide file tree
Showing 39 changed files with 1,520 additions and 1,402 deletions.
147 changes: 139 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ categories = ["database-implementations", "data-structures", "compression"]

[workspace.dependencies]
anyhow = "1.0"
aligned-buffer = "0.2.0"
arbitrary = "1.3.2"
arrayref = "0.3.7"
arrow = { version = "53.0.0" }
arrow-arith = "53.0.0"
arrow-array = "53.0.0"
arrow-buffer = "53.0.0"
arrow-cast = "53.0.0"
arrow-ipc = "53.0.0"
arrow-ord = "53.0.0"
arrow-schema = "53.0.0"
arrow-select = "53.0.0"
Expand Down Expand Up @@ -103,6 +103,7 @@ once_cell = "1.20.2"
parquet = "53.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
pin-project-lite = "0.2.15"
prettytable-rs = "0.10.0"
tabled = { version = "0.17.0", default-features = false }
prost = "0.13.0"
Expand Down
35 changes: 10 additions & 25 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use futures::StreamExt;
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use vortex::buffer::Buffer;
use vortex::dtype::{DType, Nullability};
use vortex::io::VortexBufReader;
use vortex::ipc::stream_reader::StreamArrayReader;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::ipc::iterator::{ArrayIteratorIPC, SyncIPCReader};
use vortex::iter::ArrayIteratorExt;
use vortex::validity::Validity;
use vortex::{Context, IntoArrayData, IntoCanonical};
use vortex::{Context, IntoArrayData, IntoArrayVariant};

fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
Expand All @@ -27,27 +23,16 @@ fn array_data_fixture() -> VarBinArray {

fn array_view_fixture() -> VarBinViewArray {
let array_data = array_data_fixture();
let mut buffer = Vec::new();

let writer = StreamArrayWriter::new(&mut buffer);
block_on(writer.write_array(array_data.into_array())).unwrap();
let buffer = array_data
.into_array()
.into_array_iterator()
.write_ipc(vec![])
.unwrap();

let buffer = Buffer::from(buffer);

let ctx = Arc::new(Context::default());
let reader = block_on(StreamArrayReader::try_new(
VortexBufReader::new(buffer),
ctx.clone(),
))
.unwrap();
let reader = block_on(reader.load_dtype()).unwrap();

let mut stream = Box::pin(reader.into_array_stream());

block_on(stream.next())
.unwrap()
SyncIPCReader::try_new(buffer.as_slice(), Arc::new(Context::default()))
.unwrap()
.into_canonical()
.into_array_data()
.unwrap()
.into_varbinview()
.unwrap()
Expand Down
15 changes: 8 additions & 7 deletions bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ use std::path::PathBuf;

use arrow_array::RecordBatchReader;
use bzip2::read::BzDecoder;
use futures::StreamExt;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::runtime::Runtime;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::dtype::DType;
use vortex::error::{VortexError, VortexResult};
use vortex::io::TokioAdapter;
use vortex::ipc::stream_writer::StreamArrayWriter;
use vortex::io::{TokioAdapter, VortexWrite};
use vortex::ipc::stream::ArrayStreamIPC;
use vortex::{ArrayData, IntoArrayData};

use crate::idempotent;
Expand Down Expand Up @@ -56,11 +57,11 @@ pub fn data_vortex_uncompressed(fname_out: &str, downloaded_data: PathBuf) -> Pa
Runtime::new()
.unwrap()
.block_on(async move {
let write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
StreamArrayWriter::new(write)
.write_array(array)
.await
.unwrap();
let mut write = TokioAdapter(tokio::fs::File::create(path).await.unwrap());
let mut bytes = array.into_array_stream().into_ipc();
while let Some(buffer) = bytes.next().await {
write.write_all(buffer.unwrap()).await.unwrap();
}
Ok::<(), VortexError>(())
})
.unwrap();
Expand Down
15 changes: 13 additions & 2 deletions vortex-array/src/iter/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use vortex_error::VortexResult;
use crate::array::ChunkedArray;
use crate::iter::ArrayIterator;
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::{ArrayData, IntoArrayData};

pub trait ArrayIteratorExt: ArrayIterator {
fn into_stream(self) -> impl ArrayStream
Expand All @@ -13,11 +14,21 @@ pub trait ArrayIteratorExt: ArrayIterator {
ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
}

fn try_into_chunked(self) -> VortexResult<ChunkedArray>
/// Collect the iterator into a single `ArrayData`.
///
/// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
fn into_array_data(self) -> VortexResult<ArrayData>
where
Self: Sized,
{
let dtype = self.dtype().clone();
ChunkedArray::try_new(self.try_collect()?, dtype)
let mut chunks: Vec<ArrayData> = self.try_collect()?;
if chunks.len() == 1 {
Ok(chunks.remove(0))
} else {
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
}
}

/// Blanket implementation: every type that implements [`ArrayIterator`]
/// automatically receives the [`ArrayIteratorExt`] methods.
impl<I> ArrayIteratorExt for I where I: ArrayIterator {}
Loading

0 comments on commit 46ec5a5

Please sign in to comment.