diff --git a/Cargo.lock b/Cargo.lock index 280014f443..d129b8fb46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -49,20 +49,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "aligned-buffer" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b0ccaa876f7e00c6a4818bf878d5d4b387eff28c79bea024824469ec25509d" -dependencies = [ - "const_panic", - "crossbeam-utils", - "rkyv", - "stable_deref_trait", - "static_assertions", - "thiserror 1.0.69", -] - [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -565,29 +551,6 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" -[[package]] -name = "bytecheck" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50c8f430744b23b54ad15161fcbc22d82a29b73eacbe425fea23ec822600bc6f" -dependencies = [ - "bytecheck_derive", - "ptr_meta", - "rancor", - "simdutf8", -] - -[[package]] -name = "bytecheck_derive" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523363cbe1df49b68215efdf500b103ac3b0fb4836aed6d15689a076eadb8fff" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.93", -] - [[package]] name = "bytecount" version = "0.6.8" @@ -1040,12 +1003,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c50fcfdf972929aff202c16b80086aa3cfc6a3a820af714096c58c7c1d0582" -[[package]] -name = "const_panic" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53857514f72ee4a2b583de67401e3ff63a5472ca4acf289d09a9ea7636dfec17" - [[package]] name = "core-foundation" version = "0.9.4" @@ -1789,9 +1746,9 @@ dependencies = [ [[package]] name = "fixedbitset" -version = "0.4.2" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flatbuffers" @@ -1876,9 +1833,9 @@ dependencies = [ [[package]] name = "fsst-rs" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a17331823cb8b0f61a5c44861cf24af41002e838d7ae54d68645a8b4c8f44a" +checksum = "e14272990ae41c5822927019c5f98bb36cf69bde551b6bfbb6bcc460a43e19ec" [[package]] name = "futures" @@ -2389,12 +2346,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.7.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown 0.14.5", ] [[package]] @@ -2783,26 +2740,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "munge" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64142d38c84badf60abf06ff9bd80ad2174306a5b11bd4706535090a30a419df" -dependencies = [ - "munge_macro", -] - -[[package]] -name = "munge_macro" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bb5c1d8184f13f7d0ccbeeca0def2f9a181bce2624302793005f5ca8aa62e5e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.93", -] - [[package]] name = "nanorand" version = "0.7.0" @@ -3209,9 +3146,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "petgraph" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +checksum = "c94eb96835f05ec51384814c9b2daef83f68486f67a0e2e9680e0f698dca808e" dependencies = [ "fixedbitset", "indexmap", @@ -3461,26 +3398,6 @@ dependencies = [ "prost", ] -[[package]] -name = "ptr_meta" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9e76f66d3f9606f44e45598d155cb13ecf09f4a28199e48daf8c8fc937ea90" -dependencies = [ - "ptr_meta_derive", -] - -[[package]] -name = "ptr_meta_derive" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca414edb151b4c8d125c12566ab0d74dc9cdba36fb80eb7b848c15f495fd32d1" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.93", -] - [[package]] name = "pyo3" version = "0.22.6" @@ -3658,15 +3575,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rancor" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf5f7161924b9d1cea0e4cabc97c372cea92b5f927fc13c6bca67157a0ad947" -dependencies = [ - "ptr_meta", -] - [[package]] name = "rand" version = "0.8.5" @@ -3787,15 +3695,6 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" -[[package]] -name = "rend" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a35e8a6bf28cd121053a66aa2e6a2e3eaffad4a60012179f0e864aa5ffeff215" -dependencies = [ - "bytecheck", -] - [[package]] name = "reqwest" version = "0.12.11" @@ -3863,36 +3762,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "rkyv" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b11a153aec4a6ab60795f8ebe2923c597b16b05bb1504377451e705ef1a45323" -dependencies = [ - "bytecheck", - "bytes", - "hashbrown 0.15.2", - "indexmap", - "munge", - "ptr_meta", - "rancor", - "rend", - "rkyv_derive", - "tinyvec", - "uuid", -] - -[[package]] -name = "rkyv_derive" -version = "0.8.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beb382a4d9f53bd5c0be86b10d8179c3f8a14c30bf774ff77096ed6581e35981" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.93", -] - [[package]] name = "rstest" version = "0.23.0" @@ -4192,12 +4061,6 @@ dependencies = [ "libc", ] -[[package]] -name = "simdutf8" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" - [[package]] name = "simplelog" version = "0.12.2" @@ -5218,7 +5081,6 @@ dependencies = [ name = "vortex-ipc" version = "0.21.1" dependencies = [ - "aligned-buffer", "bytes", "flatbuffers", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index c77d296725..c43f8fd615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ categories = ["database-implementations", "data-structures", "compression"] [workspace.dependencies] anyhow = "1.0" -aligned-buffer = "0.2.0" arbitrary = "1.3.2" arrayref = "0.3.7" arrow = { version = "53.0.0" } diff --git a/encodings/fastlanes/src/bitpacking/compute/take.rs b/encodings/fastlanes/src/bitpacking/compute/take.rs index dd8a448d12..684b26120f 100644 --- a/encodings/fastlanes/src/bitpacking/compute/take.rs +++ b/encodings/fastlanes/src/bitpacking/compute/take.rs @@ -153,7 +153,7 @@ mod test { #[test] fn take_with_patches() { - let unpacked = Buffer::from_iter(0u32..100_000).into_array(); + let unpacked = Buffer::from_iter(0u32..1024).into_array(); let bitpacked = BitPackedArray::encode(unpacked.as_ref(), 2).unwrap(); let indices = PrimitiveArray::from_iter([0, 2, 4, 6]); diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index da7d1dcc57..5f35ac9fd5 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -4,7 +4,7 @@ use vortex_array::variants::PrimitiveArrayTrait; use vortex_array::{ ArrayDType, ArrayData, Canonical, IntoArrayData, IntoArrayVariant, IntoCanonical, }; -use vortex_buffer::Buffer; +use vortex_buffer::{Buffer, ByteBuffer}; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; @@ -28,8 +28,6 @@ impl IntoCanonical for FSSTArray { .into_primitive()?; // Bulk-decompress the entire array. - // TODO(ngates): return non-vec to avoid this copy - // See: https://github.com/spiraldb/fsst/issues/61 let uncompressed_bytes = decompressor.decompress(compressed_bytes.as_slice::()); let uncompressed_lens_array = self @@ -58,9 +56,7 @@ impl IntoCanonical for FSSTArray { }); let views_array: ArrayData = Buffer::::from_byte_buffer(views.into_byte_buffer()).into_array(); - // TODO(ngates): return non-vec to avoid this copy - // See: https://github.com/spiraldb/fsst/issues/61 - let uncompressed_bytes_array = Buffer::copy_from(uncompressed_bytes).into_array(); + let uncompressed_bytes_array = ByteBuffer::from(uncompressed_bytes).into_array(); VarBinViewArray::try_new( views_array, diff --git a/vortex-array/src/array/primitive/compute/slice.rs b/vortex-array/src/array/primitive/compute/slice.rs index 85e27d38d9..4ca8163886 100644 --- a/vortex-array/src/array/primitive/compute/slice.rs +++ b/vortex-array/src/array/primitive/compute/slice.rs @@ -1,3 +1,4 @@ +use vortex_dtype::match_each_native_ptype; use vortex_error::VortexResult; use crate::array::primitive::PrimitiveArray; @@ -8,15 +9,12 @@ use crate::{ArrayData, IntoArrayData}; impl SliceFn for PrimitiveEncoding { fn slice(&self, array: &PrimitiveArray, start: usize, stop: usize) -> VortexResult { - let byte_width = array.ptype().byte_width(); - let buffer = array - .byte_buffer() - .slice(start * byte_width..stop * byte_width); - Ok(PrimitiveArray::from_byte_buffer( - buffer, - array.ptype(), - array.validity().slice(start, stop)?, - ) - .into_array()) + match_each_native_ptype!(array.ptype(), |$T| { + Ok(PrimitiveArray::new( + array.buffer::<$T>().slice(start..stop), + array.validity().slice(start, stop)?, + ) + .into_array()) + }) } } diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index 836971e47f..16ec4cd387 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -1,7 +1,7 @@ use std::collections::Bound; use std::ops::{Deref, RangeBounds}; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use vortex_error::{vortex_panic, VortexExpect}; use crate::{Alignment, BufferMut, ByteBuffer}; @@ -56,6 +56,7 @@ impl Buffer { /// Panics if the buffer is not aligned to the size of `T`, or the length is not a multiple of /// the size of `T`. pub fn from_byte_buffer(buffer: ByteBuffer) -> Self { + // TODO(ngates): should this preserve the current alignment of the buffer? Self::from_byte_buffer_aligned(buffer, Alignment::of::()) } @@ -258,6 +259,15 @@ impl Buffer { _marker: Default::default(), }) } + + /// Return a `Buffer` with the given alignment. Where possible, this will be zero-copy. + pub fn aligned(self, alignment: Alignment) -> Self { + if self.as_ptr().align_offset(*alignment) == 0 { + self + } else { + Self::copy_from_aligned(self, alignment) + } + } } impl Deref for Buffer { @@ -300,6 +310,28 @@ impl From for ByteBuffer { } } +impl Buf for ByteBuffer { + fn remaining(&self) -> usize { + self.len() + } + + fn chunk(&self) -> &[u8] { + self.as_slice() + } + + fn advance(&mut self, cnt: usize) { + if !cnt.is_multiple_of(*self.alignment) { + vortex_panic!( + "Cannot advance buffer by {} items, resulting alignment is not {}", + cnt, + self.alignment + ); + } + self.bytes.advance(cnt); + self.length -= cnt; + } +} + /// Owned iterator over a `Buffer`. pub struct BufferIterator { buffer: Buffer, @@ -343,7 +375,9 @@ impl From> for Buffer { #[cfg(test)] mod test { - use crate::buffer; + use bytes::Buf; + + use crate::{buffer, ByteBuffer}; #[test] fn slice() { @@ -366,4 +400,16 @@ mod test { // We should only be able to slice this buffer on 4-byte (i32) boundaries. buf.slice(1..2); } + + #[test] + fn bytes_buf() { + let mut buf = ByteBuffer::copy_from("helloworld".as_bytes()); + assert_eq!(buf.remaining(), 10); + assert_eq!(buf.chunk(), b"helloworld"); + + Buf::advance(&mut buf, 5); + assert_eq!(buf.remaining(), 5); + assert_eq!(buf.as_slice(), b"world"); + assert_eq!(buf.chunk(), b"world"); + } } diff --git a/vortex-buffer/src/buffer_mut.rs b/vortex-buffer/src/buffer_mut.rs index 11f51571f0..6d01a44b34 100644 --- a/vortex-buffer/src/buffer_mut.rs +++ b/vortex-buffer/src/buffer_mut.rs @@ -1,10 +1,11 @@ use core::mem::MaybeUninit; use std::ops::{Deref, DerefMut}; -use bytes::{Buf, BytesMut}; +use bytes::buf::UninitSlice; +use bytes::{Buf, BufMut, BytesMut}; use vortex_error::{vortex_panic, VortexExpect}; -use crate::{Alignment, Buffer}; +use crate::{Alignment, Buffer, ByteBufferMut}; /// A mutable buffer that maintains a runtime-defined alignment through resizing operations. #[derive(Debug, PartialEq, Eq)] @@ -321,6 +322,15 @@ impl BufferMut { } buf } + + /// Return a `BufferMut` with the given alignment. Where possible, this will be zero-copy. + pub fn aligned(self, alignment: Alignment) -> Self { + if self.as_ptr().align_offset(*alignment) == 0 { + self + } else { + Self::copy_from_aligned(self, alignment) + } + } } impl Clone for BufferMut { @@ -411,6 +421,77 @@ impl FromIterator for BufferMut { } } +impl Buf for ByteBufferMut { + fn remaining(&self) -> usize { + self.len() + } + + fn chunk(&self) -> &[u8] { + self.as_slice() + } + + fn advance(&mut self, cnt: usize) { + if !cnt.is_multiple_of(*self.alignment) { + vortex_panic!( + "Cannot advance buffer by {} items, resulting alignment is not {}", + cnt, + self.alignment + ); + } + self.bytes.advance(cnt); + self.length -= cnt; + } +} + +/// As per the BufMut implementation, we must support internal resizing when +/// asked to extend the buffer. +/// See: +unsafe impl BufMut for ByteBufferMut { + #[inline] + fn remaining_mut(&self) -> usize { + usize::MAX - self.len() + } + + #[inline] + unsafe fn advance_mut(&mut self, cnt: usize) { + if !cnt.is_multiple_of(*self.alignment) { + vortex_panic!( + "Cannot advance buffer by {} items, resulting alignment is not {}", + cnt, + self.alignment + ); + } + unsafe { self.bytes.advance_mut(cnt) }; + self.length -= cnt; + } + + #[inline] + fn chunk_mut(&mut self) -> &mut UninitSlice { + self.bytes.chunk_mut() + } + + fn put(&mut self, mut src: T) + where + Self: Sized, + { + while src.has_remaining() { + let chunk = src.chunk(); + self.extend_from_slice(chunk); + src.advance(chunk.len()); + } + } + + #[inline] + fn put_slice(&mut self, src: &[u8]) { + self.extend_from_slice(src); + } + + #[inline] + fn put_bytes(&mut self, val: u8, cnt: usize) { + self.push_n(val, cnt) + } +} + /// Extension trait for [`BytesMut`] that provides functions for aligning the buffer. trait AlignedBytesMut { /// Align an empty `BytesMut` to the specified alignment. @@ -441,7 +522,9 @@ impl AlignedBytesMut for BytesMut { #[cfg(test)] mod test { - use crate::{buffer_mut, Alignment, BufferMut}; + use bytes::{Buf, BufMut}; + + use crate::{buffer_mut, Alignment, BufferMut, ByteBufferMut}; #[test] fn capacity() { @@ -505,4 +588,25 @@ mod test { let buf = buf.map_each(|i| (i + 1) as u32); assert_eq!(buf.as_slice(), &[1u32, 2, 3]); } + + #[test] + fn bytes_buf() { + let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes()); + assert_eq!(buf.remaining(), 10); + assert_eq!(buf.chunk(), b"helloworld"); + + Buf::advance(&mut buf, 5); + assert_eq!(buf.remaining(), 5); + assert_eq!(buf.as_slice(), b"world"); + assert_eq!(buf.chunk(), b"world"); + } + + #[test] + fn bytes_buf_mut() { + let mut buf = ByteBufferMut::copy_from("hello".as_bytes()); + assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5); + + BufMut::put_slice(&mut buf, b"world"); + assert_eq!(buf.as_slice(), b"helloworld"); + } } diff --git a/vortex-buffer/src/bytes.rs b/vortex-buffer/src/bytes.rs new file mode 100644 index 0000000000..1c68b7cef7 --- /dev/null +++ b/vortex-buffer/src/bytes.rs @@ -0,0 +1,25 @@ +use bytes::Buf; + +use crate::{Alignment, ByteBuffer}; + +/// An extension to the [`Buf`] trait that provides a function `copy_to_aligned` similar to +/// `copy_to_bytes` that allows for zero-copy aligned reads where possible. +pub trait AlignedBuf: Buf { + /// Copy the next `len` bytes from the buffer into a new buffer with the given alignment. + /// This will be zero-copy wherever possible. + /// + /// The [`Buf`] trait has a specialized `copy_to_bytes` function that allows the implementation + /// of `Buf` for `Bytes` and `BytesMut` to return bytes with zero-copy. + /// + /// This function provides similar functionality for `ByteBuffer`. + /// + /// TODO(ngates): what should this do the alignment of the current buffer? We have to advance + /// it by len.. + fn copy_to_aligned(&mut self, len: usize, alignment: Alignment) -> ByteBuffer { + // The default implementation uses copy_to_bytes, and then returns a ByteBuffer with + // alignment of 1. This will be zero-copy if the underlying `copy_to_bytes` is zero-copy. + ByteBuffer::from(self.copy_to_bytes(len)).aligned(alignment) + } +} + +impl AlignedBuf for B {} diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index c3dcf1e293..36f3773fd9 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -43,11 +43,10 @@ //! including `arrow_buffer::Buffer`, `arrow_buffer::ScalarBuffer`, and //! `arrow_buffer::OffsetBuffer`. -extern crate core; - pub use alignment::*; pub use buffer::*; pub use buffer_mut::*; +pub use bytes::*; pub use r#const::*; pub use string::*; @@ -56,6 +55,7 @@ mod alignment; mod arrow; mod buffer; mod buffer_mut; +mod bytes; mod r#const; mod macros; mod string; diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index 0277942590..45b921698e 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -1,12 +1,11 @@ use std::collections::BTreeSet; -use std::io::Cursor; use std::sync::Arc; use bytes::Bytes; use vortex_array::{ArrayData, Context}; use vortex_error::{vortex_bail, VortexResult}; use vortex_flatbuffers::footer; -use vortex_ipc::messages::{DecoderMessage, SyncMessageReader}; +use vortex_ipc::messages::{BufMessageReader, DecoderMessage}; use crate::byte_range::ByteRange; use crate::read::cache::RelativeLayoutCache; @@ -74,7 +73,7 @@ impl FlatLayoutReader { } fn array_from_bytes(&self, buf: Bytes) -> VortexResult { - let mut reader = SyncMessageReader::new(Cursor::new(buf)); + let mut reader = BufMessageReader::new(buf); match reader.next().transpose()? { Some(DecoderMessage::Array(array_parts)) => array_parts.into_array_data( self.ctx.clone(), diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index e8d1df9719..e215c223ef 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -14,7 +14,6 @@ readme.workspace = true categories.workspace = true [dependencies] -aligned-buffer = { workspace = true } bytes = { workspace = true } flatbuffers = { workspace = true } futures-util = { workspace = true, features = ["io"] } diff --git a/vortex-ipc/src/messages/decoder.rs b/vortex-ipc/src/messages/decoder.rs index d4342d58b1..81f6e45f6b 100644 --- a/vortex-ipc/src/messages/decoder.rs +++ b/vortex-ipc/src/messages/decoder.rs @@ -1,11 +1,11 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use bytes::{Buf, BytesMut}; +use bytes::Buf; use flatbuffers::{root, root_unchecked, Follow}; use itertools::Itertools; use vortex_array::{flatbuffers as fba, ArrayData, Context}; -use vortex_buffer::{Alignment, ByteBuffer}; +use vortex_buffer::{AlignedBuf, Alignment, ByteBuffer}; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; use vortex_flatbuffers::message as fb; @@ -121,11 +121,11 @@ impl MessageDecoder { /// If the message is incomplete, the function will return `NeedMore` with the _total_ number /// of bytes needed to make progress. The next call to read_next _should_ provide at least /// this number of bytes otherwise it will be given the same `NeedMore` response. - pub fn read_next(&mut self, bytes: &mut BytesMut) -> VortexResult { + pub fn read_next(&mut self, bytes: &mut B) -> VortexResult { loop { match &self.state { State::Length => { - if bytes.len() < 4 { + if bytes.remaining() < 4 { return Ok(PollRead::NeedMore(4)); } @@ -133,12 +133,11 @@ impl MessageDecoder { self.state = State::Header(msg_length as usize); } State::Header(msg_length) => { - if bytes.len() < *msg_length { - bytes.try_reserve_aligned(*msg_length, FB_ALIGNMENT); + if bytes.remaining() < *msg_length { return Ok(PollRead::NeedMore(*msg_length)); } - let mut msg_bytes = bytes.split_to_aligned(*msg_length, FB_ALIGNMENT); + let msg_bytes = bytes.copy_to_aligned(*msg_length, FB_ALIGNMENT); let msg = root::(msg_bytes.as_ref())?; if msg.version() != MessageVersion::V0 { vortex_bail!("Unsupported message version {:?}", msg.version()); @@ -161,10 +160,7 @@ impl MessageDecoder { })?; self.state = State::Array(ReadingArray { - header: ByteBuffer::from_bytes_aligned( - msg_bytes.split().freeze(), - FB_ALIGNMENT, - ), + header: msg_bytes, buffers_length, }); } @@ -200,18 +196,14 @@ impl MessageDecoder { }) => { // Ensure the buffer is read with maximum of reader and message alignment. let read_alignment = self.alignment.max(*alignment); - if bytes.len() < *length_with_padding { - bytes.try_reserve_aligned(*length_with_padding, read_alignment); + if bytes.remaining() < *length_with_padding { return Ok(PollRead::NeedMore(*length_with_padding)); } - let buffer = bytes.split_to_aligned(*length, read_alignment); + let buffer = bytes.copy_to_aligned(*length, read_alignment); - let msg = DecoderMessage::Buffer(ByteBuffer::from_bytes_aligned( - buffer.freeze(), - // Then use the buffer-requested alignment for the result. - *alignment, - )); - let _padding = bytes.split_to(length_with_padding - length); + // Then use the buffer-requested alignment for the result. + let msg = DecoderMessage::Buffer(buffer.aligned(*alignment)); + bytes.advance(length_with_padding - length); // Nothing else to read, so we reset the state to Length self.state = Default::default(); @@ -221,8 +213,7 @@ impl MessageDecoder { header, buffers_length, }) => { - if bytes.len() < *buffers_length { - bytes.try_reserve_aligned(*buffers_length, self.alignment); + if bytes.remaining() < *buffers_length { return Ok(PollRead::NeedMore(*buffers_length)); } @@ -246,10 +237,10 @@ impl MessageDecoder { // Ensure the buffer is read with maximum of reader and message alignment. let read_alignment = self.alignment.max(buffer_alignment); - let buffer = bytes.split_to_aligned(buffer_len, read_alignment); - let _padding = bytes.split_to(buffer_msg.padding() as usize); + let buffer = bytes.copy_to_aligned(buffer_len, read_alignment); + bytes.advance(buffer_msg.padding() as usize); // But use the buffer-requested alignment for the result. - ByteBuffer::from_bytes_aligned(buffer.freeze(), buffer_alignment) + buffer.aligned(buffer_alignment) }) .collect_vec(); @@ -271,63 +262,9 @@ impl MessageDecoder { } } -// TODO(ngates): use ByteBufferMut instead of BytesMut -trait BytesMutAlignedSplit { - /// If the buffer is empty, advances the cursor to the next aligned position and ensures there - /// is sufficient capacity for the requested length. - /// - /// If the buffer is not empty, this function does nothing. - /// - /// This allows us to optimistically align buffers that might be read into from an I/O source. - /// However, if the source of the decoder's BytesMut is a fully formed in-memory IPC buffer, - /// then it would be wasteful to copy the whole thing, and we'd rather only copy the individual - /// buffers that require alignment. - fn try_reserve_aligned(&mut self, capacity: usize, alignment: Alignment); - - /// Splits the buffer at the given index, ensuring the returned BytesMut is aligned - /// as requested. - /// - /// If the buffer isn't already aligned, the split data will be copied into a new - /// buffer that is aligned. - fn split_to_aligned(&mut self, at: usize, alignment: Alignment) -> BytesMut; -} - -impl BytesMutAlignedSplit for BytesMut { - fn try_reserve_aligned(&mut self, capacity: usize, alignment: Alignment) { - if !self.is_empty() { - return; - } - - // Reserve up to the worst-cast alignment - self.reserve(capacity + *alignment); - let padding = self.as_ptr().align_offset(*alignment); - unsafe { self.set_len(padding) }; - self.advance(padding); - } - - fn split_to_aligned(&mut self, at: usize, alignment: Alignment) -> BytesMut { - let buffer = self.split_to(at); - - // If the buffer is already aligned, we can return it directly. - if buffer.as_ptr().align_offset(*alignment) == 0 { - return buffer; - } - - // Otherwise, we allocate a new buffer, align the start, and copy the data. - // NOTE(ngates): this case will rarely be hit. Only if the caller mutates the bytes after - // they have been aligned by the decoder using `reserve_aligned`. - let mut aligned = BytesMut::with_capacity(buffer.len() + *alignment); - let padding = aligned.as_ptr().align_offset(*alignment); - unsafe { aligned.set_len(padding) }; - aligned.advance(padding); - aligned.extend_from_slice(&buffer); - - aligned - } -} - #[cfg(test)] mod test { + use bytes::BytesMut; use vortex_array::array::ConstantArray; use vortex_array::{ArrayDType, IntoArrayData}; use vortex_buffer::buffer; diff --git a/vortex-ipc/src/messages/mod.rs b/vortex-ipc/src/messages/mod.rs index 98a0530cb2..ec864a722a 100644 --- a/vortex-ipc/src/messages/mod.rs +++ b/vortex-ipc/src/messages/mod.rs @@ -1,6 +1,7 @@ mod decoder; mod encoder; mod reader_async; +mod reader_buf; mod reader_sync; mod writer_async; mod writer_sync; @@ -8,6 +9,7 @@ mod writer_sync; pub use decoder::*; pub use encoder::*; pub use reader_async::*; +pub use reader_buf::*; pub use reader_sync::*; pub use writer_async::*; pub use writer_sync::*; diff --git a/vortex-ipc/src/messages/reader_buf.rs b/vortex-ipc/src/messages/reader_buf.rs new file mode 100644 index 0000000000..bc36dfd17c --- /dev/null +++ b/vortex-ipc/src/messages/reader_buf.rs @@ -0,0 +1,37 @@ +use bytes::Buf; +use vortex_error::{vortex_err, VortexResult}; + +use crate::messages::{DecoderMessage, MessageDecoder, PollRead}; + +/// An IPC message reader backed by a `Read` stream. +pub struct BufMessageReader { + buffer: B, + decoder: MessageDecoder, +} + +impl BufMessageReader { + pub fn new(buffer: B) -> Self { + BufMessageReader { + buffer, + decoder: MessageDecoder::default(), + } + } +} + +impl Iterator for BufMessageReader { + type Item = VortexResult; + + fn next(&mut self) -> Option { + if !self.buffer.has_remaining() { + // End-of-buffer reached + return None; + } + match self.decoder.read_next(&mut self.buffer) { + Ok(PollRead::Some(msg)) => Some(Ok(msg)), + Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!( + "Buffer did not have sufficient bytes for an IPC message" + ))), + Err(e) => Some(Err(e)), + } + } +}