Skip to content

Commit

Permalink
CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jan 6, 2023
1 parent 40cda01 commit 889fa50
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 47 deletions.
2 changes: 1 addition & 1 deletion columnar/src/column_type_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::InvalidData;
#[repr(u8)]
pub enum Cardinality {
/// All documents contain exactly one value.
/// Required is the default for auto-detecting the Cardinality, since it is the most strict.
/// Required is the default for auto-detecting the Cardinality, since it is the most strict.
#[default]
Required = 0,
/// All documents contain at most one value.
Expand Down
4 changes: 2 additions & 2 deletions columnar/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ impl ColumnarReader {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to listing
// `column_key` with the prefix `column_name\0`.
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name\0..column_name\1)`.
Expand Down
97 changes: 63 additions & 34 deletions columnar/src/writer/column_operation.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,80 @@
use crate::dictionary::UnorderedId;
use crate::utils::{place_bits, pop_first_byte, select_bits};
use crate::value::NumericalValue;
use crate::{DocId, NumericalType};
use crate::{DocId, InvalidData, NumericalType};

/// When we build a columnar dataframe, we first just group
/// all mutations per column, and append them in append-only object.
/// all mutations per column, and append them to an append-only buffer
/// in the stacker.
///
/// These ColumnOperation<T> are therefore serialized/deserialized
/// in memory.
///
/// We represent all of these operations as `ColumnOperation`.
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub(crate) enum ColumnOperation<T> {
pub(super) enum ColumnOperation<T> {
NewDoc(DocId),
Value(T),
}

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct ColumnOperationHeader {
typ_code: u8,
/// Decoded form of the one-byte header that precedes every serialized
/// column operation: the operation kind plus the payload length.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
struct ColumnOperationMetadata {
// Kind of operation (new document vs. new value).
op_type: ColumnOperationType,
// Payload length in bytes; packed into 4 bits, so it must be in 0..=15.
len: u8,
}

impl ColumnOperationHeader {
impl ColumnOperationMetadata {
fn to_code(self) -> u8 {
place_bits::<0, 4>(self.len) | place_bits::<4, 8>(self.typ_code)
place_bits::<0, 4>(self.len) | place_bits::<4, 8>(self.op_type.to_code())
}

fn from_code(code: u8) -> Self {
fn try_from_code(code: u8) -> Result<Self, InvalidData> {
let len = select_bits::<0, 4>(code);
let typ_code = select_bits::<4, 8>(code);
ColumnOperationHeader { typ_code, len }
let column_type = ColumnOperationType::try_from_code(typ_code)?;
Ok(ColumnOperationMetadata {
op_type: column_type,
len,
})
}
}

const NEW_DOC_CODE: u8 = 0u8;
const NEW_VALUE_CODE: u8 = 1u8;
/// Tag identifying the kind of a serialized `ColumnOperation`.
///
/// The code is stored in the upper 4 bits of the operation's header byte.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[repr(u8)]
enum ColumnOperationType {
// Starts a new document; the payload is the serialized `DocId`.
NewDoc = 0u8,
// Adds a value to the current document; the payload is the serialized value.
AddValue = 1u8,
}

impl ColumnOperationType {
/// Returns the numeric code of this operation type
/// (its `#[repr(u8)]` discriminant).
pub fn to_code(self) -> u8 {
self as u8
}

/// Inverse of [`to_code`]: decodes an operation type from its numeric code.
///
/// Returns `Err(InvalidData)` if the code does not correspond to any
/// known operation type.
pub fn try_from_code(code: u8) -> Result<Self, InvalidData> {
match code {
0 => Ok(Self::NewDoc),
1 => Ok(Self::AddValue),
_ => Err(InvalidData),
}
}
}

impl<V: SymbolValue> ColumnOperation<V> {
pub fn serialize(self) -> impl AsRef<[u8]> {
pub(super) fn serialize(self) -> impl AsRef<[u8]> {
let mut minibuf = MiniBuffer::default();
let header = match self {
ColumnOperation::NewDoc(new_doc) => {
let symbol_len = new_doc.serialize(&mut minibuf.bytes[1..]);
ColumnOperationHeader {
typ_code: NEW_DOC_CODE,
ColumnOperationMetadata {
op_type: ColumnOperationType::NewDoc,
len: symbol_len,
}
}
ColumnOperation::Value(val) => {
let symbol_len = val.serialize(&mut minibuf.bytes[1..]);
ColumnOperationHeader {
typ_code: NEW_VALUE_CODE,
ColumnOperationMetadata {
op_type: ColumnOperationType::AddValue,
len: symbol_len,
}
}
Expand All @@ -62,24 +88,23 @@ impl<V: SymbolValue> ColumnOperation<V> {
/// Deserialize a column operation.
/// Returns None if the buffer is empty.
///
/// Panics if the payload is invalid.
pub fn deserialize(bytes: &mut &[u8]) -> Option<Self> {
/// Panics if the payload is invalid:
/// this deserialize method is only meant for data that was serialized in memory
/// by this same process.
pub(super) fn deserialize(bytes: &mut &[u8]) -> Option<Self> {
let header_byte = pop_first_byte(bytes)?;
let column_op_header = ColumnOperationHeader::from_code(header_byte);
let column_op_header =
ColumnOperationMetadata::try_from_code(header_byte).expect("Invalid header byte");
let symbol_bytes: &[u8];
(symbol_bytes, *bytes) = bytes.split_at(column_op_header.len as usize);
match column_op_header.typ_code {
NEW_DOC_CODE => {
match column_op_header.op_type {
ColumnOperationType::NewDoc => {
let new_doc = u32::deserialize(symbol_bytes);
Some(ColumnOperation::NewDoc(new_doc))
}
NEW_VALUE_CODE => {
ColumnOperationType::AddValue => {
let value = V::deserialize(symbol_bytes);
Some(ColumnOperation::Value(value))
}
_ => {
panic!("Unknown code {}", column_op_header.typ_code);
}
}
}
}
Expand All @@ -90,13 +115,17 @@ impl<T> From<T> for ColumnOperation<T> {
}
}

// Serialization trait very local to the writer.
// As we write fast fields, we accumulate them in memory.
// In order to limit memory usage, and in order
// to benefit from the stacker, we do this by serializing our data
// as "Symbols".
#[allow(clippy::from_over_into)]
pub(crate) trait SymbolValue: Clone + Copy {
pub(super) trait SymbolValue: Clone + Copy {
// Serializes the symbol into the given buffer.
// Returns the number of bytes written into the buffer.
fn serialize(self, buffer: &mut [u8]) -> u8;

//
// `bytes` does not contain the header byte.
// This method should advance bytes by the number of bytes that were consumed.
// Panics if invalid
fn deserialize(bytes: &[u8]) -> Self;
}

Expand Down Expand Up @@ -248,10 +277,10 @@ mod tests {
#[test]
fn test_header_byte_serialization() {
for len in 0..=15 {
for typ_code in 0..=15 {
let header = ColumnOperationHeader { typ_code, len };
for op_type in [ColumnOperationType::AddValue, ColumnOperationType::NewDoc] {
let header = ColumnOperationMetadata { op_type, len };
let header_code = header.to_code();
let serdeser_header = ColumnOperationHeader::from_code(header_code);
let serdeser_header = ColumnOperationMetadata::try_from_code(header_code).unwrap();
assert_eq!(header, serdeser_header);
}
}
Expand Down
8 changes: 4 additions & 4 deletions columnar/src/writer/column_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct ColumnWriter {
impl ColumnWriter {
/// Returns an iterator over the Symbol that have been recorded
/// for the given column.
pub(crate) fn operation_iterator<'a, V: SymbolValue>(
pub(super) fn operation_iterator<'a, V: SymbolValue>(
&self,
arena: &MemoryArena,
buffer: &'a mut Vec<u8>,
Expand All @@ -53,7 +53,7 @@ impl ColumnWriter {
///
/// This function will also update the cardinality of the column
/// if necessary.
pub(crate) fn record<S: SymbolValue>(&mut self, doc: DocId, value: S, arena: &mut MemoryArena) {
pub(super) fn record<S: SymbolValue>(&mut self, doc: DocId, value: S, arena: &mut MemoryArena) {
// Difference between `doc` and the last doc.
match delta_with_last_doc(self.last_doc_opt, doc) {
DocumentStep::SameDoc => {
Expand Down Expand Up @@ -166,7 +166,7 @@ impl NumericalColumnWriter {
self.column_writer.record(doc, value, arena);
}

pub fn operation_iterator<'a>(
pub(super) fn operation_iterator<'a>(
self,
arena: &MemoryArena,
buffer: &'a mut Vec<u8>,
Expand Down Expand Up @@ -200,7 +200,7 @@ impl StrColumnWriter {
self.column_writer.record(doc, unordered_id, arena);
}

pub(crate) fn operation_iterator<'a>(
pub(super) fn operation_iterator<'a>(
&self,
arena: &MemoryArena,
byte_buffer: &'a mut Vec<u8>,
Expand Down
3 changes: 2 additions & 1 deletion columnar/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use crate::writer::column_writers::{ColumnWriter, NumericalColumnWriter, StrColu
use crate::writer::value_index::{IndexBuilder, SpareIndexBuilders};
use crate::{Cardinality, DocId};

/// This is a set of buffers that are used to temporarily write the values into before passing them to the fast field codecs.
/// This is a set of buffers that are used to temporarily write the values into before passing them
/// to the fast field codecs.
#[derive(Default)]
struct SpareBuffers {
value_index_builders: SpareIndexBuilders,
Expand Down
10 changes: 5 additions & 5 deletions columnar/src/writer/value_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl IndexBuilder for OptionalIndexBuilder {
pub struct MultivaluedIndexBuilder {
// TODO should we switch to `start_offset`?
// contains the num values so far for each `DocId`.
end_offset: Vec<DocId>,
end_offsets: Vec<DocId>,
total_num_vals_seen: u32,
}

Expand Down Expand Up @@ -113,22 +113,22 @@ impl<'a> MultiValueIndexInfo for MultivaluedValueArrayIndex<'a> {

impl MultivaluedIndexBuilder {
pub fn finish(&mut self, num_docs: DocId) -> impl MultiValueIndexInfo + '_ {
self.end_values
self.end_offsets
.resize(num_docs as usize, self.total_num_vals_seen);
MultivaluedValueArrayIndex {
end_offsets: &self.end_values[..],
end_offsets: &self.end_offsets[..],
}
}

fn reset(&mut self) {
self.end_values.clear();
self.end_offsets.clear();
self.total_num_vals_seen = 0;
}
}

impl IndexBuilder for MultivaluedIndexBuilder {
fn record_doc(&mut self, doc: DocId) {
self.end_values
self.end_offsets
.resize(doc as usize, self.total_num_vals_seen);
}

Expand Down

0 comments on commit 889fa50

Please sign in to comment.