Skip to content

Commit

Permalink
refactor: vortex-buffer (#1742)
Browse files Browse the repository at this point in the history
Just a small one for y'all.

`vortex-buffer/src/lib.rs` is a decent starting point for this change.

This PR introduces `Buffer<T>` and `BufferMut<T>` for as (im)mutable
zero-copy runtime-aligned buffers.
You can think of `BufferMut` as a `Vec`, but every time it re-allocates
to resize, it ensures the new buffer remains aligned as requested.

This is neat. Unlike `Vec<T>`, it allows you to build buffers with much
larger alignment, for example always 128 bytes for FastLanes, or
4096/8192 for page-aligned buffers.

## Limitation: no zero-copy from `Vec<T>`

The implementation essentially wraps `Bytes` and `BytesMut` to provide
this functionality. As a result, there is one major annoying limitation:
we cannot go from `Vec<T>` to `Buffer<T>` *and back to* `Vec<T>` with
zero-copy. `Bytes::from_owner` is able to take an externally allocated
buffer (i.e. `Vec<T>`), but it will not return it back to you.

I've decided to entirely disallow going from `Vec<T>` to `Buffer<T>` to
avoid this odd performance foot-gun. Although in doing this, I _force_ a
copy `Buffer::<T>::copy_from_vec`... whereas if we _did_ allow
conversion from `Vec<T>` then we would only pay for the copy if we tried
to turn it into a `BufferMut<T>`. So.... not sure.

We can see an example of this when encoding ALP arrays. The result our
library returns is a `Vec<T>`. So we are forced to copy. One way around
this is to allow zero-copy into `Buffer<T>`, the other way is to make
the ALP library generic over `V: Default + Extend<T>` to allow the
library to initialize and append to a collection of elements in some
arbitrary container type.

Perhaps an argument for allowing `From<Vec<T>>` is that we would rarely
benefit from `into_mut` for arrays constructed by hand by users
in-memory (in-memory arrays constructed by _us_ should use `Buffer<T>`).
And any arrays loaded from disk, would be loaded into a `Buffer<T>`...

## Wire Break

This PR adds an `alignment: u16` field to every flatbuffer Array's
buffers. This lets us know the desired alignment for a given buffer deep
within the I/O stack, helping to avoid copies later on.

---------

Co-authored-by: Will Manning <[email protected]>
  • Loading branch information
gatesn and lwwmanning authored Dec 30, 2024
1 parent 5a32b5a commit 920b2d2
Show file tree
Hide file tree
Showing 208 changed files with 4,533 additions and 2,468 deletions.
1,574 changes: 1,196 additions & 378 deletions CHANGELOG.md

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions bench-vortex/benches/bytes_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
use parquet::data_type::AsBytes;
use vortex::array::{VarBinArray, VarBinViewArray};
use vortex::buffer::{buffer, ByteBuffer};
use vortex::dtype::{DType, Nullability};
use vortex::ipc::iterator::{ArrayIteratorIPC, SyncIPCReader};
use vortex::iter::ArrayIteratorExt;
Expand All @@ -13,8 +15,8 @@ use vortex::{Context, IntoArrayData, IntoArrayVariant};

fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
PrimitiveArray::from(vec![0i32, 5i32, 10i32, 15i32, 20i32]).into_array(),
PrimitiveArray::from(b"helloworldhelloworld".to_vec()).into_array(),
buffer![0i32, 5i32, 10i32, 15i32, 20i32].into_array(),
ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()).into_array(),
DType::Utf8(Nullability::NonNullable),
Validity::NonNullable,
)
Expand Down
8 changes: 4 additions & 4 deletions bench-vortex/benches/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bench_vortex::public_bi_data::PBIDataset::*;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::{fetch_taxi_data, tpch};
use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::StreamExt;
use log::LevelFilter;
Expand All @@ -28,7 +29,6 @@ use regex::Regex;
use simplelog::*;
use tokio::runtime::Runtime;
use vortex::array::{ChunkedArray, StructArray};
use vortex::buffer::Buffer;
use vortex::dtype::field::Field;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
Expand Down Expand Up @@ -133,8 +133,8 @@ fn vortex_compress_write(
}

#[inline(never)]
fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<ArrayRef>> {
async fn async_read(buf: Buffer) -> VortexResult<Vec<ArrayRef>> {
fn vortex_decompress_read(runtime: &Runtime, buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
async fn async_read(buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
let builder: VortexReadBuilder<_> = VortexReadBuilder::new(
buf,
LayoutDeserializer::new(
Expand Down Expand Up @@ -240,7 +240,7 @@ fn benchmark_compress<F, U>(
let buffer = LazyCell::new(|| {
let mut buf = Vec::new();
vortex_compress_write(runtime, compressor, uncompressed.as_ref(), &mut buf).unwrap();
Buffer::from(buf)
Bytes::from(buf)
});

group.bench_function(bench_name, |b| {
Expand Down
20 changes: 5 additions & 15 deletions bench-vortex/benches/compressor_throughput.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use itertools::Itertools as _;
use mimalloc::MiMalloc;
use rand::distributions::Alphanumeric;
use rand::seq::SliceRandom as _;
use rand::{thread_rng, Rng, SeedableRng as _};
use vortex::aliases::hash_set::HashSet;
use vortex::array::{ConstantArray, PrimitiveArray, VarBinViewArray};
use vortex::array::{ConstantArray, VarBinViewArray};
use vortex::buffer::Buffer;
use vortex::compute::{compare, try_cast, Operator};
use vortex::dict::{dict_encode_varbinview, DictArray};
use vortex::dtype::PType;
Expand All @@ -26,7 +26,6 @@ use vortex::sampling_compressor::compressors::zigzag::ZigZagCompressor;
use vortex::sampling_compressor::compressors::CompressorRef;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::scalar::Scalar;
use vortex::validity::Validity;
use vortex::{IntoArrayData as _, IntoCanonical, ToArrayData};

#[global_allocator]
Expand All @@ -39,13 +38,8 @@ fn primitive(c: &mut Criterion) {

let mut rng = rand::rngs::StdRng::seed_from_u64(0);

let uint_array = PrimitiveArray::from_vec(
(0..num_values)
.map(|_| rng.gen_range(0u32..256))
.collect_vec(),
Validity::NonNullable,
)
.into_array();
let uint_array =
Buffer::from_iter((0..num_values).map(|_| rng.gen_range(0u32..256))).into_array();
let int_array = try_cast(uint_array.clone(), PType::I32.into()).unwrap();

let bool_array = compare(
Expand All @@ -55,11 +49,7 @@ fn primitive(c: &mut Criterion) {
)
.unwrap();

let index_array = PrimitiveArray::from_vec(
(0..num_values).map(|i| (i * 2) as u32 + 42).collect_vec(),
Validity::NonNullable,
)
.into_array();
let index_array = Buffer::from_iter((0..num_values).map(|i| (i * 2) as u32 + 42)).into_array();

let float_array = try_cast(uint_array.clone(), PType::F32.into()).unwrap();

Expand Down
79 changes: 34 additions & 45 deletions bench-vortex/src/bin/notimplemented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use vortex::array::{
BoolArray, ChunkedArray, ConstantArray, NullArray, PrimitiveArray, SparseArray, StructArray,
VarBinViewArray,
};
use vortex::buffer::buffer;
use vortex::bytebool::ByteBoolArray;
use vortex::datetime_dtype::{TemporalMetadata, TimeUnit, TIME_ID};
use vortex::datetime_parts::DateTimePartsArray;
Expand Down Expand Up @@ -57,17 +58,16 @@ fn varbinview_array() -> ArrayData {

fn enc_impls() -> Vec<ArrayData> {
vec![
ALPArray::try_new(
PrimitiveArray::from(vec![1]).into_array(),
Exponents { e: 1, f: 1 },
None,
)
.unwrap()
.into_array(),
ALPArray::try_new(buffer![1].into_array(), Exponents { e: 1, f: 1 }, None)
.unwrap()
.into_array(),
RDEncoder::new(&[1.123_848_f32.powi(-2)])
.encode(&PrimitiveArray::from(vec![0.1f64.next_up()]))
.encode(&PrimitiveArray::new(
buffer![0.1f64.next_up()],
Validity::NonNullable,
))
.into_array(),
BitPackedArray::encode(&PrimitiveArray::from(vec![100u32]).into_array(), 8)
BitPackedArray::encode(&buffer![100u32].into_array(), 8)
.unwrap()
.into_array(),
BoolArray::from_iter([false]).into_array(),
Expand All @@ -88,53 +88,42 @@ fn enc_impls() -> Vec<ArrayData> {
Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)),
Some(TemporalMetadata::Time(TimeUnit::S).into()),
))),
PrimitiveArray::from(vec![1]).into_array(),
PrimitiveArray::from(vec![0]).into_array(),
PrimitiveArray::from(vec![0]).into_array(),
buffer![1].into_array(),
buffer![0].into_array(),
buffer![0].into_array(),
)
.unwrap()
.into_array(),
DeltaArray::try_from_primitive_array(&PrimitiveArray::from(vec![0u32, 1]))
.unwrap()
.into_array(),
DictArray::try_new(
PrimitiveArray::from(vec![0u32, 1, 0]).into_array(),
PrimitiveArray::from(vec![1, 2]).into_array(),
)
DeltaArray::try_from_primitive_array(&PrimitiveArray::new(
buffer![0u32, 1],
Validity::NonNullable,
))
.unwrap()
.into_array(),
DictArray::try_new(buffer![0u32, 1, 0].into_array(), buffer![1, 2].into_array())
.unwrap()
.into_array(),
fsst_array(),
FoRArray::try_new(
PrimitiveArray::from(vec![0u32, 1, 2]).into_array(),
10.into(),
5,
)
.unwrap()
.into_array(),
FoRArray::try_new(buffer![0u32, 1, 2].into_array(), 10.into(), 5)
.unwrap()
.into_array(),
NullArray::new(10).into_array(),
PrimitiveArray::from(vec![0, 1]).into_array(),
buffer![0, 1].into_array(),
RoaringBoolArray::try_new(Bitmap::from([0u32, 10, 20]), 30)
.unwrap()
.into_array(),
RoaringIntArray::try_new(Bitmap::from([5u32, 6, 8]), PType::U32)
.unwrap()
.into_array(),
RunEndArray::try_new(
PrimitiveArray::from(vec![5u32, 8]).into_array(),
PrimitiveArray::from(vec![0, 1]).into_array(),
)
.unwrap()
.into_array(),
RunEndBoolArray::try_new(
PrimitiveArray::from(vec![5u32, 8]).into_array(),
true,
Validity::NonNullable,
)
.unwrap()
.into_array(),
RunEndArray::try_new(buffer![5u32, 8].into_array(), buffer![0, 1].into_array())
.unwrap()
.into_array(),
RunEndBoolArray::try_new(buffer![5u32, 8].into_array(), true, Validity::NonNullable)
.unwrap()
.into_array(),
SparseArray::try_new(
PrimitiveArray::from(vec![5u64, 8]).into_array(),
PrimitiveArray::from_vec(vec![3u32, 6], Validity::AllValid).into_array(),
buffer![5u64, 8].into_array(),
PrimitiveArray::new(buffer![3u32, 6], Validity::AllValid).into_array(),
10,
Scalar::null_typed::<u32>(),
)
Expand All @@ -143,8 +132,8 @@ fn enc_impls() -> Vec<ArrayData> {
StructArray::try_new(
["a".into(), "b".into()].into(),
vec![
PrimitiveArray::from(vec![0, 1, 2]).into_array(),
PrimitiveArray::from(vec![0.1f64, 1.1f64, 2.1f64]).into_array(),
buffer![0, 1, 2].into_array(),
buffer![0.1f64, 1.1f64, 2.1f64].into_array(),
],
3,
Validity::NonNullable,
Expand All @@ -153,7 +142,7 @@ fn enc_impls() -> Vec<ArrayData> {
.into_array(),
varbin_array(),
varbinview_array(),
ZigZagArray::encode(&PrimitiveArray::from(vec![-1, 1, -9, 9]).into_array())
ZigZagArray::encode(&buffer![-1, 1, -9, 9].into_array())
.unwrap()
.into_array(),
]
Expand Down
3 changes: 2 additions & 1 deletion bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use stream::StreamExt;
use vortex::aliases::hash_map::HashMap;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::buffer::Buffer;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
Expand Down Expand Up @@ -124,7 +125,7 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
),
)
.with_io_dispatcher(DISPATCHER.clone())
.with_indices(ArrayData::from(indices.to_vec()))
.with_indices(Buffer::copy_from(indices).into_array())
.build()
.await?
.read_all()
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
16539
16859
>>> cvtx.nbytes / vtx.nbytes
0.117...
0.119...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.
Expand Down
Loading

0 comments on commit 920b2d2

Please sign in to comment.