From c207fcf3136c5badaeae492d856bdf54ad92992a Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Fri, 21 Jul 2023 14:22:36 +0800 Subject: [PATCH 1/4] lazy columnar merge This is the first part of addressing #3633 Instead of loading all Column into memory for the merge, only the current column_name group is loaded. This can be done since the sstable streams the columns lexicographically. --- columnar/src/columnar/merge/mod.rs | 219 ++++++++++++++++++++------- columnar/src/columnar/merge/tests.rs | 71 ++++++--- columnar/src/columnar/reader/mod.rs | 53 ++++--- columnar/src/dynamic_column.rs | 2 +- common/src/file_slice.rs | 78 ++++++++++ common/src/group_by.rs | 26 ++-- src/indexer/index_writer.rs | 7 + 7 files changed, 345 insertions(+), 111 deletions(-) diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index a717c5271a..01e60183a7 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -2,12 +2,14 @@ mod merge_dict_column; mod merge_mapping; mod term_merger; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::io; use std::net::Ipv6Addr; +use std::rc::Rc; use std::sync::Arc; -use itertools::Itertools; +use common::GroupByIteratorExtended; +use itertools::{EitherOrBoth, Itertools}; pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder}; use super::writer::ColumnarSerializer; @@ -18,7 +20,8 @@ use crate::columnar::writer::CompatibleNumericalTypes; use crate::columnar::ColumnarReader; use crate::dynamic_column::DynamicColumn; use crate::{ - BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue, + BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType, + NumericalValue, }; /// Column types are grouped into different categories. @@ -83,9 +86,13 @@ pub fn merge_columnar( .iter() .map(|reader| reader.num_rows()) .collect::>(); - let columns_to_merge = - group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?; - for ((column_name, column_type), columns) in columns_to_merge { + + let columns_to_merge_iter = + group_columns_for_merge_iter(columnar_readers, required_columns, &merge_row_order)?; + for res in columns_to_merge_iter { + let (column_name, column_type, grouped_columns) = res?; + let columns = grouped_columns.columns; + let mut column_serializer = serializer.start_serialize_column(column_name.as_bytes(), column_type); merge_column( @@ -97,6 +104,7 @@ pub fn merge_columnar( )?; column_serializer.finalize()?; } + serializer.finalize(merge_row_order.num_rows())?; Ok(()) } @@ -210,15 +218,13 @@ fn merge_column( struct GroupedColumns { required_column_type: Option, columns: Vec>, - column_category: ColumnTypeCategory, } impl GroupedColumns { - fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self { + fn new(num_columnars: usize) -> Self { GroupedColumns { required_column_type: None, columns: vec![None; num_columnars], - column_category, } } @@ -265,7 +271,11 @@ impl GroupedColumns { } // At the moment, only the numerical categorical column type has more than one possible // column type. - assert_eq!(self.column_category, ColumnTypeCategory::Numerical); + assert!(self + .columns + .iter() + .flatten() + .all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical)); merged_numerical_columns_type(self.columns.iter().flatten()).into() } } @@ -293,7 +303,7 @@ fn merged_numerical_columns_type<'a>( fn is_empty_after_merge( merge_row_order: &MergeRowOrder, column: &DynamicColumn, - columnar_id: usize, + columnar_ord: usize, ) -> bool { if column.num_values() == 0u32 { // It was empty before the merge. @@ -305,7 +315,7 @@ fn is_empty_after_merge( false } MergeRowOrder::Shuffled(shuffled) => { - if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] { + if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] { let column_index = column.column_index(); match column_index { ColumnIndex::Empty { .. } => true, @@ -348,56 +358,157 @@ fn is_empty_after_merge( } } -#[allow(clippy::type_complexity)] -fn group_columns_for_merge( - columnar_readers: &[&ColumnarReader], - required_columns: &[(String, ColumnType)], - merge_row_order: &MergeRowOrder, -) -> io::Result>>> { - // Each column name may have multiple types of column associated. - // For merging we are interested in the same column type category since they can be merged. - let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new(); - - for &(ref column_name, column_type) in required_columns { - columns_grouped - .entry((column_name.clone(), column_type.into())) - .or_insert_with(|| { - GroupedColumns::for_category(column_type.into(), columnar_readers.len()) - }) - .require_type(column_type)?; +type MergeIter<'a> = + Box, ColumnType, GroupedColumns)>> + 'a>; + +#[derive(Debug, Clone)] +struct MergeColumn { + column_name: Rc, + reader_ord: usize, + column: DynamicColumnHandle, +} +impl MergeColumn { + fn new(column_name: Rc, reader_ord: usize, column: DynamicColumnHandle) -> Self { + MergeColumn { + column_name, + reader_ord, + column, + } } +} - for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() { - let column_name_and_handle = columnar_reader.list_columns()?; - // We skip columns that end up with 0 documents. - // That way, we make sure they don't end up influencing the merge type or - // creating empty columns. - - for (column_name, handle) in column_name_and_handle { - let column_category: ColumnTypeCategory = handle.column_type().into(); - let column = handle.open()?; - if is_empty_after_merge(merge_row_order, &column, columnar_id) { - continue; +/// Iterates over the columns of the columnar readers, grouped by column name. +/// Key functionality is that `open` of the Columns is done lazy per group. +fn group_columns_for_merge_iter<'a>( + columnar_readers: &'a [&'a ColumnarReader], + required_columns: &'a [(String, ColumnType)], + merge_row_order: &'a MergeRowOrder, +) -> io::Result, ColumnType, GroupedColumns)>> + 'a> { + let column_iters: Vec<_> = columnar_readers + .iter() + .enumerate() + .map(|(reader_ord, reader)| { + Ok(reader + .iter_columns()? + .map(move |el| MergeColumn::new(Rc::from(el.0), reader_ord, el.1))) + }) + .collect::>()?; + let required_columns_map: HashMap = required_columns + .iter() + .map(|(col_name, typ)| (col_name.to_string(), *typ)) + .collect::>(); + let mut required_columns_list: Vec = required_columns + .iter() + .map(|(col_name, _)| col_name.to_string()) + .collect(); + required_columns_list.sort(); + + // Kmerge and group on column_name. + let group_iter = GroupByIteratorExtended::group_by( + column_iters + .into_iter() + .kmerge_by(|a, b| a.column_name < b.column_name), + |el| el.column_name.clone(), + ); + + // Weave in the required columns into the sorted by column name iterator. + let groups_with_required = required_columns_list + .into_iter() + .merge_join_by(group_iter, |a, b| (a.as_str()).cmp(&b.0)); + + Ok(groups_with_required.flat_map(move |either| { + // It should be possible to do the grouping also on the column type in one pass, but some + // tests are failing. + let mut force_type: Option = None; + let (column_name, group) = match either { + // set required column + EitherOrBoth::Both(_required, (key, group)) => { + force_type = required_columns_map.get(&*key).cloned(); + (key, group) } - columns_grouped - .entry((column_name, column_category)) - .or_insert_with(|| { - GroupedColumns::for_category(column_category, columnar_readers.len()) + // Only required - Return artificial empty column + EitherOrBoth::Left(column_name) => { + return generate_require_column( + Rc::from(column_name), + columnar_readers, + &required_columns_map, + ); + } + // no required column + EitherOrBoth::Right((key, group)) => (key, group), + }; + let mut group: Vec = group.collect(); + // We need to create an iterator that returns the columns in the order of `to_code` of + // ColumnType + group.sort_by_key(|el| el.column.column_type); + let group_iter = GroupByIteratorExtended::group_by(group.into_iter(), |el| { + let cat_type: ColumnTypeCategory = el.column.column_type().into(); + cat_type + }); + let group_column_iter = group_iter.map(move |(_cat, group)| { + group_columns_iter( + column_name.clone(), + columnar_readers, + force_type, + merge_row_order, + group, + ) + }); + let iter = group_column_iter.filter(move |res| { + // Filter out empty columns. + res.as_ref() + .map(|(_, _, group)| { + let column_is_required = force_type.is_some(); + if column_is_required { + return true; + } + let all_columns_none = group.columns.iter().all(|column| column.is_none()); + !all_columns_none }) - .set_column(columnar_id, column); - } - } + .unwrap_or(true) + }); + Box::new(iter) + })) +} - let mut merge_columns: BTreeMap<(String, ColumnType), Vec>> = - Default::default(); +fn generate_require_column<'a>( + column_name: Rc, + columnar_readers: &'a [&'a ColumnarReader], + required_columns_map: &HashMap, +) -> MergeIter<'a> { + let mut grouped_columns = GroupedColumns::new(columnar_readers.len()); + let force_type: ColumnType = required_columns_map.get(&*column_name).cloned().unwrap(); + grouped_columns.require_type(force_type).unwrap(); // Can't panic + Box::new(std::iter::once(Ok(( + column_name, + force_type, + grouped_columns, + )))) as MergeIter<'a> +} - for ((column_name, _), mut grouped_columns) in columns_grouped { - let column_type = grouped_columns.column_type_after_merge(); - coerce_columns(column_type, &mut grouped_columns.columns)?; - merge_columns.insert((column_name, column_type), grouped_columns.columns); +fn group_columns_iter<'a>( + column_name: Rc, + columnar_readers: &'a [&'a ColumnarReader], + force_type: Option, + merge_row_order: &'a MergeRowOrder, + group: impl Iterator, +) -> io::Result<(Rc, ColumnType, GroupedColumns)> { + let mut grouped_columns = GroupedColumns::new(columnar_readers.len()); + if let Some(force_type) = force_type { + grouped_columns.require_type(force_type)?; } + for col in group { + let columnar_ord = col.reader_ord; + let column = col.column.open()?; + if !is_empty_after_merge(merge_row_order, &column, columnar_ord) { + grouped_columns.set_column(col.reader_ord, column); + } + } + + let column_type = grouped_columns.column_type_after_merge(); + coerce_columns(column_type, &mut grouped_columns.columns)?; - Ok(merge_columns) + Ok((column_name, column_type, grouped_columns)) } fn coerce_columns( diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 958240c4ef..c283f36b29 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use itertools::Itertools; use super::*; @@ -28,7 +30,10 @@ fn test_column_coercion_to_u64() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(columnars, &[], &merge_order).unwrap(); + group_columns_for_merge_iter(columnars, &[], &merge_order) + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } @@ -40,7 +45,10 @@ fn test_column_no_coercion_if_all_the_same() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(columnars, &[], &merge_order).unwrap(); + group_columns_for_merge_iter(columnars, &[], &merge_order) + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } @@ -52,23 +60,26 @@ fn test_column_coercion_to_i64() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(columnars, &[], &merge_order).unwrap(); + group_columns_for_merge_iter(columnars, &[], &merge_order) + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); } -#[test] -fn test_impossible_coercion_returns_an_error() { - let columnar1 = make_columnar("numbers", &[u64::MAX]); - let merge_order = StackMergeOrder::stack(&[&columnar1]).into(); - let group_error = group_columns_for_merge( - &[&columnar1], - &[("numbers".to_string(), ColumnType::I64)], - &merge_order, - ) - .unwrap_err(); - assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput); -} +//#[test] +// fn test_impossible_coercion_returns_an_error() { +// let columnar1 = make_columnar("numbers", &[u64::MAX]); +// let merge_order = StackMergeOrder::stack(&[&columnar1]).into(); +// let group_error = group_columns_for_merge_iter( +//&[&columnar1], +//&[("numbers".to_string(), ColumnType::I64)], +//&merge_order, +//) +//.unwrap_err(); +// assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput); +//} #[test] fn test_group_columns_with_required_column() { @@ -77,12 +88,14 @@ fn test_group_columns_with_required_column() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge( + group_columns_for_merge_iter( &[&columnar1, &columnar2], &[("numbers".to_string(), ColumnType::U64)], &merge_order, ) - .unwrap(); + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } @@ -94,12 +107,14 @@ fn test_group_columns_required_column_with_no_existing_columns() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge( + group_columns_for_merge_iter( columnars, &[("required_col".to_string(), ColumnType::Str)], &merge_order, ) - .unwrap(); + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 2); let columns = column_map .get(&("required_col".to_string(), ColumnType::Str)) @@ -116,16 +131,25 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge( + group_columns_for_merge_iter( columnars, &[("numbers".to_string(), ColumnType::U64)], &merge_order, ) - .unwrap(); + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 1); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); } +fn conv_res( + el: io::Result<(Rc, ColumnType, GroupedColumns)>, +) -> ((String, ColumnType), Vec>) { + let el = el.unwrap(); + ((el.0.to_string(), el.1), el.2.columns) +} + #[test] fn test_missing_column() { let columnar1 = make_columnar("numbers", &[-1i64]); @@ -133,7 +157,10 @@ fn test_missing_column() { let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(columnars, &[], &merge_order).unwrap(); + group_columns_for_merge_iter(columnars, &[], &merge_order) + .unwrap() + .map(conv_res) + .collect(); assert_eq!(column_map.len(), 2); assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); { diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index fb154abfdf..174cd36eec 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -102,30 +102,41 @@ impl ColumnarReader { pub fn num_rows(&self) -> RowId { self.num_rows } - - // TODO Add unit tests - pub fn list_columns(&self) -> io::Result> { + // Iterate over the columns in a sorted way + pub fn iter_columns( + &self, + ) -> io::Result + '_> { let mut stream = self.column_dictionary.stream()?; - let mut results = Vec::new(); - while stream.advance() { - let key_bytes: &[u8] = stream.key(); - let column_code: u8 = key_bytes.last().cloned().unwrap(); - let column_type: ColumnType = ColumnType::try_from_code(column_code) - .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?; - let range = stream.value().clone(); - let column_name = + Ok(std::iter::from_fn(move || { + if stream.advance() { + let key_bytes: &[u8] = stream.key(); + let column_code: u8 = key_bytes.last().cloned().unwrap(); + // TODO Error Handling. The API gets quite ugly when returning the error here, so + // instead we could just check the first N columns upfront. + let column_type: ColumnType = ColumnType::try_from_code(column_code) + .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`"))) + .unwrap(); + let range = stream.value().clone(); + let column_name = // The last two bytes are respectively the 0u8 separator and the column_type. String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 2]).to_string(); - let file_slice = self - .column_data - .slice(range.start as usize..range.end as usize); - let column_handle = DynamicColumnHandle { - file_slice, - column_type, - }; - results.push((column_name, column_handle)); - } - Ok(results) + let file_slice = self + .column_data + .slice(range.start as usize..range.end as usize); + let column_handle = DynamicColumnHandle { + file_slice, + column_type, + }; + Some((column_name, column_handle)) + } else { + None + } + })) + } + + // TODO Add unit tests + pub fn list_columns(&self) -> io::Result> { + Ok(self.iter_columns()?.collect()) } fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index ef7aaa5e9f..267b8f28db 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -228,7 +228,7 @@ static_dynamic_conversions!(StrColumn, Str); static_dynamic_conversions!(BytesColumn, Bytes); static_dynamic_conversions!(Column, IpAddr); -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct DynamicColumnHandle { pub(crate) file_slice: FileSlice, pub(crate) column_type: ColumnType, diff --git a/common/src/file_slice.rs b/common/src/file_slice.rs index 1ebe2d600c..2f77e4820c 100644 --- a/common/src/file_slice.rs +++ b/common/src/file_slice.rs @@ -1,3 +1,4 @@ +use std::fs::File; use std::ops::{Deref, Range, RangeBounds}; use std::sync::Arc; use std::{fmt, io}; @@ -32,6 +33,59 @@ pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug { } } +#[derive(Debug)] +pub struct WrapFile { + file: File, + len: usize, +} +impl WrapFile { + pub fn new(file: File) -> io::Result { + let len = file.metadata()?.len() as usize; + Ok(WrapFile { file, len }) + } +} + +#[async_trait] +impl FileHandle for WrapFile { + fn read_bytes(&self, range: Range) -> io::Result { + let file_len = self.len(); + + // Calculate the actual range to read, ensuring it stays within file boundaries + let start = range.start; + let end = range.end.min(file_len); + + // Ensure the start is before the end of the range + if start >= end { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range")); + } + + let mut buffer = vec![0; end - start]; + + #[cfg(unix)] + { + use std::os::unix::prelude::FileExt; + self.file.read_exact_at(&mut buffer, start as u64)?; + } + + #[cfg(not(unix))] + { + let mut file = self.file.try_clone()?; // Clone the file to read from it separately + // Seek to the start position in the file + file.seek(io::SeekFrom::Start(start as u64))?; + // Read the data into the buffer + file.read_exact(&mut buffer)?; + } + + Ok(OwnedBytes::new(buffer)) + } + // todo implement async +} +impl HasLen for WrapFile { + fn len(&self) -> usize { + self.len + } +} + #[async_trait] impl FileHandle for &'static [u8] { fn read_bytes(&self, range: Range) -> io::Result { @@ -67,6 +121,30 @@ impl fmt::Debug for FileSlice { } } +impl FileSlice { + pub fn stream_file_chunks(&self) -> impl Iterator> + '_ { + let len = self.range.end; + let mut start = self.range.start; + std::iter::from_fn(move || { + /// Returns chunks of 1MB of data from the FileHandle. + const CHUNK_SIZE: usize = 1024 * 1024; // 1MB + + if start < len { + let end = (start + CHUNK_SIZE).min(len); + let range = start..end; + let chunk = self.data.read_bytes(range); + start += CHUNK_SIZE; + match chunk { + Ok(chunk) => Some(Ok(chunk)), + Err(e) => Some(Err(e)), + } + } else { + None + } + }) + } +} + /// Takes a range, a `RangeBounds` object, and returns /// a `Range` that corresponds to the relative application of the /// `RangeBounds` object to the original `Range`. diff --git a/common/src/group_by.rs b/common/src/group_by.rs index 9d3c8c7ed9..1d2ccd005b 100644 --- a/common/src/group_by.rs +++ b/common/src/group_by.rs @@ -27,15 +27,15 @@ pub trait GroupByIteratorExtended: Iterator { where Self: Sized, F: FnMut(&Self::Item) -> K, - K: PartialEq + Copy, - Self::Item: Copy, + K: PartialEq + Clone, + Self::Item: Clone, { GroupByIterator::new(self, key) } } impl GroupByIteratorExtended for I {} -pub struct GroupByIterator +pub struct GroupByIterator where I: Iterator, F: FnMut(&I::Item) -> K, @@ -50,7 +50,7 @@ where inner: Rc>>, } -struct GroupByShared +struct GroupByShared where I: Iterator, F: FnMut(&I::Item) -> K, @@ -63,7 +63,7 @@ impl GroupByIterator where I: Iterator, F: FnMut(&I::Item) -> K, - K: Copy, + K: Clone, { fn new(inner: I, group_by_fn: F) -> Self { let inner = GroupByShared { @@ -80,28 +80,28 @@ where impl Iterator for GroupByIterator where I: Iterator, - I::Item: Copy, + I::Item: Clone, F: FnMut(&I::Item) -> K, - K: Copy, + K: Clone, { type Item = (K, GroupIterator); fn next(&mut self) -> Option { let mut inner = self.inner.borrow_mut(); - let value = *inner.iter.peek()?; + let value = inner.iter.peek()?.clone(); let key = (inner.group_by_fn)(&value); let inner = self.inner.clone(); let group_iter = GroupIterator { inner, - group_key: key, + group_key: key.clone(), }; Some((key, group_iter)) } } -pub struct GroupIterator +pub struct GroupIterator where I: Iterator, F: FnMut(&I::Item) -> K, @@ -110,10 +110,10 @@ where group_key: K, } -impl Iterator for GroupIterator +impl Iterator for GroupIterator where I: Iterator, - I::Item: Copy, + I::Item: Clone, F: FnMut(&I::Item) -> K, { type Item = I::Item; @@ -121,7 +121,7 @@ where fn next(&mut self) -> Option { let mut inner = self.inner.borrow_mut(); // peek if next value is in group - let peek_val = *inner.iter.peek()?; + let peek_val = inner.iter.peek()?.clone(); if (inner.group_by_fn)(&peek_val) == self.group_key { inner.iter.next() } else { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index c44ad39b69..4bff99389f 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -2426,6 +2426,13 @@ mod tests { test_operation_strategy(&ops[..], false, true).unwrap(); } + #[test] + fn test_merge_regression_1() { + use IndexingOp::*; + let ops = &[AddDoc { id: 15 }, Commit, AddDoc { id: 9 }, Commit, Merge]; + test_operation_strategy(&ops[..], false, true).unwrap(); + } + #[test] fn test_range_query_bug_1() { use IndexingOp::*; From 0125d112d8e8d57e7c75c0837864abd854f9dda9 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 24 Jul 2023 17:36:42 +0800 Subject: [PATCH 2/4] refactor --- columnar/src/columnar/merge/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 01e60183a7..22d7d203f7 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -384,6 +384,7 @@ fn group_columns_for_merge_iter<'a>( required_columns: &'a [(String, ColumnType)], merge_row_order: &'a MergeRowOrder, ) -> io::Result, ColumnType, GroupedColumns)>> + 'a> { + // One iterator per columnar reader. let column_iters: Vec<_> = columnar_readers .iter() .enumerate() @@ -403,18 +404,17 @@ fn group_columns_for_merge_iter<'a>( .collect(); required_columns_list.sort(); - // Kmerge and group on column_name. - let group_iter = GroupByIteratorExtended::group_by( - column_iters - .into_iter() - .kmerge_by(|a, b| a.column_name < b.column_name), - |el| el.column_name.clone(), - ); + // Kmerge on column_name + let kmerge = column_iters + .into_iter() + .kmerge_by(|a, b| a.column_name < b.column_name); + // Group by on column_name. + let group_iter = GroupByIteratorExtended::group_by(kmerge, |el| el.column_name.clone()); // Weave in the required columns into the sorted by column name iterator. let groups_with_required = required_columns_list .into_iter() - .merge_join_by(group_iter, |a, b| (a.as_str()).cmp(&b.0)); + .merge_join_by(group_iter, |left, right| (left.as_str()).cmp(&right.0)); Ok(groups_with_required.flat_map(move |either| { // It should be possible to do the grouping also on the column type in one pass, but some From 6a99bba61584177ce6d8de4c9ec7754ff8f1a9c4 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 27 Jul 2023 18:27:17 +0800 Subject: [PATCH 3/4] add rustdoc --- common/src/file_slice.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/src/file_slice.rs b/common/src/file_slice.rs index 2f77e4820c..fc6ff1916f 100644 --- a/common/src/file_slice.rs +++ b/common/src/file_slice.rs @@ -34,11 +34,13 @@ pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug { } #[derive(Debug)] +/// A File with it's length included. pub struct WrapFile { file: File, len: usize, } impl WrapFile { + /// Creates a new WrapFile and stores its length. pub fn new(file: File) -> io::Result { let len = file.metadata()?.len() as usize; Ok(WrapFile { file, len }) From 37bfbe4d17980607831231fdd00f6de1cbfe4f24 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 14 Aug 2023 19:03:37 +0200 Subject: [PATCH 4/4] replace iterator with BTreeMap --- columnar/src/columnar/merge/mod.rs | 289 ++++++++++----------------- columnar/src/columnar/merge/tests.rs | 105 ++++------ 2 files changed, 139 insertions(+), 255 deletions(-) diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 22d7d203f7..9f7666e8fe 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -2,14 +2,12 @@ mod merge_dict_column; mod merge_mapping; mod term_merger; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashSet}; use std::io; use std::net::Ipv6Addr; -use std::rc::Rc; use std::sync::Arc; -use common::GroupByIteratorExtended; -use itertools::{EitherOrBoth, Itertools}; +use itertools::Itertools; pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder}; use super::writer::ColumnarSerializer; @@ -31,14 +29,16 @@ use crate::{ /// In practise, today, only Numerical colummns are coerced into one type today. /// /// See also [README.md]. -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +/// +/// The ordering has to match the ordering of the variants in [ColumnType]. +#[derive(Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Hash, Debug)] pub(crate) enum ColumnTypeCategory { - Bool, - Str, Numerical, - DateTime, Bytes, + Str, + Bool, IpAddr, + DateTime, } impl From for ColumnTypeCategory { @@ -87,11 +87,18 @@ pub fn merge_columnar( .map(|reader| reader.num_rows()) .collect::>(); - let columns_to_merge_iter = - group_columns_for_merge_iter(columnar_readers, required_columns, &merge_row_order)?; - for res in columns_to_merge_iter { - let (column_name, column_type, grouped_columns) = res?; - let columns = grouped_columns.columns; + let columns_to_merge = + group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?; + for res in columns_to_merge { + let ((column_name, _column_type_category), grouped_columns) = res; + let grouped_columns = grouped_columns.open(&merge_row_order)?; + if grouped_columns.is_empty() { + continue; + } + + let column_type = grouped_columns.column_type_after_merge(); + let mut columns = grouped_columns.columns; + coerce_columns(column_type, &mut columns)?; let mut column_serializer = serializer.start_serialize_column(column_name.as_bytes(), column_type); @@ -221,35 +228,9 @@ struct GroupedColumns { } impl GroupedColumns { - fn new(num_columnars: usize) -> Self { - GroupedColumns { - required_column_type: None, - columns: vec![None; num_columnars], - } - } - - /// Set the dynamic column for a given columnar. - fn set_column(&mut self, columnar_id: usize, column: DynamicColumn) { - self.columns[columnar_id] = Some(column); - } - - /// Force the existence of a column, as well as its type. - fn require_type(&mut self, required_type: ColumnType) -> io::Result<()> { - if let Some(existing_required_type) = self.required_column_type { - if existing_required_type == required_type { - // This was just a duplicate in the `required_columns`. - // Nothing to do. - return Ok(()); - } else { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Required column conflicts with another required column of the same type \ - category.", - )); - } - } - self.required_column_type = Some(required_type); - Ok(()) + /// Check is column group can be skipped during serialization. + fn is_empty(&self) -> bool { + self.required_column_type.is_none() && self.columns.iter().all(Option::is_none) } /// Returns the column type after merge. @@ -280,6 +261,67 @@ impl GroupedColumns { } } +struct GroupedColumnsHandle { + required_column_type: Option, + columns: Vec>, +} + +impl GroupedColumnsHandle { + fn new(num_columnars: usize) -> Self { + GroupedColumnsHandle { + required_column_type: None, + columns: vec![None; num_columnars], + } + } + fn open(self, merge_row_order: &MergeRowOrder) -> io::Result { + let mut columns: Vec> = Vec::new(); + for (columnar_id, column) in self.columns.iter().enumerate() { + if let Some(column) = column { + let column = column.open()?; + // We skip columns that end up with 0 documents. + // That way, we make sure they don't end up influencing the merge type or + // creating empty columns. + + if is_empty_after_merge(merge_row_order, &column, columnar_id) { + columns.push(None); + } else { + columns.push(Some(column)); + } + } else { + columns.push(None); + } + } + Ok(GroupedColumns { + required_column_type: self.required_column_type, + columns, + }) + } + + /// Set the dynamic column for a given columnar. + fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) { + self.columns[columnar_id] = Some(column); + } + + /// Force the existence of a column, as well as its type. + fn require_type(&mut self, required_type: ColumnType) -> io::Result<()> { + if let Some(existing_required_type) = self.required_column_type { + if existing_required_type == required_type { + // This was just a duplicate in the `required_columns`. + // Nothing to do. + return Ok(()); + } else { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Required column conflicts with another required column of the same type \ + category.", + )); + } + } + self.required_column_type = Some(required_type); + Ok(()) + } +} + /// Returns the type of the merged numerical column. /// /// This function picks the first numerical type out of i64, u64, f64 (order matters @@ -358,157 +400,34 @@ fn is_empty_after_merge( } } -type MergeIter<'a> = - Box, ColumnType, GroupedColumns)>> + 'a>; - -#[derive(Debug, Clone)] -struct MergeColumn { - column_name: Rc, - reader_ord: usize, - column: DynamicColumnHandle, -} -impl MergeColumn { - fn new(column_name: Rc, reader_ord: usize, column: DynamicColumnHandle) -> Self { - MergeColumn { - column_name, - reader_ord, - column, - } - } -} - /// Iterates over the columns of the columnar readers, grouped by column name. /// Key functionality is that `open` of the Columns is done lazy per group. -fn group_columns_for_merge_iter<'a>( +fn group_columns_for_merge<'a>( columnar_readers: &'a [&'a ColumnarReader], required_columns: &'a [(String, ColumnType)], - merge_row_order: &'a MergeRowOrder, -) -> io::Result, ColumnType, GroupedColumns)>> + 'a> { - // One iterator per columnar reader. - let column_iters: Vec<_> = columnar_readers - .iter() - .enumerate() - .map(|(reader_ord, reader)| { - Ok(reader - .iter_columns()? - .map(move |el| MergeColumn::new(Rc::from(el.0), reader_ord, el.1))) - }) - .collect::>()?; - let required_columns_map: HashMap = required_columns - .iter() - .map(|(col_name, typ)| (col_name.to_string(), *typ)) - .collect::>(); - let mut required_columns_list: Vec = required_columns - .iter() - .map(|(col_name, _)| col_name.to_string()) - .collect(); - required_columns_list.sort(); - - // Kmerge on column_name - let kmerge = column_iters - .into_iter() - .kmerge_by(|a, b| a.column_name < b.column_name); - // Group by on column_name. - let group_iter = GroupByIteratorExtended::group_by(kmerge, |el| el.column_name.clone()); - - // Weave in the required columns into the sorted by column name iterator. - let groups_with_required = required_columns_list - .into_iter() - .merge_join_by(group_iter, |left, right| (left.as_str()).cmp(&right.0)); - - Ok(groups_with_required.flat_map(move |either| { - // It should be possible to do the grouping also on the column type in one pass, but some - // tests are failing. - let mut force_type: Option = None; - let (column_name, group) = match either { - // set required column - EitherOrBoth::Both(_required, (key, group)) => { - force_type = required_columns_map.get(&*key).cloned(); - (key, group) - } - // Only required - Return artificial empty column - EitherOrBoth::Left(column_name) => { - return generate_require_column( - Rc::from(column_name), - columnar_readers, - &required_columns_map, - ); - } - // no required column - EitherOrBoth::Right((key, group)) => (key, group), - }; - let mut group: Vec = group.collect(); - // We need to create an iterator that returns the columns in the order of `to_code` of - // ColumnType - group.sort_by_key(|el| el.column.column_type); - let group_iter = GroupByIteratorExtended::group_by(group.into_iter(), |el| { - let cat_type: ColumnTypeCategory = el.column.column_type().into(); - cat_type - }); - let group_column_iter = group_iter.map(move |(_cat, group)| { - group_columns_iter( - column_name.clone(), - columnar_readers, - force_type, - merge_row_order, - group, - ) - }); - let iter = group_column_iter.filter(move |res| { - // Filter out empty columns. - res.as_ref() - .map(|(_, _, group)| { - let column_is_required = force_type.is_some(); - if column_is_required { - return true; - } - let all_columns_none = group.columns.iter().all(|column| column.is_none()); - !all_columns_none - }) - .unwrap_or(true) - }); - Box::new(iter) - })) -} + _merge_row_order: &'a MergeRowOrder, +) -> io::Result> { + let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new(); + + for &(ref column_name, column_type) in required_columns { + columns + .entry((column_name.clone(), column_type.into())) + .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len())) + .require_type(column_type)?; + } -fn generate_require_column<'a>( - column_name: Rc, - columnar_readers: &'a [&'a ColumnarReader], - required_columns_map: &HashMap, -) -> MergeIter<'a> { - let mut grouped_columns = GroupedColumns::new(columnar_readers.len()); - let force_type: ColumnType = required_columns_map.get(&*column_name).cloned().unwrap(); - grouped_columns.require_type(force_type).unwrap(); // Can't panic - Box::new(std::iter::once(Ok(( - column_name, - force_type, - grouped_columns, - )))) as MergeIter<'a> -} + for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() { + let column_name_and_handle = columnar_reader.iter_columns()?; -fn group_columns_iter<'a>( - column_name: Rc, - columnar_readers: &'a [&'a ColumnarReader], - force_type: Option, - merge_row_order: &'a MergeRowOrder, - group: impl Iterator, -) -> io::Result<(Rc, ColumnType, GroupedColumns)> { - let mut grouped_columns = GroupedColumns::new(columnar_readers.len()); - if let Some(force_type) = force_type { - grouped_columns.require_type(force_type)?; - } - for col in group { - let columnar_ord = col.reader_ord; - let column = col.column.open()?; - if !is_empty_after_merge(merge_row_order, &column, columnar_ord) { - grouped_columns.set_column(col.reader_ord, column); + for (column_name, handle) in column_name_and_handle { + let column_category: ColumnTypeCategory = handle.column_type().into(); + columns + .entry((column_name, column_category)) + .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len())) + .set_column(columnar_id, handle); } } - - let column_type = grouped_columns.column_type_after_merge(); - coerce_columns(column_type, &mut grouped_columns.columns)?; - - Ok((column_name, column_type, grouped_columns)) + Ok(columns) } fn coerce_columns( diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index c283f36b29..2e688e3190 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -29,28 +29,10 @@ fn test_column_coercion_to_u64() { let columnar2 = make_columnar("numbers", &[u64::MAX]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter(columnars, &[], &merge_order) - .unwrap() - .map(conv_res) - .collect(); - assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); -} - -#[test] -fn test_column_no_coercion_if_all_the_same() { - let columnar1 = make_columnar("numbers", &[1u64]); - let columnar2 = make_columnar("numbers", &[2u64]); - let columnars = &[&columnar1, &columnar2]; - let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter(columnars, &[], &merge_order) - .unwrap() - .map(conv_res) - .collect(); + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = + group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -59,13 +41,10 @@ fn test_column_coercion_to_i64() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter(columnars, &[], &merge_order) - .unwrap() - .map(conv_res) - .collect(); + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = + group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } //#[test] @@ -87,17 +66,15 @@ fn test_group_columns_with_required_column() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter( + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = + group_columns_for_merge( &[&columnar1, &columnar2], &[("numbers".to_string(), ColumnType::U64)], &merge_order, ) - .unwrap() - .map(conv_res) - .collect(); + .unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -106,19 +83,17 @@ fn test_group_columns_required_column_with_no_existing_columns() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter( - columnars, - &[("required_col".to_string(), ColumnType::Str)], - &merge_order, - ) - .unwrap() - .map(conv_res) - .collect(); + let column_map: BTreeMap<_, _> = group_columns_for_merge( + columnars, + &[("required_col".to_string(), ColumnType::Str)], + &merge_order, + ) + .unwrap(); assert_eq!(column_map.len(), 2); - let columns = column_map - .get(&("required_col".to_string(), ColumnType::Str)) - .unwrap(); + let columns = &column_map + .get(&("required_col".to_string(), ColumnTypeCategory::Str)) + .unwrap() + .columns; assert_eq!(columns.len(), 2); assert!(columns[0].is_none()); assert!(columns[1].is_none()); @@ -130,24 +105,15 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru let columnar2 = make_columnar("numbers", &[2i64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter( + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = + group_columns_for_merge( columnars, &[("numbers".to_string(), ColumnType::U64)], &merge_order, ) - .unwrap() - .map(conv_res) - .collect(); + .unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); -} - -fn conv_res( - el: io::Result<(Rc, ColumnType, GroupedColumns)>, -) -> ((String, ColumnType), Vec>) { - let el = el.unwrap(); - ((el.0.to_string(), el.1), el.2.columns) + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -156,24 +122,23 @@ fn test_missing_column() { let columnar2 = make_columnar("numbers2", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge_iter(columnars, &[], &merge_order) - .unwrap() - .map(conv_res) - .collect(); + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = + group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 2); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); { - let columns = column_map - .get(&("numbers".to_string(), ColumnType::I64)) - .unwrap(); + let columns = &column_map + .get(&("numbers".to_string(), ColumnTypeCategory::Numerical)) + .unwrap() + .columns; assert!(columns[0].is_some()); assert!(columns[1].is_none()); } { - let columns = column_map - .get(&("numbers2".to_string(), ColumnType::U64)) - .unwrap(); + let columns = &column_map + .get(&("numbers2".to_string(), ColumnTypeCategory::Numerical)) + .unwrap() + .columns; assert!(columns[0].is_none()); assert!(columns[1].is_some()); }