refactor: vortex-buffer #1742

Merged
merged 86 commits into from
Dec 30, 2024
Commits
79e6ba2
Store desired alignment in the array buffer
gatesn Dec 18, 2024
33b911c
Buffer alignment
gatesn Dec 18, 2024
602f509
Buffer alignment
gatesn Dec 18, 2024
b54f546
Buffer alignment
gatesn Dec 18, 2024
7db32d1
Buffer alignment
gatesn Dec 18, 2024
7852ff3
Buffer alignment
gatesn Dec 18, 2024
1525de5
Buffer alignment
gatesn Dec 18, 2024
83ad2ba
Merge branch 'develop' into ngates/buffers
gatesn Dec 18, 2024
8209d76
AlignedBufferMut
gatesn Dec 18, 2024
897d063
AlignedBufferMut
gatesn Dec 18, 2024
1ef552e
AlignedBufferMut
gatesn Dec 18, 2024
b4cb50a
AlignedBufferMut
gatesn Dec 18, 2024
c5ab1ac
AlignedBufferMut
gatesn Dec 18, 2024
59bc34a
AlignedBufferMut
gatesn Dec 18, 2024
d14bc95
AlignedBufferMut
gatesn Dec 19, 2024
3baafa7
AlignedBufferMut
gatesn Dec 19, 2024
50438aa
AlignedBufferMut
gatesn Dec 19, 2024
7689fc8
AlignedBufferMut
gatesn Dec 19, 2024
6f67551
AlignedBufferMut
gatesn Dec 19, 2024
3989cca
Fix transmute
gatesn Dec 19, 2024
1f5eca7
Fix transmute
gatesn Dec 19, 2024
f61bd02
Fix transmute
gatesn Dec 19, 2024
c75b39a
Fix transmute
gatesn Dec 19, 2024
f247f1a
Fix transmute
gatesn Dec 19, 2024
434b635
Fix transmute
gatesn Dec 19, 2024
cf6049f
Fix transmute
gatesn Dec 19, 2024
a21f595
Combine into single ScalarBuffer
gatesn Dec 19, 2024
6b37397
Benchmark from_iter
gatesn Dec 19, 2024
3338746
Benchmark from_iter
gatesn Dec 19, 2024
20e4c7e
Benchmark from_iter
gatesn Dec 19, 2024
f5d3bf0
Rename ScalarBuffer to Buffer
gatesn Dec 19, 2024
12f91ea
Improve performance of from_iter
gatesn Dec 19, 2024
baf9bb1
Improve performance of from_iter
gatesn Dec 19, 2024
aef3e16
Improve performance of from_iter
gatesn Dec 19, 2024
e41a3c5
Improve performance of from_iter
gatesn Dec 19, 2024
0bfce15
Remove maybe_null prefix
gatesn Dec 20, 2024
1bae5c1
Remove maybe_null prefix
gatesn Dec 20, 2024
c680791
Remove maybe_null prefix
gatesn Dec 20, 2024
5be44d5
Remove maybe_null prefix
gatesn Dec 20, 2024
59abd5d
Some updates
gatesn Dec 20, 2024
7100a11
Add push benchmark
gatesn Dec 20, 2024
1d3f8ae
Speed up push
gatesn Dec 20, 2024
e7fbef1
Speed up push
gatesn Dec 20, 2024
91a7ef7
More things
gatesn Dec 20, 2024
8bf5025
Migrate to buffer
gatesn Dec 20, 2024
6e26355
Remove copy for ALP decompress
gatesn Dec 20, 2024
0937fdf
Remove copy for ALP decompress
gatesn Dec 20, 2024
1706492
Remove copy for ALP decompress
gatesn Dec 20, 2024
6ff25dd
Remove copy for ALP decompress
gatesn Dec 20, 2024
f25637b
Remove copy for ALP decompress
gatesn Dec 20, 2024
840f4c2
Remove copy for ALP decompress
gatesn Dec 20, 2024
750d979
Remove copy for ALP decompress
gatesn Dec 20, 2024
2f3da8f
Remove copy for ALP decompress
gatesn Dec 20, 2024
298b8dd
Fix cast bug
gatesn Dec 20, 2024
d199163
Fix cast bug
gatesn Dec 20, 2024
8ca57a6
Fix cast bug
gatesn Dec 20, 2024
f1b2fc6
push_n
gatesn Dec 20, 2024
afc1d32
Merge develop
gatesn Dec 21, 2024
82fd044
Lint
gatesn Dec 21, 2024
4c64819
Lint
gatesn Dec 21, 2024
852b573
Docs|
gatesn Dec 21, 2024
3a1c28c
Docs|
gatesn Dec 21, 2024
302ed43
Make Miri happy by borrowing the uninit region
gatesn Dec 21, 2024
343eaad
More tests
gatesn Dec 21, 2024
da06bd1
More tests
gatesn Dec 21, 2024
f4b65eb
More tests
gatesn Dec 21, 2024
898c8a5
Add some assertions
gatesn Dec 21, 2024
af63176
Remove vortex-dtype dependency
gatesn Dec 21, 2024
92f66f8
fix typo
lwwmanning Dec 26, 2024
0f734d7
fix ALP compress to not copy
lwwmanning Dec 26, 2024
ca84348
trivial optimization
lwwmanning Dec 26, 2024
ce9c8fd
remove unnecessary allocations in bitpacking compress
lwwmanning Dec 26, 2024
3c1735a
nits
lwwmanning Dec 26, 2024
885eb4b
more nits
lwwmanning Dec 26, 2024
530c69c
fixup locks
lwwmanning Dec 26, 2024
4b51cf4
Zero-copy IO
gatesn Dec 29, 2024
eba9a69
Zero-copy IO
gatesn Dec 29, 2024
768c692
Zero-copy IO
gatesn Dec 29, 2024
2f46737
Zero-copy IO
gatesn Dec 29, 2024
039ed78
Zero-copy IO
gatesn Dec 29, 2024
aec6081
Zero-copy IO
gatesn Dec 29, 2024
6dafbdc
Zero-copy IO
gatesn Dec 29, 2024
120b1c7
Zero-copy IO
gatesn Dec 29, 2024
d31f847
Zero-copy IO
gatesn Dec 29, 2024
978a2fb
Appease Miri
gatesn Dec 29, 2024
cdf2bf9
Merge remote-tracking branch 'origin/develop' into ngates/buffers
lwwmanning Dec 30, 2024
1,574 changes: 1,196 additions & 378 deletions CHANGELOG.md

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

8 changes: 5 additions & 3 deletions bench-vortex/benches/bytes_at.rs
@@ -4,7 +4,9 @@
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
- use vortex::array::{PrimitiveArray, VarBinArray, VarBinViewArray};
+ use parquet::data_type::AsBytes;
+ use vortex::array::{VarBinArray, VarBinViewArray};
+ use vortex::buffer::{buffer, ByteBuffer};
use vortex::dtype::{DType, Nullability};
use vortex::ipc::iterator::{ArrayIteratorIPC, SyncIPCReader};
use vortex::iter::ArrayIteratorExt;
@@ -13,8 +15,8 @@ use vortex::{Context, IntoArrayData, IntoArrayVariant};

fn array_data_fixture() -> VarBinArray {
VarBinArray::try_new(
- PrimitiveArray::from(vec![0i32, 5i32, 10i32, 15i32, 20i32]).into_array(),
- PrimitiveArray::from(b"helloworldhelloworld".to_vec()).into_array(),
+ buffer![0i32, 5i32, 10i32, 15i32, 20i32].into_array(),
+ ByteBuffer::copy_from(b"helloworldhelloworld".as_bytes()).into_array(),
DType::Utf8(Nullability::NonNullable),
Validity::NonNullable,
)
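
The fixture above pairs an offsets buffer with a byte buffer. As a rough illustration of how such a pair is commonly interpreted, the sketch below assumes the Arrow-style variable-length layout, in which value i spans bytes[offsets[i]..offsets[i + 1]]. This is an assumption, not something taken from the vortex source. The helper name `varbin_values` is hypothetical and the code uses only the standard library:

```rust
/// Hypothetical helper (not part of vortex): split a contiguous byte buffer
/// into string slices using an offsets array, where value `i` is assumed to
/// span `bytes[offsets[i]..offsets[i + 1]]`.
fn varbin_values<'a>(offsets: &[i32], bytes: &'a [u8]) -> Vec<&'a str> {
    offsets
        // Each adjacent pair of offsets delimits one value.
        .windows(2)
        .map(|w| {
            let (start, end) = (w[0] as usize, w[1] as usize);
            // Panics if the slice is not valid UTF-8; acceptable for a fixture.
            std::str::from_utf8(&bytes[start..end]).expect("fixture bytes are valid UTF-8")
        })
        .collect()
}
```

With the fixture's offsets 0, 5, 10, 15, 20 and the 20-byte string "helloworldhelloworld", this yields four five-byte values: hello, world, hello, world.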
8 changes: 4 additions & 4 deletions bench-vortex/benches/compress.rs
@@ -17,6 +17,7 @@ use bench_vortex::public_bi_data::PBIDataset::*;
use bench_vortex::taxi_data::taxi_data_parquet;
use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
use bench_vortex::{fetch_taxi_data, tpch};
+ use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::StreamExt;
use log::LevelFilter;
@@ -28,7 +29,6 @@ use regex::Regex;
use simplelog::*;
use tokio::runtime::Runtime;
use vortex::array::{ChunkedArray, StructArray};
- use vortex::buffer::Buffer;
use vortex::dtype::field::Field;
use vortex::error::VortexResult;
use vortex::file::{LayoutContext, LayoutDeserializer, VortexFileWriter, VortexReadBuilder};
@@ -133,8 +133,8 @@ fn vortex_compress_write(
}

#[inline(never)]
- fn vortex_decompress_read(runtime: &Runtime, buf: Buffer) -> VortexResult<Vec<ArrayRef>> {
- async fn async_read(buf: Buffer) -> VortexResult<Vec<ArrayRef>> {
+ fn vortex_decompress_read(runtime: &Runtime, buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
+ async fn async_read(buf: Bytes) -> VortexResult<Vec<ArrayRef>> {
let builder: VortexReadBuilder<_> = VortexReadBuilder::new(
buf,
LayoutDeserializer::new(
@@ -240,7 +240,7 @@ fn benchmark_compress<F, U>(
let buffer = LazyCell::new(|| {
let mut buf = Vec::new();
vortex_compress_write(runtime, compressor, uncompressed.as_ref(), &mut buf).unwrap();
- Buffer::from(buf)
+ Bytes::from(buf)
});

group.bench_function(bench_name, |b| {
20 changes: 5 additions & 15 deletions bench-vortex/benches/compressor_throughput.rs
@@ -1,11 +1,11 @@
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput};
- use itertools::Itertools as _;
use mimalloc::MiMalloc;
use rand::distributions::Alphanumeric;
use rand::seq::SliceRandom as _;
use rand::{thread_rng, Rng, SeedableRng as _};
use vortex::aliases::hash_set::HashSet;
- use vortex::array::{ConstantArray, PrimitiveArray, VarBinViewArray};
+ use vortex::array::{ConstantArray, VarBinViewArray};
+ use vortex::buffer::Buffer;
use vortex::compute::{compare, try_cast, Operator};
use vortex::dict::{dict_encode_varbinview, DictArray};
use vortex::dtype::PType;
@@ -26,7 +26,6 @@ use vortex::sampling_compressor::compressors::zigzag::ZigZagCompressor;
use vortex::sampling_compressor::compressors::CompressorRef;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::scalar::Scalar;
- use vortex::validity::Validity;
use vortex::{IntoArrayData as _, IntoCanonical, ToArrayData};

#[global_allocator]
@@ -39,13 +38,8 @@ fn primitive(c: &mut Criterion) {

let mut rng = rand::rngs::StdRng::seed_from_u64(0);

- let uint_array = PrimitiveArray::from_vec(
- (0..num_values)
- .map(|_| rng.gen_range(0u32..256))
- .collect_vec(),
- Validity::NonNullable,
- )
- .into_array();
+ let uint_array =
+ Buffer::from_iter((0..num_values).map(|_| rng.gen_range(0u32..256))).into_array();
let int_array = try_cast(uint_array.clone(), PType::I32.into()).unwrap();

let bool_array = compare(
@@ -55,11 +49,7 @@ fn primitive(c: &mut Criterion) {
)
.unwrap();

- let index_array = PrimitiveArray::from_vec(
- (0..num_values).map(|i| (i * 2) as u32 + 42).collect_vec(),
- Validity::NonNullable,
- )
- .into_array();
+ let index_array = Buffer::from_iter((0..num_values).map(|i| (i * 2) as u32 + 42)).into_array();

let float_array = try_cast(uint_array.clone(), PType::F32.into()).unwrap();
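
Both replacements in this file build the buffer straight from an iterator instead of collecting into a `Vec` and wrapping it afterwards. A minimal sketch of that pattern follows, using a hypothetical `ToyBuffer` type that stands in for (and is not) `vortex::buffer::Buffer`. It relies only on the standard `FromIterator` trait and says nothing about how the real implementation allocates:

```rust
use std::iter::FromIterator;

/// Hypothetical stand-in for a typed buffer; NOT vortex's `Buffer<T>`.
#[derive(Debug, PartialEq)]
struct ToyBuffer<T>(Vec<T>);

impl<T> FromIterator<T> for ToyBuffer<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let iter = iter.into_iter();
        // Reserve the iterator's lower size bound up front, so iterators that
        // report an exact size (such as a mapped range) need one allocation.
        let mut values = Vec::with_capacity(iter.size_hint().0);
        values.extend(iter);
        ToyBuffer(values)
    }
}
```

Implementing `FromIterator` lets callers write either `ToyBuffer::from_iter(iter)` or `iter.collect::<ToyBuffer<_>>()`, which is the shape of call the new benchmark code uses.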

79 changes: 34 additions & 45 deletions bench-vortex/src/bin/notimplemented.rs
@@ -10,6 +10,7 @@ use vortex::array::{
BoolArray, ChunkedArray, ConstantArray, NullArray, PrimitiveArray, SparseArray, StructArray,
VarBinViewArray,
};
+ use vortex::buffer::buffer;
use vortex::bytebool::ByteBoolArray;
use vortex::datetime_dtype::{TemporalMetadata, TimeUnit, TIME_ID};
use vortex::datetime_parts::DateTimePartsArray;
@@ -57,17 +58,16 @@ fn varbinview_array() -> ArrayData {

fn enc_impls() -> Vec<ArrayData> {
vec![
- ALPArray::try_new(
- PrimitiveArray::from(vec![1]).into_array(),
- Exponents { e: 1, f: 1 },
- None,
- )
- .unwrap()
- .into_array(),
+ ALPArray::try_new(buffer![1].into_array(), Exponents { e: 1, f: 1 }, None)
+ .unwrap()
+ .into_array(),
RDEncoder::new(&[1.123_848_f32.powi(-2)])
- .encode(&PrimitiveArray::from(vec![0.1f64.next_up()]))
+ .encode(&PrimitiveArray::new(
+ buffer![0.1f64.next_up()],
+ Validity::NonNullable,
+ ))
.into_array(),
- BitPackedArray::encode(&PrimitiveArray::from(vec![100u32]).into_array(), 8)
+ BitPackedArray::encode(&buffer![100u32].into_array(), 8)
.unwrap()
.into_array(),
BoolArray::from_iter([false]).into_array(),
@@ -88,53 +88,42 @@ fn enc_impls() -> Vec<ArrayData> {
Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)),
Some(TemporalMetadata::Time(TimeUnit::S).into()),
))),
- PrimitiveArray::from(vec![1]).into_array(),
- PrimitiveArray::from(vec![0]).into_array(),
- PrimitiveArray::from(vec![0]).into_array(),
+ buffer![1].into_array(),
+ buffer![0].into_array(),
+ buffer![0].into_array(),
)
.unwrap()
.into_array(),
- DeltaArray::try_from_primitive_array(&PrimitiveArray::from(vec![0u32, 1]))
- .unwrap()
- .into_array(),
- DictArray::try_new(
- PrimitiveArray::from(vec![0u32, 1, 0]).into_array(),
- PrimitiveArray::from(vec![1, 2]).into_array(),
- )
+ DeltaArray::try_from_primitive_array(&PrimitiveArray::new(
+ buffer![0u32, 1],
+ Validity::NonNullable,
+ ))
.unwrap()
.into_array(),
+ DictArray::try_new(buffer![0u32, 1, 0].into_array(), buffer![1, 2].into_array())
+ .unwrap()
+ .into_array(),
fsst_array(),
- FoRArray::try_new(
- PrimitiveArray::from(vec![0u32, 1, 2]).into_array(),
- 10.into(),
- 5,
- )
- .unwrap()
- .into_array(),
+ FoRArray::try_new(buffer![0u32, 1, 2].into_array(), 10.into(), 5)
+ .unwrap()
+ .into_array(),
NullArray::new(10).into_array(),
- PrimitiveArray::from(vec![0, 1]).into_array(),
+ buffer![0, 1].into_array(),
RoaringBoolArray::try_new(Bitmap::from([0u32, 10, 20]), 30)
.unwrap()
.into_array(),
RoaringIntArray::try_new(Bitmap::from([5u32, 6, 8]), PType::U32)
.unwrap()
.into_array(),
- RunEndArray::try_new(
- PrimitiveArray::from(vec![5u32, 8]).into_array(),
- PrimitiveArray::from(vec![0, 1]).into_array(),
- )
- .unwrap()
- .into_array(),
- RunEndBoolArray::try_new(
- PrimitiveArray::from(vec![5u32, 8]).into_array(),
- true,
- Validity::NonNullable,
- )
- .unwrap()
- .into_array(),
+ RunEndArray::try_new(buffer![5u32, 8].into_array(), buffer![0, 1].into_array())
+ .unwrap()
+ .into_array(),
+ RunEndBoolArray::try_new(buffer![5u32, 8].into_array(), true, Validity::NonNullable)
+ .unwrap()
+ .into_array(),
SparseArray::try_new(
- PrimitiveArray::from(vec![5u64, 8]).into_array(),
- PrimitiveArray::from_vec(vec![3u32, 6], Validity::AllValid).into_array(),
+ buffer![5u64, 8].into_array(),
+ PrimitiveArray::new(buffer![3u32, 6], Validity::AllValid).into_array(),
10,
Scalar::null_typed::<u32>(),
)
@@ -143,8 +132,8 @@ fn enc_impls() -> Vec<ArrayData> {
StructArray::try_new(
["a".into(), "b".into()].into(),
vec![
- PrimitiveArray::from(vec![0, 1, 2]).into_array(),
- PrimitiveArray::from(vec![0.1f64, 1.1f64, 2.1f64]).into_array(),
+ buffer![0, 1, 2].into_array(),
+ buffer![0.1f64, 1.1f64, 2.1f64].into_array(),
],
3,
Validity::NonNullable,
@@ -153,7 +142,7 @@ fn enc_impls() -> Vec<ArrayData> {
.into_array(),
varbin_array(),
varbinview_array(),
- ZigZagArray::encode(&PrimitiveArray::from(vec![-1, 1, -9, 9]).into_array())
+ ZigZagArray::encode(&buffer![-1, 1, -9, 9].into_array())
.unwrap()
.into_array(),
]
3 changes: 2 additions & 1 deletion bench-vortex/src/reader.rs
@@ -24,6 +24,7 @@ use stream::StreamExt;
use vortex::aliases::hash_map::HashMap;
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
+ use vortex::buffer::Buffer;
use vortex::compress::CompressionStrategy;
use vortex::dtype::DType;
use vortex::error::VortexResult;
@@ -124,7 +125,7 @@ async fn take_vortex<T: VortexReadAt + Unpin + 'static>(
),
)
.with_io_dispatcher(DISPATCHER.clone())
- .with_indices(ArrayData::from(indices.to_vec()))
+ .with_indices(Buffer::copy_from(indices).into_array())
.build()
.await?
.read_all()
4 changes: 2 additions & 2 deletions docs/quickstart.rst
@@ -46,9 +46,9 @@ Use :func:`~vortex.encoding.compress` to compress the Vortex array and check the

>>> cvtx = vortex.compress(vtx)
>>> cvtx.nbytes
- 16539
+ 16859
>>> cvtx.nbytes / vtx.nbytes
- 0.117...
+ 0.119...

Vortex uses nearly ten times fewer bytes than Arrow. Fewer bytes means more of your data fits in
cache and RAM.