
add method to fetch block of first vals in columnar (#2330)
* add method to fetch block of first vals in columnar

add method to fetch block of first vals in columnar (this is way faster
than single calls for full columns)
add benchmark
fix import warnings

```
test bench_get_block_first_on_full_column                  ... bench:          56 ns/iter (+/- 26)
test bench_get_block_first_on_full_column_single_calls     ... bench:         311 ns/iter (+/- 6)
test bench_get_block_first_on_multi_column                 ... bench:         378 ns/iter (+/- 15)
test bench_get_block_first_on_multi_column_single_calls    ... bench:         546 ns/iter (+/- 13)
test bench_get_block_first_on_optional_column              ... bench:         291 ns/iter (+/- 6)
test bench_get_block_first_on_optional_column_single_calls ... bench:         362 ns/iter (+/- 8)
```

* use remainder: replace the manual cutoff loop in `get_vals` with the `chunks_exact` remainder
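For context, the new accessor fills one `Option` per doc id in a single call instead of looping over `Column::first`; in the numbers above that is roughly 5.5x on a full column (56 ns vs 311 ns per 64-doc block). A minimal sketch of the two access patterns the benchmarks compare; the helper names and the `u64` value type are illustrative, not part of this commit:

```
use tantivy_columnar::{Column, DocId};

// Block fetch: one call fills the whole output slice (what the new first_vals does).
fn first_vals_block(column: &Column<u64>, docids: &[DocId]) -> Vec<Option<u64>> {
    let mut out = vec![None; docids.len()];
    column.first_vals(docids, &mut out);
    out
}

// Per-doc baseline: one call per doc id (the *_single_calls benchmarks).
fn first_vals_single_calls(column: &Column<u64>, docids: &[DocId]) -> Vec<Option<u64>> {
    docids.iter().map(|&docid| column.first(docid)).collect()
}
```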
PSeitz authored Mar 15, 2024
1 parent 0cffe5f commit 7ce950f
Showing 25 changed files with 227 additions and 41 deletions.
1 change: 0 additions & 1 deletion bitpacker/src/bitpacker.rs
@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io;
use std::ops::{Range, RangeInclusive};

155 changes: 155 additions & 0 deletions columnar/benches/bench_first_vals.rs
@@ -0,0 +1,155 @@
#![feature(test)]
extern crate test;

use std::sync::Arc;

use rand::prelude::*;
use tantivy_columnar::column_values::{serialize_and_load_u64_based_column_values, CodecType};
use tantivy_columnar::*;
use test::{black_box, Bencher};

struct Columns {
    pub optional: Column,
    pub full: Column,
    pub multi: Column,
}

fn get_test_columns() -> Columns {
    let data = generate_permutation();
    let mut dataframe_writer = ColumnarWriter::default();
    for (idx, val) in data.iter().enumerate() {
        dataframe_writer.record_numerical(idx as u32, "full_values", NumericalValue::U64(*val));
        if idx % 2 == 0 {
            dataframe_writer.record_numerical(
                idx as u32,
                "optional_values",
                NumericalValue::U64(*val),
            );
        }
        dataframe_writer.record_numerical(idx as u32, "multi_values", NumericalValue::U64(*val));
        dataframe_writer.record_numerical(idx as u32, "multi_values", NumericalValue::U64(*val));
    }
    let mut buffer: Vec<u8> = Vec::new();
    dataframe_writer
        .serialize(data.len() as u32, None, &mut buffer)
        .unwrap();
    let columnar = ColumnarReader::open(buffer).unwrap();

    let cols: Vec<DynamicColumnHandle> = columnar.read_columns("optional_values").unwrap();
    assert_eq!(cols.len(), 1);
    let optional = cols[0].open_u64_lenient().unwrap().unwrap();
    assert_eq!(optional.index.get_cardinality(), Cardinality::Optional);

    let cols: Vec<DynamicColumnHandle> = columnar.read_columns("full_values").unwrap();
    assert_eq!(cols.len(), 1);
    let column_full = cols[0].open_u64_lenient().unwrap().unwrap();
    assert_eq!(column_full.index.get_cardinality(), Cardinality::Full);

    let cols: Vec<DynamicColumnHandle> = columnar.read_columns("multi_values").unwrap();
    assert_eq!(cols.len(), 1);
    let multi = cols[0].open_u64_lenient().unwrap().unwrap();
    assert_eq!(multi.index.get_cardinality(), Cardinality::Multivalued);

    Columns {
        optional,
        full: column_full,
        multi,
    }
}

const NUM_VALUES: u64 = 100_000;
fn generate_permutation() -> Vec<u64> {
    let mut permutation: Vec<u64> = (0u64..NUM_VALUES).collect();
    permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
    permutation
}

pub fn serialize_and_load(column: &[u64], codec_type: CodecType) -> Arc<dyn ColumnValues<u64>> {
    serialize_and_load_u64_based_column_values(&column, &[codec_type])
}

fn run_bench_on_column_full_scan(b: &mut Bencher, column: Column) {
    let num_iter = black_box(NUM_VALUES);
    b.iter(|| {
        let mut sum = 0u64;
        for i in 0..num_iter as u32 {
            let val = column.first(i);
            sum += val.unwrap_or(0);
        }
        sum
    });
}
fn run_bench_on_column_block_fetch(b: &mut Bencher, column: Column) {
    let mut block: Vec<Option<u64>> = vec![None; 64];
    let fetch_docids = (0..64).collect::<Vec<_>>();
    b.iter(move || {
        column.first_vals(&fetch_docids, &mut block);
        block[0]
    });
}
fn run_bench_on_column_block_single_calls(b: &mut Bencher, column: Column) {
    let mut block: Vec<Option<u64>> = vec![None; 64];
    let fetch_docids = (0..64).collect::<Vec<_>>();
    b.iter(move || {
        for i in 0..fetch_docids.len() {
            block[i] = column.first(fetch_docids[i]);
        }
        block[0]
    });
}

/// Column first method
#[bench]
fn bench_get_first_on_full_column_full_scan(b: &mut Bencher) {
    let column = get_test_columns().full;
    run_bench_on_column_full_scan(b, column);
}

#[bench]
fn bench_get_first_on_optional_column_full_scan(b: &mut Bencher) {
    let column = get_test_columns().optional;
    run_bench_on_column_full_scan(b, column);
}

#[bench]
fn bench_get_first_on_multi_column_full_scan(b: &mut Bencher) {
    let column = get_test_columns().multi;
    run_bench_on_column_full_scan(b, column);
}

/// Block fetch column accessor
#[bench]
fn bench_get_block_first_on_optional_column(b: &mut Bencher) {
    let column = get_test_columns().optional;
    run_bench_on_column_block_fetch(b, column);
}

#[bench]
fn bench_get_block_first_on_multi_column(b: &mut Bencher) {
    let column = get_test_columns().multi;
    run_bench_on_column_block_fetch(b, column);
}

#[bench]
fn bench_get_block_first_on_full_column(b: &mut Bencher) {
    let column = get_test_columns().full;
    run_bench_on_column_block_fetch(b, column);
}

#[bench]
fn bench_get_block_first_on_optional_column_single_calls(b: &mut Bencher) {
    let column = get_test_columns().optional;
    run_bench_on_column_block_single_calls(b, column);
}

#[bench]
fn bench_get_block_first_on_multi_column_single_calls(b: &mut Bencher) {
    let column = get_test_columns().multi;
    run_bench_on_column_block_single_calls(b, column);
}

#[bench]
fn bench_get_block_first_on_full_column_single_calls(b: &mut Bencher) {
    let column = get_test_columns().full;
    run_bench_on_column_block_single_calls(b, column);
}
File renamed without changes.
@@ -16,14 +16,6 @@ fn generate_permutation() -> Vec<u64> {
permutation
}

fn generate_random() -> Vec<u64> {
let mut permutation: Vec<u64> = (0u64..100_000u64)
.map(|el| el + random::<u16>() as u64)
.collect();
permutation.shuffle(&mut StdRng::from_seed([1u8; 32]));
permutation
}

// Warning: this generates the same permutation at each call
fn generate_permutation_gcd() -> Vec<u64> {
let mut permutation: Vec<u64> = (1u64..100_000u64).map(|el| el * 1000).collect();
28 changes: 27 additions & 1 deletion columnar/src/column/mod.rs
@@ -13,7 +13,7 @@ pub use serialize::{
    open_column_u64, serialize_column_mappable_to_u128, serialize_column_mappable_to_u64,
};

-use crate::column_index::ColumnIndex;
+use crate::column_index::{ColumnIndex, Set};
use crate::column_values::monotonic_mapping::StrictlyMonotonicMappingToInternal;
use crate::column_values::{monotonic_map_column, ColumnValues};
use crate::{Cardinality, DocId, EmptyColumnValues, MonotonicallyMappableToU64, RowId};
@@ -83,10 +83,36 @@ impl<T: PartialOrd + Copy + Debug + Send + Sync + 'static> Column<T> {
        self.values.max_value()
    }

    #[inline]
    pub fn first(&self, row_id: RowId) -> Option<T> {
        self.values_for_doc(row_id).next()
    }

    /// Load the first value for each docid in the provided slice.
    #[inline]
    pub fn first_vals(&self, docids: &[DocId], output: &mut [Option<T>]) {
        match &self.index {
            ColumnIndex::Empty { .. } => {}
            ColumnIndex::Full => self.values.get_vals_opt(docids, output),
            ColumnIndex::Optional(optional_index) => {
                for (i, docid) in docids.iter().enumerate() {
                    output[i] = optional_index
                        .rank_if_exists(*docid)
                        .map(|rowid| self.values.get_val(rowid));
                }
            }
            ColumnIndex::Multivalued(multivalued_index) => {
                for (i, docid) in docids.iter().enumerate() {
                    let range = multivalued_index.range(*docid);
                    let is_empty = range.start == range.end;
                    if !is_empty {
                        output[i] = Some(self.values.get_val(range.start));
                    }
                }
            }
        }
    }

    /// Translates a block of docis to row_ids.
    ///
    /// returns the row_ids and the matching docids on the same index
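To make the cardinality dispatch above concrete, here is a small end-to-end sketch mirroring the benchmark's setup; the "score" column name, the doc count, and the values are made up for illustration. An Optional column yields `None` for doc ids that recorded no value:

```
use tantivy_columnar::{ColumnarReader, ColumnarWriter, NumericalValue};

fn first_vals_on_optional_column() {
    let mut writer = ColumnarWriter::default();
    // Docs 0 and 2 get a value, doc 1 does not, so the column is Optional.
    writer.record_numerical(0u32, "score", NumericalValue::U64(10));
    writer.record_numerical(2u32, "score", NumericalValue::U64(30));
    let mut buffer: Vec<u8> = Vec::new();
    writer.serialize(3, None, &mut buffer).unwrap();

    let reader = ColumnarReader::open(buffer).unwrap();
    let cols = reader.read_columns("score").unwrap();
    let column = cols[0].open_u64_lenient().unwrap().unwrap();

    // One block call fills one Option per requested doc id.
    let mut out = vec![None; 3];
    column.first_vals(&[0u32, 1, 2], &mut out);
    assert_eq!(out, vec![Some(10), None, Some(30)]);
}
```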
@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io::{self, Write};

use common::BinarySerializable;
3 changes: 1 addition & 2 deletions columnar/src/column_index/optional_index/tests.rs
@@ -1,5 +1,4 @@
-use proptest::prelude::{any, prop, *};
-use proptest::strategy::Strategy;
+use proptest::prelude::*;
use proptest::{prop_oneof, proptest};

use super::*;
42 changes: 38 additions & 4 deletions columnar/src/column_values/mod.rs
@@ -72,11 +72,40 @@ pub trait ColumnValues<T: PartialOrd = u64>: Send + Sync + DowncastSync {
            out_x4[3] = self.get_val(idx_x4[3]);
        }

-        let step_size = 4;
-        let cutoff = indexes.len() - indexes.len() % step_size;
+        let out_and_idx_chunks = output
+            .chunks_exact_mut(4)
+            .into_remainder()
+            .into_iter()
+            .zip(indexes.chunks_exact(4).remainder());
+        for (out, idx) in out_and_idx_chunks {
+            *out = self.get_val(*idx);
+        }
+    }

-        for idx in cutoff..indexes.len() {
-            output[idx] = self.get_val(indexes[idx]);
+    /// Allows to push down multiple fetch calls, to avoid dynamic dispatch overhead.
+    /// The slightly weird `Option<T>` in output allows pushdown to full columns.
+    ///
+    /// idx and output should have the same length
+    ///
+    /// # Panics
+    ///
+    /// May panic if `idx` is greater than the column length.
+    fn get_vals_opt(&self, indexes: &[u32], output: &mut [Option<T>]) {
+        assert!(indexes.len() == output.len());
+        let out_and_idx_chunks = output.chunks_exact_mut(4).zip(indexes.chunks_exact(4));
+        for (out_x4, idx_x4) in out_and_idx_chunks {
+            out_x4[0] = Some(self.get_val(idx_x4[0]));
+            out_x4[1] = Some(self.get_val(idx_x4[1]));
+            out_x4[2] = Some(self.get_val(idx_x4[2]));
+            out_x4[3] = Some(self.get_val(idx_x4[3]));
+        }
+        let out_and_idx_chunks = output
+            .chunks_exact_mut(4)
+            .into_remainder()
+            .into_iter()
+            .zip(indexes.chunks_exact(4).remainder());
+        for (out, idx) in out_and_idx_chunks {
+            *out = Some(self.get_val(*idx));
+        }
        }
    }

@@ -172,6 +201,11 @@ impl<T: Copy + PartialOrd + Debug + 'static> ColumnValues<T> for Arc<dyn ColumnV
        self.as_ref().get_val(idx)
    }

    #[inline(always)]
    fn get_vals_opt(&self, indexes: &[u32], output: &mut [Option<T>]) {
        self.as_ref().get_vals_opt(indexes, output)
    }

    #[inline(always)]
    fn min_value(&self) -> T {
        self.as_ref().min_value()
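Two things happen in this file: `get_vals_opt` lets a column backed by `Arc<dyn ColumnValues>` pay one dynamic dispatch per block instead of one per value, and the "use remainder" follow-up replaces the manual cutoff loop with the remainder of `chunks_exact`. A standalone sketch of that remainder idiom on plain slices, illustrative only and not code from the commit:

```
// Process a slice in blocks of 4, then finish the tail (< 4 elements) through
// the chunk iterators' remainders, mirroring get_vals / get_vals_opt above.
fn double_all(input: &[u64], output: &mut [u64]) {
    assert_eq!(input.len(), output.len());
    for (out_x4, in_x4) in output.chunks_exact_mut(4).zip(input.chunks_exact(4)) {
        out_x4[0] = in_x4[0] * 2;
        out_x4[1] = in_x4[1] * 2;
        out_x4[2] = in_x4[2] * 2;
        out_x4[3] = in_x4[3] * 2;
    }
    // into_remainder()/remainder() hand back whatever did not fill a full chunk,
    // so no manually computed cutoff index is needed.
    let tail_out = output.chunks_exact_mut(4).into_remainder();
    let tail_in = input.chunks_exact(4).remainder();
    for (out, val) in tail_out.iter_mut().zip(tail_in) {
        *out = val * 2;
    }
}
```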
2 changes: 1 addition & 1 deletion columnar/src/column_values/u128_based/compact_space/mod.rs
@@ -22,7 +22,7 @@ mod build_compact_space;

use build_compact_space::get_compact_space;
use common::{BinarySerializable, CountingWriter, OwnedBytes, VInt, VIntU128};
-use tantivy_bitpacker::{self, BitPacker, BitUnpacker};
+use tantivy_bitpacker::{BitPacker, BitUnpacker};

use crate::column_values::ColumnValues;
use crate::RowId;
1 change: 0 additions & 1 deletion columnar/src/column_values/u64_based/tests.rs
@@ -1,5 +1,4 @@
use proptest::prelude::*;
use proptest::strategy::Strategy;
use proptest::{prop_oneof, proptest};

#[test]
4 changes: 0 additions & 4 deletions columnar/src/columnar/merge/tests.rs
@@ -1,7 +1,3 @@
use std::collections::BTreeMap;

use itertools::Itertools;

use super::*;
use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId};

1 change: 0 additions & 1 deletion columnar/src/columnar/writer/serializer.rs
@@ -96,7 +96,6 @@ impl<'a, W: io::Write> io::Write for ColumnSerializer<'a, W> {
#[cfg(test)]
mod tests {
use super::*;
use crate::columnar::column_type::ColumnType;

#[test]
fn test_prepare_key_bytes() {
1 change: 0 additions & 1 deletion common/src/bitset.rs
@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::io::Write;
use std::{fmt, io, u64};

1 change: 0 additions & 1 deletion ownedbytes/src/lib.rs
@@ -1,4 +1,3 @@
use std::convert::TryInto;
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::{fmt, io};
3 changes: 0 additions & 3 deletions src/aggregation/bucket/histogram/histogram.rs
@@ -1,5 +1,4 @@
use std::cmp::Ordering;
use std::fmt::Display;

use columnar::ColumnType;
use itertools::Itertools;
@@ -600,13 +599,11 @@ mod tests {
use serde_json::Value;

use super::*;
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::agg_result::AggregationResults;
use crate::aggregation::tests::{
exec_request, exec_request_with_query, exec_request_with_query_and_memory_limit,
get_test_index_2_segments, get_test_index_from_values, get_test_index_with_num_docs,
};
use crate::aggregation::AggregationCollector;
use crate::query::AllQuery;

#[test]
1 change: 0 additions & 1 deletion src/core/tests.rs
@@ -137,7 +137,6 @@ mod mmap_specific {
use tempfile::TempDir;

use super::*;
use crate::Directory;

#[test]
fn test_index_on_commit_reload_policy_mmap() -> crate::Result<()> {
3 changes: 1 addition & 2 deletions src/indexer/doc_id_mapping.rs
@@ -158,8 +158,7 @@ mod tests_indexsorting {
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::indexer::NoMergePolicy;
use crate::query::QueryParser;
-use crate::schema::document::Value;
-use crate::schema::{Schema, *};
+use crate::schema::*;
use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order};

fn create_test_index(
1 change: 0 additions & 1 deletion src/indexer/index_writer.rs
@@ -806,7 +806,6 @@ mod tests {
use columnar::{Cardinality, Column, MonotonicallyMappableToU128};
use itertools::Itertools;
use proptest::prop_oneof;
use proptest::strategy::Strategy;

use super::super::operation::UserOperation;
use crate::collector::TopDocs;
4 changes: 2 additions & 2 deletions src/indexer/log_merge_policy.rs
@@ -144,10 +144,10 @@ mod tests {
use once_cell::sync::Lazy;

use super::*;
-use crate::index::{SegmentId, SegmentMeta, SegmentMetaInventory};
+use crate::index::SegmentMetaInventory;
use crate::indexer::merge_policy::MergePolicy;
-use crate::schema;
use crate::schema::INDEXED;
+use crate::{schema, SegmentId};

static INVENTORY: Lazy<SegmentMetaInventory> = Lazy::new(SegmentMetaInventory::default);

2 changes: 1 addition & 1 deletion src/indexer/segment_register.rs
@@ -103,7 +103,7 @@ impl SegmentRegister {
#[cfg(test)]
mod tests {
use super::*;
-use crate::index::{SegmentId, SegmentMetaInventory};
+use crate::index::SegmentMetaInventory;
use crate::indexer::delete_queue::*;

fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {