From 59460c767f8205ebd5dbb32eb3f19265adeee11d Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 21 Aug 2023 08:55:35 +0200 Subject: [PATCH] delayed column opening during merge (#2132) * lazy columnar merge This is the first part of addressing #3633 Instead of loading all Column into memory for the merge, only the current column_name group is loaded. This can be done since the sstable streams the columns lexicographically. * refactor * add rustdoc * replace iterator with BTreeMap --- columnar/src/columnar/merge/mod.rs | 178 ++++++++++++++++----------- columnar/src/columnar/merge/tests.rs | 92 +++++++------- columnar/src/columnar/reader/mod.rs | 53 ++++---- columnar/src/dynamic_column.rs | 2 +- common/src/file_slice.rs | 80 ++++++++++++ common/src/group_by.rs | 26 ++-- src/indexer/index_writer.rs | 7 ++ 7 files changed, 279 insertions(+), 159 deletions(-) diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index a717c5271a..9f7666e8fe 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -2,7 +2,7 @@ mod merge_dict_column; mod merge_mapping; mod term_merger; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashSet}; use std::io; use std::net::Ipv6Addr; use std::sync::Arc; @@ -18,7 +18,8 @@ use crate::columnar::writer::CompatibleNumericalTypes; use crate::columnar::ColumnarReader; use crate::dynamic_column::DynamicColumn; use crate::{ - BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue, + BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType, + NumericalValue, }; /// Column types are grouped into different categories. @@ -28,14 +29,16 @@ use crate::{ /// In practise, today, only Numerical colummns are coerced into one type today. /// /// See also [README.md]. -#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +/// +/// The ordering has to match the ordering of the variants in [ColumnType]. +#[derive(Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Hash, Debug)] pub(crate) enum ColumnTypeCategory { - Bool, - Str, Numerical, - DateTime, Bytes, + Str, + Bool, IpAddr, + DateTime, } impl From for ColumnTypeCategory { @@ -83,9 +86,20 @@ pub fn merge_columnar( .iter() .map(|reader| reader.num_rows()) .collect::>(); + let columns_to_merge = group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?; - for ((column_name, column_type), columns) in columns_to_merge { + for res in columns_to_merge { + let ((column_name, _column_type_category), grouped_columns) = res; + let grouped_columns = grouped_columns.open(&merge_row_order)?; + if grouped_columns.is_empty() { + continue; + } + + let column_type = grouped_columns.column_type_after_merge(); + let mut columns = grouped_columns.columns; + coerce_columns(column_type, &mut columns)?; + let mut column_serializer = serializer.start_serialize_column(column_name.as_bytes(), column_type); merge_column( @@ -97,6 +111,7 @@ pub fn merge_columnar( )?; column_serializer.finalize()?; } + serializer.finalize(merge_row_order.num_rows())?; Ok(()) } @@ -210,20 +225,80 @@ fn merge_column( struct GroupedColumns { required_column_type: Option, columns: Vec>, - column_category: ColumnTypeCategory, } impl GroupedColumns { - fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self { - GroupedColumns { + /// Check is column group can be skipped during serialization. + fn is_empty(&self) -> bool { + self.required_column_type.is_none() && self.columns.iter().all(Option::is_none) + } + + /// Returns the column type after merge. + /// + /// This method does not check if the column types can actually be coerced to + /// this type. + fn column_type_after_merge(&self) -> ColumnType { + if let Some(required_type) = self.required_column_type { + return required_type; + } + let column_type: HashSet = self + .columns + .iter() + .flatten() + .map(|column| column.column_type()) + .collect(); + if column_type.len() == 1 { + return column_type.into_iter().next().unwrap(); + } + // At the moment, only the numerical categorical column type has more than one possible + // column type. + assert!(self + .columns + .iter() + .flatten() + .all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical)); + merged_numerical_columns_type(self.columns.iter().flatten()).into() + } +} + +struct GroupedColumnsHandle { + required_column_type: Option, + columns: Vec>, +} + +impl GroupedColumnsHandle { + fn new(num_columnars: usize) -> Self { + GroupedColumnsHandle { required_column_type: None, columns: vec![None; num_columnars], - column_category, } } + fn open(self, merge_row_order: &MergeRowOrder) -> io::Result { + let mut columns: Vec> = Vec::new(); + for (columnar_id, column) in self.columns.iter().enumerate() { + if let Some(column) = column { + let column = column.open()?; + // We skip columns that end up with 0 documents. + // That way, we make sure they don't end up influencing the merge type or + // creating empty columns. + + if is_empty_after_merge(merge_row_order, &column, columnar_id) { + columns.push(None); + } else { + columns.push(Some(column)); + } + } else { + columns.push(None); + } + } + Ok(GroupedColumns { + required_column_type: self.required_column_type, + columns, + }) + } /// Set the dynamic column for a given columnar. - fn set_column(&mut self, columnar_id: usize, column: DynamicColumn) { + fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) { self.columns[columnar_id] = Some(column); } @@ -245,29 +320,6 @@ impl GroupedColumns { self.required_column_type = Some(required_type); Ok(()) } - - /// Returns the column type after merge. - /// - /// This method does not check if the column types can actually be coerced to - /// this type. - fn column_type_after_merge(&self) -> ColumnType { - if let Some(required_type) = self.required_column_type { - return required_type; - } - let column_type: HashSet = self - .columns - .iter() - .flatten() - .map(|column| column.column_type()) - .collect(); - if column_type.len() == 1 { - return column_type.into_iter().next().unwrap(); - } - // At the moment, only the numerical categorical column type has more than one possible - // column type. - assert_eq!(self.column_category, ColumnTypeCategory::Numerical); - merged_numerical_columns_type(self.columns.iter().flatten()).into() - } } /// Returns the type of the merged numerical column. @@ -293,7 +345,7 @@ fn merged_numerical_columns_type<'a>( fn is_empty_after_merge( merge_row_order: &MergeRowOrder, column: &DynamicColumn, - columnar_id: usize, + columnar_ord: usize, ) -> bool { if column.num_values() == 0u32 { // It was empty before the merge. @@ -305,7 +357,7 @@ fn is_empty_after_merge( false } MergeRowOrder::Shuffled(shuffled) => { - if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] { + if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] { let column_index = column.column_index(); match column_index { ColumnIndex::Empty { .. } => true, @@ -348,56 +400,34 @@ fn is_empty_after_merge( } } -#[allow(clippy::type_complexity)] -fn group_columns_for_merge( - columnar_readers: &[&ColumnarReader], - required_columns: &[(String, ColumnType)], - merge_row_order: &MergeRowOrder, -) -> io::Result>>> { - // Each column name may have multiple types of column associated. - // For merging we are interested in the same column type category since they can be merged. - let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new(); +/// Iterates over the columns of the columnar readers, grouped by column name. +/// Key functionality is that `open` of the Columns is done lazy per group. +fn group_columns_for_merge<'a>( + columnar_readers: &'a [&'a ColumnarReader], + required_columns: &'a [(String, ColumnType)], + _merge_row_order: &'a MergeRowOrder, +) -> io::Result> { + let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new(); for &(ref column_name, column_type) in required_columns { - columns_grouped + columns .entry((column_name.clone(), column_type.into())) - .or_insert_with(|| { - GroupedColumns::for_category(column_type.into(), columnar_readers.len()) - }) + .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len())) .require_type(column_type)?; } for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() { - let column_name_and_handle = columnar_reader.list_columns()?; - // We skip columns that end up with 0 documents. - // That way, we make sure they don't end up influencing the merge type or - // creating empty columns. + let column_name_and_handle = columnar_reader.iter_columns()?; for (column_name, handle) in column_name_and_handle { let column_category: ColumnTypeCategory = handle.column_type().into(); - let column = handle.open()?; - if is_empty_after_merge(merge_row_order, &column, columnar_id) { - continue; - } - columns_grouped + columns .entry((column_name, column_category)) - .or_insert_with(|| { - GroupedColumns::for_category(column_category, columnar_readers.len()) - }) - .set_column(columnar_id, column); + .or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len())) + .set_column(columnar_id, handle); } } - - let mut merge_columns: BTreeMap<(String, ColumnType), Vec>> = - Default::default(); - - for ((column_name, _), mut grouped_columns) in columns_grouped { - let column_type = grouped_columns.column_type_after_merge(); - coerce_columns(column_type, &mut grouped_columns.columns)?; - merge_columns.insert((column_name, column_type), grouped_columns.columns); - } - - Ok(merge_columns) + Ok(columns) } fn coerce_columns( diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 958240c4ef..2e688e3190 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; + use itertools::Itertools; use super::*; @@ -27,22 +29,10 @@ fn test_column_coercion_to_u64() { let columnar2 = make_columnar("numbers", &[u64::MAX]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge(columnars, &[], &merge_order).unwrap(); - assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); -} - -#[test] -fn test_column_no_coercion_if_all_the_same() { - let columnar1 = make_columnar("numbers", &[1u64]); - let columnar2 = make_columnar("numbers", &[2u64]); - let columnars = &[&columnar1, &columnar2]; - let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -51,24 +41,24 @@ fn test_column_coercion_to_i64() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } -#[test] -fn test_impossible_coercion_returns_an_error() { - let columnar1 = make_columnar("numbers", &[u64::MAX]); - let merge_order = StackMergeOrder::stack(&[&columnar1]).into(); - let group_error = group_columns_for_merge( - &[&columnar1], - &[("numbers".to_string(), ColumnType::I64)], - &merge_order, - ) - .unwrap_err(); - assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput); -} +//#[test] +// fn test_impossible_coercion_returns_an_error() { +// let columnar1 = make_columnar("numbers", &[u64::MAX]); +// let merge_order = StackMergeOrder::stack(&[&columnar1]).into(); +// let group_error = group_columns_for_merge_iter( +//&[&columnar1], +//&[("numbers".to_string(), ColumnType::I64)], +//&merge_order, +//) +//.unwrap_err(); +// assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput); +//} #[test] fn test_group_columns_with_required_column() { @@ -76,7 +66,7 @@ fn test_group_columns_with_required_column() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = group_columns_for_merge( &[&columnar1, &columnar2], &[("numbers".to_string(), ColumnType::U64)], @@ -84,7 +74,7 @@ fn test_group_columns_with_required_column() { ) .unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -93,17 +83,17 @@ fn test_group_columns_required_column_with_no_existing_columns() { let columnar2 = make_columnar("numbers", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = - group_columns_for_merge( - columnars, - &[("required_col".to_string(), ColumnType::Str)], - &merge_order, - ) - .unwrap(); + let column_map: BTreeMap<_, _> = group_columns_for_merge( + columnars, + &[("required_col".to_string(), ColumnType::Str)], + &merge_order, + ) + .unwrap(); assert_eq!(column_map.len(), 2); - let columns = column_map - .get(&("required_col".to_string(), ColumnType::Str)) - .unwrap(); + let columns = &column_map + .get(&("required_col".to_string(), ColumnTypeCategory::Str)) + .unwrap() + .columns; assert_eq!(columns.len(), 2); assert!(columns[0].is_none()); assert!(columns[1].is_none()); @@ -115,7 +105,7 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru let columnar2 = make_columnar("numbers", &[2i64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = group_columns_for_merge( columnars, &[("numbers".to_string(), ColumnType::U64)], @@ -123,7 +113,7 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru ) .unwrap(); assert_eq!(column_map.len(), 1); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); } #[test] @@ -132,21 +122,23 @@ fn test_missing_column() { let columnar2 = make_columnar("numbers2", &[2u64]); let columnars = &[&columnar1, &columnar2]; let merge_order = StackMergeOrder::stack(columnars).into(); - let column_map: BTreeMap<(String, ColumnType), Vec>> = + let column_map: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = group_columns_for_merge(columnars, &[], &merge_order).unwrap(); assert_eq!(column_map.len(), 2); - assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64))); + assert!(column_map.contains_key(&("numbers".to_string(), ColumnTypeCategory::Numerical))); { - let columns = column_map - .get(&("numbers".to_string(), ColumnType::I64)) - .unwrap(); + let columns = &column_map + .get(&("numbers".to_string(), ColumnTypeCategory::Numerical)) + .unwrap() + .columns; assert!(columns[0].is_some()); assert!(columns[1].is_none()); } { - let columns = column_map - .get(&("numbers2".to_string(), ColumnType::U64)) - .unwrap(); + let columns = &column_map + .get(&("numbers2".to_string(), ColumnTypeCategory::Numerical)) + .unwrap() + .columns; assert!(columns[0].is_none()); assert!(columns[1].is_some()); } diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index fb154abfdf..174cd36eec 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -102,30 +102,41 @@ impl ColumnarReader { pub fn num_rows(&self) -> RowId { self.num_rows } - - // TODO Add unit tests - pub fn list_columns(&self) -> io::Result> { + // Iterate over the columns in a sorted way + pub fn iter_columns( + &self, + ) -> io::Result + '_> { let mut stream = self.column_dictionary.stream()?; - let mut results = Vec::new(); - while stream.advance() { - let key_bytes: &[u8] = stream.key(); - let column_code: u8 = key_bytes.last().cloned().unwrap(); - let column_type: ColumnType = ColumnType::try_from_code(column_code) - .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`")))?; - let range = stream.value().clone(); - let column_name = + Ok(std::iter::from_fn(move || { + if stream.advance() { + let key_bytes: &[u8] = stream.key(); + let column_code: u8 = key_bytes.last().cloned().unwrap(); + // TODO Error Handling. The API gets quite ugly when returning the error here, so + // instead we could just check the first N columns upfront. + let column_type: ColumnType = ColumnType::try_from_code(column_code) + .map_err(|_| io_invalid_data(format!("Unknown column code `{column_code}`"))) + .unwrap(); + let range = stream.value().clone(); + let column_name = // The last two bytes are respectively the 0u8 separator and the column_type. String::from_utf8_lossy(&key_bytes[..key_bytes.len() - 2]).to_string(); - let file_slice = self - .column_data - .slice(range.start as usize..range.end as usize); - let column_handle = DynamicColumnHandle { - file_slice, - column_type, - }; - results.push((column_name, column_handle)); - } - Ok(results) + let file_slice = self + .column_data + .slice(range.start as usize..range.end as usize); + let column_handle = DynamicColumnHandle { + file_slice, + column_type, + }; + Some((column_name, column_handle)) + } else { + None + } + })) + } + + // TODO Add unit tests + pub fn list_columns(&self) -> io::Result> { + Ok(self.iter_columns()?.collect()) } fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index ef7aaa5e9f..267b8f28db 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -228,7 +228,7 @@ static_dynamic_conversions!(StrColumn, Str); static_dynamic_conversions!(BytesColumn, Bytes); static_dynamic_conversions!(Column, IpAddr); -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct DynamicColumnHandle { pub(crate) file_slice: FileSlice, pub(crate) column_type: ColumnType, diff --git a/common/src/file_slice.rs b/common/src/file_slice.rs index 1ebe2d600c..fc6ff1916f 100644 --- a/common/src/file_slice.rs +++ b/common/src/file_slice.rs @@ -1,3 +1,4 @@ +use std::fs::File; use std::ops::{Deref, Range, RangeBounds}; use std::sync::Arc; use std::{fmt, io}; @@ -32,6 +33,61 @@ pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug { } } +#[derive(Debug)] +/// A File with it's length included. +pub struct WrapFile { + file: File, + len: usize, +} +impl WrapFile { + /// Creates a new WrapFile and stores its length. + pub fn new(file: File) -> io::Result { + let len = file.metadata()?.len() as usize; + Ok(WrapFile { file, len }) + } +} + +#[async_trait] +impl FileHandle for WrapFile { + fn read_bytes(&self, range: Range) -> io::Result { + let file_len = self.len(); + + // Calculate the actual range to read, ensuring it stays within file boundaries + let start = range.start; + let end = range.end.min(file_len); + + // Ensure the start is before the end of the range + if start >= end { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid range")); + } + + let mut buffer = vec![0; end - start]; + + #[cfg(unix)] + { + use std::os::unix::prelude::FileExt; + self.file.read_exact_at(&mut buffer, start as u64)?; + } + + #[cfg(not(unix))] + { + let mut file = self.file.try_clone()?; // Clone the file to read from it separately + // Seek to the start position in the file + file.seek(io::SeekFrom::Start(start as u64))?; + // Read the data into the buffer + file.read_exact(&mut buffer)?; + } + + Ok(OwnedBytes::new(buffer)) + } + // todo implement async +} +impl HasLen for WrapFile { + fn len(&self) -> usize { + self.len + } +} + #[async_trait] impl FileHandle for &'static [u8] { fn read_bytes(&self, range: Range) -> io::Result { @@ -67,6 +123,30 @@ impl fmt::Debug for FileSlice { } } +impl FileSlice { + pub fn stream_file_chunks(&self) -> impl Iterator> + '_ { + let len = self.range.end; + let mut start = self.range.start; + std::iter::from_fn(move || { + /// Returns chunks of 1MB of data from the FileHandle. + const CHUNK_SIZE: usize = 1024 * 1024; // 1MB + + if start < len { + let end = (start + CHUNK_SIZE).min(len); + let range = start..end; + let chunk = self.data.read_bytes(range); + start += CHUNK_SIZE; + match chunk { + Ok(chunk) => Some(Ok(chunk)), + Err(e) => Some(Err(e)), + } + } else { + None + } + }) + } +} + /// Takes a range, a `RangeBounds` object, and returns /// a `Range` that corresponds to the relative application of the /// `RangeBounds` object to the original `Range`. diff --git a/common/src/group_by.rs b/common/src/group_by.rs index 9d3c8c7ed9..1d2ccd005b 100644 --- a/common/src/group_by.rs +++ b/common/src/group_by.rs @@ -27,15 +27,15 @@ pub trait GroupByIteratorExtended: Iterator { where Self: Sized, F: FnMut(&Self::Item) -> K, - K: PartialEq + Copy, - Self::Item: Copy, + K: PartialEq + Clone, + Self::Item: Clone, { GroupByIterator::new(self, key) } } impl GroupByIteratorExtended for I {} -pub struct GroupByIterator +pub struct GroupByIterator where I: Iterator, F: FnMut(&I::Item) -> K, @@ -50,7 +50,7 @@ where inner: Rc>>, } -struct GroupByShared +struct GroupByShared where I: Iterator, F: FnMut(&I::Item) -> K, @@ -63,7 +63,7 @@ impl GroupByIterator where I: Iterator, F: FnMut(&I::Item) -> K, - K: Copy, + K: Clone, { fn new(inner: I, group_by_fn: F) -> Self { let inner = GroupByShared { @@ -80,28 +80,28 @@ where impl Iterator for GroupByIterator where I: Iterator, - I::Item: Copy, + I::Item: Clone, F: FnMut(&I::Item) -> K, - K: Copy, + K: Clone, { type Item = (K, GroupIterator); fn next(&mut self) -> Option { let mut inner = self.inner.borrow_mut(); - let value = *inner.iter.peek()?; + let value = inner.iter.peek()?.clone(); let key = (inner.group_by_fn)(&value); let inner = self.inner.clone(); let group_iter = GroupIterator { inner, - group_key: key, + group_key: key.clone(), }; Some((key, group_iter)) } } -pub struct GroupIterator +pub struct GroupIterator where I: Iterator, F: FnMut(&I::Item) -> K, @@ -110,10 +110,10 @@ where group_key: K, } -impl Iterator for GroupIterator +impl Iterator for GroupIterator where I: Iterator, - I::Item: Copy, + I::Item: Clone, F: FnMut(&I::Item) -> K, { type Item = I::Item; @@ -121,7 +121,7 @@ where fn next(&mut self) -> Option { let mut inner = self.inner.borrow_mut(); // peek if next value is in group - let peek_val = *inner.iter.peek()?; + let peek_val = inner.iter.peek()?.clone(); if (inner.group_by_fn)(&peek_val) == self.group_key { inner.iter.next() } else { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 1639fb5b79..7da5ddd7c7 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -2426,6 +2426,13 @@ mod tests { test_operation_strategy(&ops[..], false, true).unwrap(); } + #[test] + fn test_merge_regression_1() { + use IndexingOp::*; + let ops = &[AddDoc { id: 15 }, Commit, AddDoc { id: 9 }, Commit, Merge]; + test_operation_strategy(&ops[..], false, true).unwrap(); + } + #[test] fn test_range_query_bug_1() { use IndexingOp::*;