Skip to content

Commit

Permalink
Integrated fastfield codecs into columnar. (#1782)
Browse files Browse the repository at this point in the history
Introduced asymetric OptionalCodec / SerializableOptionalCodec
Removed cardinality from the columnar sstable.
Added DynamicColumn
Reorganized all files
Change DenseCodec serialization logic.
Renamed methods to rank/select
Moved versioning footer to the columnar level
  • Loading branch information
fulmicoton authored Jan 16, 2023
1 parent 4bac945 commit 25bad78
Show file tree
Hide file tree
Showing 45 changed files with 6,036 additions and 328 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ unstable = [] # useful for benches.
quickwit = ["sstable"]

[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "columnar", "tokenizer-api"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable", "tokenizer-api"]

# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
Expand Down
16 changes: 15 additions & 1 deletion columnar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,22 @@ thiserror = "1"
fnv = "1"
sstable = { path = "../sstable", package = "tantivy-sstable" }
common = { path = "../common", package = "tantivy-common" }
fastfield_codecs = { path = "../fastfield_codecs"}
itertools = "0.10"
log = "0.4"
tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" }
prettytable-rs = {version="0.10.0", optional= true}
rand = {version="0.8.3", optional= true}
fastdivide = "0.4"
measure_time = { version="0.8.2", optional=true}

[dev-dependencies]
proptest = "1"
more-asserts = "0.3.0"
rand = "0.8.3"

# temporary
[workspace]
members = []

[features]
unstable = []
48 changes: 45 additions & 3 deletions columnar/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ This crate describes columnar format used in tantivy.

This format is special in the following way.
- it needs to be compact
- it does not required to be loaded in memory.
- it is designed to fit well with quickwit's strange constraint:
we need to be able to load columns rapidly.
- accessing a specific column does not require to load the entire columnar. It can be done in 2 to 3 random access.
- columns of several types can be associated with the same column name.
- it needs to support columns with different types `(str, u64, i64, f64)`
and different cardinality `(required, optional, multivalued)`.
- columns, once loaded, offer cheap random access.
- it is designed to allow range queries.

# Coercion rules

Expand Down Expand Up @@ -65,3 +64,46 @@ be done by listing all keys prefixed by

The associated range of bytes refer to a range of bytes

This crate exposes a columnar format for tantivy.
This format is described in README.md


The crate introduces the following concepts.

`Columnar` is an equivalent of a dataframe.
It maps `column_key` to `Column`.

A `Column<T>` asssociates a `RowId` (u32) to any
number of values.

This is made possible by wrapping a `ColumnIndex` and a `ColumnValue` object.
The `ColumnValue<T>` represents a mapping that associates each `RowId` to
exactly one single value.

The `ColumnIndex` then maps each RowId to a set of `RowId` in the
`ColumnValue`.

For optimization, and compression purposes, the `ColumnIndex` has three
possible representation, each for different cardinalities.

- Full

All RowId have exactly one value. The ColumnIndex is the trivial mapping.

- Optional

All RowIds can have at most one value. The ColumnIndex is the trivial mapping `ColumnRowId -> Option<ColumnValueRowId>`.

- Multivalued

All RowIds can have any number of values.
The column index is mapping values to a range.


All these objects are implemented an unit tested independently
in their own module:

- columnar
- column_index
- column_values
- column
45 changes: 45 additions & 0 deletions columnar/src/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# zero to one
* merges
* full still needs a num_values
* replug u128
* add dictionary encoded stuff
* fix multivalued
* find a way to make columnar work with strict types
* plug to tantivy
- indexing
- aggregations
- merge

# Perf and Size
* re-add ZSTD compression for dictionaries
no systematic monotonic mapping
consider removing multilinear
f32?
adhoc solution for bool?

add metrics helper for aggregate. sum(row_id)
review inline absence/presence
improv perf of select using PDEP
compare with roaring bitmap/elias fano etc etc.
SIMD range? (see blog post)
Add alignment?
Consider another codec to bridge the gap between few and 5k elements

# Cleanup and rationalization
in benchmark, unify percent vs ratio, f32 vs f64.
investigate if should have better errors? io::Error is overused at the moment.
rename rank/select in unit tests
Review the public API via cargo doc
go through TODOs
remove all doc_id occurences -> row_id
use the rank & select naming in unit tests branch.
multi-linear -> blockwise
linear codec -> simply a multiplication for the index column

# Other
fix enhance column-cli

# Santa claus

autodetect datetime ipaddr, plug customizable tokenizer.

40 changes: 40 additions & 0 deletions columnar/src/column/dictionary_encoded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::io;
use std::ops::Deref;
use std::sync::Arc;

use sstable::{Dictionary, VoidSSTable};

use crate::column::Column;
use crate::column_index::ColumnIndex;

/// Dictionary encoded column.
#[derive(Clone)]
pub struct BytesColumn {
pub(crate) dictionary: Arc<Dictionary<VoidSSTable>>,
pub(crate) term_ord_column: Column<u64>,
}

impl BytesColumn {
/// Returns `false` if the term does not exist (e.g. `term_ord` is greater or equal to the
/// overll number of terms).
pub fn term_ord_to_str(&self, term_ord: u64, output: &mut Vec<u8>) -> io::Result<bool> {
self.dictionary.ord_to_term(term_ord, output)
}

pub fn term_ords(&self) -> &Column<u64> {
&self.term_ord_column
}
}

impl Deref for BytesColumn {
type Target = ColumnIndex<'static>;

fn deref(&self) -> &Self::Target {
&**self.term_ords()
}
}

#[cfg(test)]
mod tests {
use crate::{ColumnarReader, ColumnarWriter};
}
56 changes: 56 additions & 0 deletions columnar/src/column/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
mod dictionary_encoded;
mod serialize;

use std::ops::Deref;
use std::sync::Arc;

use common::BinarySerializable;
pub use dictionary_encoded::BytesColumn;
pub use serialize::{open_column_bytes, open_column_u64, serialize_column_u64};

use crate::column_index::ColumnIndex;
use crate::column_values::ColumnValues;
use crate::{Cardinality, RowId};

#[derive(Clone)]
pub struct Column<T> {
pub idx: ColumnIndex<'static>,
pub values: Arc<dyn ColumnValues<T>>,
}

use crate::column_index::Set;

impl<T: PartialOrd> Column<T> {
pub fn first(&self, row_id: RowId) -> Option<T> {
match &self.idx {
ColumnIndex::Full => Some(self.values.get_val(row_id)),
ColumnIndex::Optional(opt_idx) => {
let value_row_idx = opt_idx.rank_if_exists(row_id)?;
Some(self.values.get_val(value_row_idx))
}
ColumnIndex::Multivalued(_multivalued_index) => {
todo!();
}
}
}
}

impl<T> Deref for Column<T> {
type Target = ColumnIndex<'static>;

fn deref(&self) -> &Self::Target {
&self.idx
}
}

impl BinarySerializable for Cardinality {
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
self.to_code().serialize(writer)
}

fn deserialize<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
let cardinality_code = u8::deserialize(reader)?;
let cardinality = Cardinality::try_from_code(cardinality_code)?;
Ok(cardinality)
}
}
54 changes: 54 additions & 0 deletions columnar/src/column/serialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::io;
use std::io::Write;
use std::sync::Arc;

use common::{CountingWriter, OwnedBytes};
use sstable::Dictionary;

use crate::column::{BytesColumn, Column};
use crate::column_index::{serialize_column_index, SerializableColumnIndex};
use crate::column_values::{
serialize_column_values, ColumnValues, MonotonicallyMappableToU64, ALL_CODEC_TYPES,
};
pub fn serialize_column_u64<T: MonotonicallyMappableToU64>(
column_index: SerializableColumnIndex<'_>,
column_values: &impl ColumnValues<T>,
output: &mut impl Write,
) -> io::Result<()> {
let mut counting_writer = CountingWriter::wrap(output);
serialize_column_index(column_index, &mut counting_writer)?;
let column_index_num_bytes = counting_writer.written_bytes() as u32;
let output = counting_writer.finish();
serialize_column_values(column_values, &ALL_CODEC_TYPES[..], output)?;
output.write_all(&column_index_num_bytes.to_le_bytes())?;
Ok(())
}

pub fn open_column_u64<T: MonotonicallyMappableToU64>(bytes: OwnedBytes) -> io::Result<Column<T>> {
let (body, column_index_num_bytes_payload) = bytes.rsplit(4);
let column_index_num_bytes = u32::from_le_bytes(
column_index_num_bytes_payload
.as_slice()
.try_into()
.unwrap(),
);
let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize);
let column_index = crate::column_index::open_column_index(column_index_data)?;
let column_values = crate::column_values::open_u64_mapped(column_values_data)?;
Ok(Column {
idx: column_index,
values: column_values,
})
}

pub fn open_column_bytes(data: OwnedBytes) -> io::Result<BytesColumn> {
let (body, dictionary_len_bytes) = data.rsplit(4);
let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap());
let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize);
let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?);
let term_ord_column = crate::column::open_column_u64::<u64>(column_bytes)?;
Ok(BytesColumn {
dictionary,
term_ord_column,
})
}
40 changes: 40 additions & 0 deletions columnar/src/column_index/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
mod multivalued_index;
mod optional_index;
mod serialize;

use std::sync::Arc;

pub use optional_index::{OptionalIndex, SerializableOptionalIndex, Set};
pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex};

use crate::column_values::ColumnValues;
use crate::{Cardinality, RowId};

#[derive(Clone)]
pub enum ColumnIndex<'a> {
Full,
Optional(OptionalIndex),
// TODO remove the Arc<dyn> apart from serialization this is not
// dynamic at all.
Multivalued(Arc<dyn ColumnValues<RowId> + 'a>),
}

impl<'a> ColumnIndex<'a> {
pub fn get_cardinality(&self) -> Cardinality {
match self {
ColumnIndex::Full => Cardinality::Full,
ColumnIndex::Optional(_) => Cardinality::Optional,
ColumnIndex::Multivalued(_) => Cardinality::Multivalued,
}
}

pub fn num_rows(&self) -> RowId {
match self {
ColumnIndex::Full => {
todo!()
}
ColumnIndex::Optional(optional_index) => optional_index.num_rows(),
ColumnIndex::Multivalued(multivalued_index) => multivalued_index.num_vals() - 1,
}
}
}
27 changes: 27 additions & 0 deletions columnar/src/column_index/multivalued_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::io;
use std::io::Write;
use std::sync::Arc;

use common::OwnedBytes;

use crate::column_values::{ColumnValues, FastFieldCodecType};
use crate::RowId;

#[derive(Clone)]
pub struct MultivaluedIndex(Arc<dyn ColumnValues<RowId>>);

pub fn serialize_multivalued_index(
multivalued_index: MultivaluedIndex,
output: &mut impl Write,
) -> io::Result<()> {
crate::column_values::serialize_column_values(
&*multivalued_index.0,
&[FastFieldCodecType::Bitpacked, FastFieldCodecType::Linear],
output,
)?;
Ok(())
}

pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result<Arc<dyn ColumnValues<RowId>>> {
todo!();
}
Loading

0 comments on commit 25bad78

Please sign in to comment.