Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delayed column opening during merge #2132

Merged
merged 4 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 104 additions & 74 deletions columnar/src/columnar/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod merge_dict_column;
mod merge_mapping;
mod term_merger;

use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashSet};
use std::io;
use std::net::Ipv6Addr;
use std::sync::Arc;
Expand All @@ -18,7 +18,8 @@ use crate::columnar::writer::CompatibleNumericalTypes;
use crate::columnar::ColumnarReader;
use crate::dynamic_column::DynamicColumn;
use crate::{
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, NumericalType, NumericalValue,
BytesColumn, Column, ColumnIndex, ColumnType, ColumnValues, DynamicColumnHandle, NumericalType,
NumericalValue,
};

/// Column types are grouped into different categories.
Expand All @@ -28,14 +29,16 @@ use crate::{
/// In practise, today, only Numerical colummns are coerced into one type today.
///
/// See also [README.md].
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
///
/// The ordering has to match the ordering of the variants in [ColumnType].
#[derive(Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Hash, Debug)]
pub(crate) enum ColumnTypeCategory {
Bool,
Str,
Numerical,
DateTime,
Bytes,
Str,
Bool,
IpAddr,
DateTime,
}

impl From<ColumnType> for ColumnTypeCategory {
Expand Down Expand Up @@ -83,9 +86,20 @@ pub fn merge_columnar(
.iter()
.map(|reader| reader.num_rows())
.collect::<Vec<u32>>();

let columns_to_merge =
group_columns_for_merge(columnar_readers, required_columns, &merge_row_order)?;
for ((column_name, column_type), columns) in columns_to_merge {
for res in columns_to_merge {
let ((column_name, _column_type_category), grouped_columns) = res;
let grouped_columns = grouped_columns.open(&merge_row_order)?;
if grouped_columns.is_empty() {
continue;
}

let column_type = grouped_columns.column_type_after_merge();
let mut columns = grouped_columns.columns;
coerce_columns(column_type, &mut columns)?;

let mut column_serializer =
serializer.start_serialize_column(column_name.as_bytes(), column_type);
merge_column(
Expand All @@ -97,6 +111,7 @@ pub fn merge_columnar(
)?;
column_serializer.finalize()?;
}

serializer.finalize(merge_row_order.num_rows())?;
Ok(())
}
Expand Down Expand Up @@ -210,20 +225,80 @@ fn merge_column(
struct GroupedColumns {
required_column_type: Option<ColumnType>,
columns: Vec<Option<DynamicColumn>>,
column_category: ColumnTypeCategory,
}

impl GroupedColumns {
fn for_category(column_category: ColumnTypeCategory, num_columnars: usize) -> Self {
GroupedColumns {
/// Check is column group can be skipped during serialization.
fn is_empty(&self) -> bool {
self.required_column_type.is_none() && self.columns.iter().all(Option::is_none)
}

/// Returns the column type after merge.
///
/// This method does not check if the column types can actually be coerced to
/// this type.
fn column_type_after_merge(&self) -> ColumnType {
if let Some(required_type) = self.required_column_type {
return required_type;
}
let column_type: HashSet<ColumnType> = self
.columns
.iter()
.flatten()
.map(|column| column.column_type())
.collect();
if column_type.len() == 1 {
return column_type.into_iter().next().unwrap();
}
// At the moment, only the numerical categorical column type has more than one possible
// column type.
assert!(self
.columns
.iter()
.flatten()
.all(|el| ColumnTypeCategory::from(el.column_type()) == ColumnTypeCategory::Numerical));
merged_numerical_columns_type(self.columns.iter().flatten()).into()
}
}

struct GroupedColumnsHandle {
required_column_type: Option<ColumnType>,
columns: Vec<Option<DynamicColumnHandle>>,
}

impl GroupedColumnsHandle {
fn new(num_columnars: usize) -> Self {
GroupedColumnsHandle {
required_column_type: None,
columns: vec![None; num_columnars],
column_category,
}
}
fn open(self, merge_row_order: &MergeRowOrder) -> io::Result<GroupedColumns> {
let mut columns: Vec<Option<DynamicColumn>> = Vec::new();
for (columnar_id, column) in self.columns.iter().enumerate() {
if let Some(column) = column {
let column = column.open()?;
// We skip columns that end up with 0 documents.
// That way, we make sure they don't end up influencing the merge type or
// creating empty columns.

if is_empty_after_merge(merge_row_order, &column, columnar_id) {
columns.push(None);
} else {
columns.push(Some(column));
}
} else {
columns.push(None);
}
}
Ok(GroupedColumns {
required_column_type: self.required_column_type,
columns,
})
}

/// Set the dynamic column for a given columnar.
fn set_column(&mut self, columnar_id: usize, column: DynamicColumn) {
fn set_column(&mut self, columnar_id: usize, column: DynamicColumnHandle) {
self.columns[columnar_id] = Some(column);
}

Expand All @@ -245,29 +320,6 @@ impl GroupedColumns {
self.required_column_type = Some(required_type);
Ok(())
}

/// Returns the column type after merge.
///
/// This method does not check if the column types can actually be coerced to
/// this type.
fn column_type_after_merge(&self) -> ColumnType {
if let Some(required_type) = self.required_column_type {
return required_type;
}
let column_type: HashSet<ColumnType> = self
.columns
.iter()
.flatten()
.map(|column| column.column_type())
.collect();
if column_type.len() == 1 {
return column_type.into_iter().next().unwrap();
}
// At the moment, only the numerical categorical column type has more than one possible
// column type.
assert_eq!(self.column_category, ColumnTypeCategory::Numerical);
merged_numerical_columns_type(self.columns.iter().flatten()).into()
}
}

/// Returns the type of the merged numerical column.
Expand All @@ -293,7 +345,7 @@ fn merged_numerical_columns_type<'a>(
fn is_empty_after_merge(
merge_row_order: &MergeRowOrder,
column: &DynamicColumn,
columnar_id: usize,
columnar_ord: usize,
) -> bool {
if column.num_values() == 0u32 {
// It was empty before the merge.
Expand All @@ -305,7 +357,7 @@ fn is_empty_after_merge(
false
}
MergeRowOrder::Shuffled(shuffled) => {
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_id] {
if let Some(alive_bitset) = &shuffled.alive_bitsets[columnar_ord] {
let column_index = column.column_index();
match column_index {
ColumnIndex::Empty { .. } => true,
Expand Down Expand Up @@ -348,56 +400,34 @@ fn is_empty_after_merge(
}
}

#[allow(clippy::type_complexity)]
fn group_columns_for_merge(
columnar_readers: &[&ColumnarReader],
required_columns: &[(String, ColumnType)],
merge_row_order: &MergeRowOrder,
) -> io::Result<BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>>> {
// Each column name may have multiple types of column associated.
// For merging we are interested in the same column type category since they can be merged.
let mut columns_grouped: HashMap<(String, ColumnTypeCategory), GroupedColumns> = HashMap::new();
/// Iterates over the columns of the columnar readers, grouped by column name.
/// Key functionality is that `open` of the Columns is done lazy per group.
fn group_columns_for_merge<'a>(
columnar_readers: &'a [&'a ColumnarReader],
required_columns: &'a [(String, ColumnType)],
_merge_row_order: &'a MergeRowOrder,
) -> io::Result<BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle>> {
let mut columns: BTreeMap<(String, ColumnTypeCategory), GroupedColumnsHandle> = BTreeMap::new();

for &(ref column_name, column_type) in required_columns {
columns_grouped
columns
.entry((column_name.clone(), column_type.into()))
.or_insert_with(|| {
GroupedColumns::for_category(column_type.into(), columnar_readers.len())
})
.or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
.require_type(column_type)?;
}

for (columnar_id, columnar_reader) in columnar_readers.iter().enumerate() {
let column_name_and_handle = columnar_reader.list_columns()?;
// We skip columns that end up with 0 documents.
// That way, we make sure they don't end up influencing the merge type or
// creating empty columns.
let column_name_and_handle = columnar_reader.iter_columns()?;

for (column_name, handle) in column_name_and_handle {
let column_category: ColumnTypeCategory = handle.column_type().into();
let column = handle.open()?;
if is_empty_after_merge(merge_row_order, &column, columnar_id) {
continue;
}
columns_grouped
columns
.entry((column_name, column_category))
.or_insert_with(|| {
GroupedColumns::for_category(column_category, columnar_readers.len())
})
.set_column(columnar_id, column);
.or_insert_with(|| GroupedColumnsHandle::new(columnar_readers.len()))
.set_column(columnar_id, handle);
}
}

let mut merge_columns: BTreeMap<(String, ColumnType), Vec<Option<DynamicColumn>>> =
Default::default();

for ((column_name, _), mut grouped_columns) in columns_grouped {
let column_type = grouped_columns.column_type_after_merge();
coerce_columns(column_type, &mut grouped_columns.columns)?;
merge_columns.insert((column_name, column_type), grouped_columns.columns);
}

Ok(merge_columns)
Ok(columns)
}

fn coerce_columns(
Expand Down
Loading