Skip to content

Commit

Permalink
lazy columnar merge
Browse files Browse the repository at this point in the history
This is the first part of addressing #3633
Instead of loading all Column into memory for the merge, only the current column_name
group is loaded. This can be done since the sstable streams the columns lexicographically.
  • Loading branch information
PSeitz committed Jul 21, 2023
1 parent 820f126 commit 1bfbfda
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 112 deletions.
177 changes: 122 additions & 55 deletions columnar/src/columnar/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ mod merge_dict_column;
mod merge_mapping;
mod term_merger;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::io;
use std::net::Ipv6Addr;
use std::rc::Rc;
use std::sync::Arc;

use itertools::Itertools;
use common::GroupByIteratorExtended;
use itertools::{EitherOrBoth, Itertools};
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};

use super::writer::ColumnarSerializer;
Expand All @@ -18,7 +20,8 @@ use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
NumericalValue,
};

/// Column types are grouped into different categories.
Expand All @@ -28,7 +31,7 @@ use crate::{
/// In practise, today, only Numerical colummns are coerced into one type today.
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[derive(Copy, Clone, Eq, PartialEq, PartialOrd, Ord, Hash, Debug)]
pub(crate) enum ColumnTypeCategory {
Bool,
Str,
Expand Down Expand Up @@ -83,9 +86,13 @@ pub fn merge_columnar(
.iter()
.map(|reader| reader.num_rows())
.collect::<Vec<u32>>();
let columns_to_merge =
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
for ((column_name, column_type), columns) in columns_to_merge {

let columns_to_merge_iter =
group_columns_for_merge_iter(columnar_readers, required_columns, &merge_row_order)?;
for res in columns_to_merge_iter {
let (column_name, column_type, grouped_columns) = res?;
let columns = grouped_columns.columns;

let mut column_serializer =
serializer.start_serialize_column(column_name.as_bytes(), column_type);
merge_column(
Expand All @@ -97,6 +104,7 @@ pub fn merge_columnar(
)?;
column_serializer.finalize()?;
}

serializer.finalize(merge_row_order.num_rows())?;
Ok(())
}
Expand Down Expand Up @@ -214,11 +222,11 @@ struct GroupedColumns {
}

impl GroupedColumns {
fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self {
fn new(num_columnars: usize) -> Self {
GroupedColumns {
required_column_type: None,
columns: vec![None; num_columnars],
column_category,
column_category: ColumnTypeCategory::Numerical,
}
}

Expand Down Expand Up @@ -293,7 +301,7 @@ fn merged_numerical_columns_type<'a>(
fn is_empty_after_merge(
merge_row_order: &MergeRowOrder,
column: &DynamicColumn,
columnar_id: usize,
columnar_ord: usize,
) -> bool {
if column.num_values() == 0u32 {
// It was empty before the merge.
Expand All @@ -305,7 +313,7 @@ fn is_empty_after_merge(
false
}
MergeRowOrder::Shuffled(shuffled) => {
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] {
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
let column_index = column.column_index();
match column_index {
ColumnIndex::Empty { .. } => true,
Expand Down Expand Up @@ -348,56 +356,115 @@ fn is_empty_after_merge(
}
}

#[allow(clippy::type_complexity)]
fn group_columns_for_merge(
columnar_readers: &[&ColumnarReader],
required_columns: &[(String, ColumnType)],
merge_row_order: &MergeRowOrder,
) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {
// Each column name may have multiple types of column associated.
// For merging we are interested in the same column type category since they can be merged.
let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new();
type MergeIter<'a> =
Box<dyn Iterator<Item = io::Result<(Rc<String>, ColumnType, GroupedColumns)>> + 'a>;

for &(ref column_name, column_type) in required_columns {
columns_grouped
.entry((column_name.clone(), column_type.into()))
.or_insert_with(|| {
GroupedColumns::for_category(column_type.into(), columnar_readers.len())
})
.require_type(column_type)?;
}
/// Iterates over the columns of the columnar readers, grouped by column name.
/// Key functionality is that `open` of the Columns is done lazy per group.
fn group_columns_for_merge_iter<'a>(
columnar_readers: &'a [&'a ColumnarReader],
required_columns: &'a [(String, ColumnType)],
merge_row_order: &'a MergeRowOrder,
) -> io::Result<impl Iterator<Item = io::Result<(Rc<String>, ColumnType, GroupedColumns)>> + 'a> {
let column_iters: Vec<_> = columnar_readers
.iter()
.enumerate()
.map(|(reader_ord, reader)| {
Ok(reader
.iter_columns()?
.map(move |el| (Rc::new(el.0), reader_ord, el.1)))
})
.collect::<io::Result<_>>()?;
let required_columns_map: HashMap<String, _> = required_columns
.iter()
.map(|(col_name, typ)| (col_name.to_string(), typ))
.collect::<HashMap<String, _>>();
let mut required_columns_list: Vec<String> = required_columns
.iter()
.map(|(col_name, _)| col_name.to_string())
.collect();
required_columns_list.sort();

for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
let column_name_and_handle = columnar_reader.list_columns()?;
// We skip columns that end up with 0 documents.
// That way, we make sure they don't end up influencing the merge type or
// creating empty columns.
// Kmerge and group on the column_name.
let group_iter = GroupByIteratorExtended::group_by(
column_iters.into_iter().kmerge_by(|a, b| a.0 < b.0),
|el| el.0.clone(),
);

for (column_name, handle) in column_name_and_handle {
let column_category: ColumnTypeCategory = handle.column_type().into();
let column = handle.open()?;
if is_empty_after_merge(merge_row_order, &column, columnar_id) {
continue;
}
columns_grouped
.entry((column_name, column_category))
.or_insert_with(|| {
GroupedColumns::for_category(column_category, columnar_readers.len())
})
.set_column(columnar_id, column);
}
}
// Weave in the required columns into the sorted by column name iterator.
let groups_with_required = required_columns_list
.into_iter()
.merge_join_by(group_iter, |a, b| a.cmp(&b.0));

let mut merge_columns: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
Default::default();
Ok(groups_with_required.flat_map(move |either| {
// It should be possible to do the grouping also on the column type in one pass, but some
// tests are failing.
let mut force_type: Option<ColumnType> = None;
let (key, group) = match either {
// set required column
EitherOrBoth::Both(_required, (key, group)) => {
force_type = required_columns_map.get(&*key).map(|el| (**el).into());
(key, group)
}
// Only required - Return artificial empty column
EitherOrBoth::Left(key) => {
let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
let force_type: Option<ColumnType> =
required_columns_map.get(&*key).map(|el| (**el).into());
if let Some(force_type) = force_type {
grouped_columns.require_type(force_type).unwrap(); // Can't panic
}
return Box::new(std::iter::once(Ok((
Rc::new(key),
force_type.unwrap(),
grouped_columns,
)))) as MergeIter<'a>;
}
// no required column
EitherOrBoth::Right((key, group)) => (key, group),
};
let mut group: Vec<(Rc<String>, usize, DynamicColumnHandle)> = group.collect();
group.sort_by_key(|el| el.2.column_type);
let group_iter = GroupByIteratorExtended::group_by(group.into_iter(), |el| {
let cat_type: ColumnTypeCategory = el.2.column_type().into();
cat_type
});
let key = key.clone();
Box::new(
group_iter
.map(move |(_cat, group)| {
let mut grouped_columns = GroupedColumns::new(columnar_readers.len());
if let Some(force_type) = force_type {
grouped_columns.require_type(force_type)?;
}
for col in group {
let columnar_ord = col.1;
let column = col.2.open()?;
if !is_empty_after_merge(merge_row_order, &column, columnar_ord) {
grouped_columns.set_column(col.1, column);
}
}

for ((column_name, _), mut grouped_columns) in columns_grouped {
let column_type = grouped_columns.column_type_after_merge();
coerce_columns(column_type, &mut grouped_columns.columns)?;
merge_columns.insert((column_name, column_type), grouped_columns.columns);
}
let column_type = grouped_columns.column_type_after_merge();
coerce_columns(column_type, &mut grouped_columns.columns)?;

Ok(merge_columns)
Ok((key.clone(), column_type, grouped_columns))
})
.filter(|res| {
// Filter out empty columns.
if let Ok((_, _, grouped_columns)) = res {
if grouped_columns
.columns
.iter()
.all(|column| column.is_none())
{
return false;
}
}
true
}),
)
}))
}

fn coerce_columns(
Expand Down
71 changes: 49 additions & 22 deletions columnar/src/columnar/merge/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use itertools::Itertools;

use super::*;
Expand Down Expand Up @@ -28,7 +30,10 @@ fn test_column_coercion_to_u64() {
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
group_columns_for_merge_iter(columnars, &[], &merge_order)
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
}
Expand All @@ -40,7 +45,10 @@ fn test_column_no_coercion_if_all_the_same() {
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
group_columns_for_merge_iter(columnars, &[], &merge_order)
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
}
Expand All @@ -52,23 +60,26 @@ fn test_column_coercion_to_i64() {
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
group_columns_for_merge_iter(columnars, &[], &merge_order)
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64)));
}

#[test]
fn test_impossible_coercion_returns_an_error() {
let columnar1 = make_columnar("numbers", &[u64::MAX]);
let merge_order = StackMergeOrder::stack(&[&columnar1]).into();
let group_error = group_columns_for_merge(
&[&columnar1],
&[("numbers".to_string(), ColumnType::I64)],
&merge_order,
)
.unwrap_err();
assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput);
}
//#[test]
// fn test_impossible_coercion_returns_an_error() {
// let columnar1 = make_columnar("numbers", &[u64::MAX]);
// let merge_order = StackMergeOrder::stack(&[&columnar1]).into();
// let group_error = group_columns_for_merge_iter(
//&[&columnar1],
//&[("numbers".to_string(), ColumnType::I64)],
//&merge_order,
//)
//.unwrap_err();
// assert_eq!(group_error.kind(), io::ErrorKind::InvalidInput);
//}

#[test]
fn test_group_columns_with_required_column() {
Expand All @@ -77,12 +88,14 @@ fn test_group_columns_with_required_column() {
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(
group_columns_for_merge_iter(
&[&columnar1, &columnar2],
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
}
Expand All @@ -94,12 +107,14 @@ fn test_group_columns_required_column_with_no_existing_columns() {
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(
group_columns_for_merge_iter(
columnars,
&[("required_col".to_string(), ColumnType::Str)],
&merge_order,
)
.unwrap();
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 2);
let columns = column_map
.get(&("required_col".to_string(), ColumnType::Str))
Expand All @@ -116,24 +131,36 @@ fn test_group_columns_required_column_is_above_all_columns_have_the_same_type_ru
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(
group_columns_for_merge_iter(
columnars,
&[("numbers".to_string(), ColumnType::U64)],
&merge_order,
)
.unwrap();
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 1);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::U64)));
}

fn conv_res(
el: io::Result<(Rc<String>, ColumnType, GroupedColumns)>,
) -> ((String, ColumnType), Vec<Option<DynamicColumn>>) {
let el = el.unwrap();
((el.0.to_string(), el.1), el.2.columns)
}

#[test]
fn test_missing_column() {
let columnar1 = make_columnar("numbers", &[-1i64]);
let columnar2 = make_columnar("numbers2", &[2u64]);
let columnars = &[&columnar1, &columnar2];
let merge_order = StackMergeOrder::stack(columnars).into();
let column_map: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
group_columns_for_merge(columnars, &[], &merge_order).unwrap();
group_columns_for_merge_iter(columnars, &[], &merge_order)
.unwrap()
.map(conv_res)
.collect();
assert_eq!(column_map.len(), 2);
assert!(column_map.contains_key(&("numbers".to_string(), ColumnType::I64)));
{
Expand Down
Loading

0 comments on commit 1bfbfda

Please sign in to comment.