diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
index cb24193e3e93..07f3563bbbc5 100644
--- a/datafusion/core/src/physical_plan/aggregates/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -52,7 +52,6 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
 use datafusion_physical_expr::equivalence::project_equivalence_properties;
 pub use datafusion_physical_expr::expressions::create_aggregate_expr;
 use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
-use datafusion_row::{row_supported, RowType};
 
 /// Hash aggregate modes
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@@ -273,9 +272,7 @@ impl AggregateExec {
     }
 
     fn row_aggregate_supported(&self) -> bool {
-        let group_schema = group_schema(&self.schema, self.group_by.expr.len());
-        row_supported(&group_schema, RowType::Compact)
-            && accumulator_v2_supported(&self.aggr_expr)
+        accumulator_v2_supported(&self.aggr_expr)
     }
 
     fn execute_typed(
diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
index 94654d502f79..b27723dbd4e4 100644
--- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs
+++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -22,6 +22,8 @@ use std::task::{Context, Poll};
 use std::vec;
 
 use ahash::RandomState;
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use datafusion_physical_expr::hash_utils::create_row_hashes_v2;
 use futures::stream::BoxStream;
 use futures::stream::{Stream, StreamExt};
@@ -32,7 +34,6 @@ use crate::physical_plan::aggregates::{
     evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
     PhysicalGroupBy,
 };
-use crate::physical_plan::hash_utils::create_row_hashes;
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
 use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
@@ -50,7 +51,6 @@ use datafusion_common::ScalarValue;
 use datafusion_row::accessor::RowAccessor;
 use datafusion_row::layout::RowLayout;
 use datafusion_row::reader::{read_row, RowReader};
-use datafusion_row::writer::{write_row, RowWriter};
 use datafusion_row::{MutableRecordBatch, RowType};
 use hashbrown::raw::RawTable;
@@ -90,7 +90,7 @@ struct GroupedHashAggregateStreamV2Inner {
     group_by: PhysicalGroupBy,
     accumulators: Vec<AccumulatorItemV2>,
 
-    group_schema: SchemaRef,
+    row_converter: RowConverter,
     aggr_schema: SchemaRef,
     aggr_layout: Arc<RowLayout>,
@@ -136,6 +136,13 @@ impl GroupedHashAggregateStreamV2 {
         let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;
 
         let group_schema = group_schema(&schema, group_by.expr.len());
+        let row_converter = RowConverter::new(
+            group_schema
+                .fields()
+                .iter()
+                .map(|f| SortField::new(f.data_type().clone()))
+                .collect(),
+        )?;
         let aggr_schema = aggr_state_schema(&aggr_expr)?;
 
         let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
@@ -157,7 +164,7 @@ impl GroupedHashAggregateStreamV2 {
             input,
             group_by,
             accumulators,
-            group_schema,
+            row_converter,
             aggr_schema,
             aggr_layout,
             baseline_metrics,
@@ -181,7 +188,7 @@ impl GroupedHashAggregateStreamV2 {
                             &this.random_state,
                             &this.group_by,
                             &mut this.accumulators,
-                            &this.group_schema,
+                            &mut this.row_converter,
                             this.aggr_layout.clone(),
                             batch,
                             &mut this.aggr_state,
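The stream now owns an arrow-rs RowConverter built from the group schema (one SortField per group column) and passes it mutably into group_aggregate_batch. As a minimal standalone sketch of that API, using only the public arrow::row types referenced above (the column types and values here are illustrative, not taken from the PR):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array, StringArray};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;
    use arrow::row::{RowConverter, SortField};

    fn main() -> Result<(), ArrowError> {
        // One SortField per group-by column, mirroring RowConverter::new above.
        let mut converter = RowConverter::new(vec![
            SortField::new(DataType::Int32),
            SortField::new(DataType::Utf8),
        ])?;

        // Two group-by columns for three input rows.
        let cols: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 1, 2])),
            Arc::new(StringArray::from(vec!["a", "a", "b"])),
        ];

        // Encode the columns into a byte-comparable row format; this is the
        // call that replaces the removed create_group_rows helper.
        let rows = converter.convert_columns(&cols)?;

        // Rows with equal values produce identical encodings, so plain byte
        // equality serves as the group-key comparison in the hash table probe.
        assert_eq!(rows.row(0), rows.row(1));
        assert_ne!(rows.row(0), rows.row(2));

        // An OwnedRow detaches a key from the batch so it can be stored in
        // the per-group state.
        let owned = rows.row(0).owned();
        assert_eq!(owned.row(), rows.row(0));
        Ok(())
    }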
@@ -205,7 +212,7 @@ impl GroupedHashAggregateStreamV2 {
                         let timer = this.baseline_metrics.elapsed_compute().timer();
                         let result = create_batch_from_map(
                             &this.mode,
-                            &this.group_schema,
+                            &this.row_converter,
                             &this.aggr_schema,
                             this.batch_size,
                             this.row_group_skip_position,
@@ -270,7 +277,7 @@ fn group_aggregate_batch(
     random_state: &RandomState,
     grouping_set: &PhysicalGroupBy,
     accumulators: &mut [AccumulatorItemV2],
-    group_schema: &Schema,
+    row_converter: &mut RowConverter,
     state_layout: Arc<RowLayout>,
     batch: RecordBatch,
     aggr_state: &mut AggregationState,
@@ -283,9 +290,10 @@ fn group_aggregate_batch(
         map, group_states, ..
     } = aggr_state;
     let mut allocated = 0usize;
+    let row_converter_size_pre = row_converter.size();
 
     for group_values in grouping_by_values {
-        let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
+        let group_rows = row_converter.convert_columns(&group_values)?;
 
         // evaluate the aggregation expressions.
         // We could evaluate them after the `take`, but since we need to evaluate all
@@ -301,7 +309,7 @@ fn group_aggregate_batch(
 
         // 1.1 Calculate the group keys for the group values
        let mut batch_hashes = vec![0; batch.num_rows()];
-        create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
+        create_row_hashes_v2(&group_rows, random_state, &mut batch_hashes)?;
 
         for (row, hash) in batch_hashes.into_iter().enumerate() {
             let entry = map.get_mut(hash, |(_hash, group_idx)| {
@@ -309,7 +317,7 @@ fn group_aggregate_batch(
                 // actually the same key value as the group in
                 // existing_idx  (aka group_values @ row)
                 let group_state = &group_states[*group_idx];
-                group_rows[row] == group_state.group_by_values
+                group_rows.row(row) == group_state.group_by_values.row()
             });
 
             match entry {
@@ -330,7 +338,7 @@ fn group_aggregate_batch(
                 None => {
                     // Add new entry to group_states and save newly created index
                     let group_state = RowGroupState {
-                        group_by_values: group_rows[row].clone(),
+                        group_by_values: group_rows.row(row).owned(),
                         aggregation_buffer: vec![0; state_layout.fixed_part_width()],
                         indices: vec![row as u32], // 1.3
                     };
@@ -339,7 +347,7 @@ fn group_aggregate_batch(
                     // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
                     // `group_states` (see allocation down below)
                     allocated += (std::mem::size_of::<u8>()
-                        * group_state.group_by_values.capacity())
+                        * group_state.group_by_values.as_ref().len())
                         + (std::mem::size_of::<u8>() * group_state.aggregation_buffer.capacity())
                         + (std::mem::size_of::<u32>() * group_state.indices.capacity());
@@ -438,14 +446,16 @@ fn group_aggregate_batch(
         })?;
     }
 
+    allocated += row_converter.size().saturating_sub(row_converter_size_pre);
+
     Ok(allocated)
 }
 
 /// The state that is built for each output group.
 #[derive(Debug)]
 struct RowGroupState {
-    /// The actual group by values, stored sequentially
-    group_by_values: Vec<u8>,
+    // Group key.
+    group_by_values: OwnedRow,
 
     // Accumulator state, stored sequentially
     aggregation_buffer: Vec<u8>,
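On the output path, create_batch_from_map (below) gathers each group's stored key and calls convert_rows, which decodes the row encoding back into columns. A minimal round-trip sketch under the same assumptions (function name and values are illustrative):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;
    use arrow::row::{RowConverter, SortField};

    fn roundtrip() -> Result<Vec<ArrayRef>, ArrowError> {
        let mut converter = RowConverter::new(vec![SortField::new(DataType::Int32)])?;
        let col: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
        let rows = converter.convert_columns(&[col])?;

        // convert_rows accepts any iterator of Row, e.g. the rows gathered
        // from each group's OwnedRow in create_batch_from_map.
        let cols = converter.convert_rows(rows.iter())?;

        // The decoded column matches the original input, values and order.
        let decoded = cols[0].as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(
            decoded.iter().collect::<Vec<_>>(),
            vec![Some(3), Some(1), Some(2)]
        );
        Ok(cols)
    }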
@@ -483,23 +493,11 @@ impl std::fmt::Debug for AggregationState {
     }
 }
 
-/// Create grouping rows
-fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
-    let mut writer = RowWriter::new(schema, RowType::Compact);
-    let mut results = vec![];
-    for cur_row in 0..arrays[0].len() {
-        write_row(&mut writer, cur_row, schema, &arrays);
-        results.push(writer.get_row().to_vec());
-        writer.reset()
-    }
-    results
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 #[allow(clippy::too_many_arguments)]
 fn create_batch_from_map(
     mode: &AggregateMode,
-    group_schema: &Schema,
+    converter: &RowConverter,
     aggr_schema: &Schema,
     batch_size: usize,
     skip_items: usize,
@@ -524,11 +522,10 @@ fn create_batch_from_map(
         .iter()
         .skip(skip_items)
         .take(batch_size)
-        .map(|gs| (gs.group_by_values.clone(), gs.aggregation_buffer.clone()))
+        .map(|gs| (gs.group_by_values.row(), gs.aggregation_buffer.clone()))
         .unzip();
 
-    let mut columns: Vec<ArrayRef> =
-        read_as_batch(&group_buffers, group_schema, RowType::Compact);
+    let mut columns: Vec<ArrayRef> = converter.convert_rows(group_buffers)?;
 
     match mode {
         AggregateMode::Partial => columns.extend(read_as_batch(
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index c687eb80e2ab..ab2377da94aa 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -20,6 +20,7 @@
 use ahash::RandomState;
 use arrow::array::*;
 use arrow::datatypes::*;
+use arrow::row::Rows;
 use arrow::{downcast_dictionary_array, downcast_primitive_array};
 use arrow_buffer::i256;
 use datafusion_common::{
@@ -249,6 +250,38 @@ pub fn create_hashes<'a>(
     Ok(hashes_buffer)
 }
 
+/// Test version of `create_row_hashes_v2` that produces the same value for
+/// all hashes (to test collisions)
+///
+/// See comments on `hashes_buffer` for more details
+#[cfg(feature = "force_hash_collisions")]
+pub fn create_row_hashes_v2<'a>(
+    _rows: &Rows,
+    _random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    Ok(hashes_buffer)
+}
+
+/// Creates hash values for every row, based on their raw bytes.
+#[cfg(not(feature = "force_hash_collisions"))]
+pub fn create_row_hashes_v2<'a>(
+    rows: &Rows,
+    random_state: &RandomState,
+    hashes_buffer: &'a mut Vec<u64>,
+) -> Result<&'a mut Vec<u64>> {
+    for hash in hashes_buffer.iter_mut() {
+        *hash = 0
+    }
+    for (i, hash) in hashes_buffer.iter_mut().enumerate() {
+        *hash = random_state.hash_one(rows.row(i));
+    }
+    Ok(hashes_buffer)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::from_slice::FromSlice;
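Since the row encoding is a flat byte string per row, create_row_hashes_v2 can hash each Row value directly with ahash (Row hashes over its encoded bytes, which is what hash_one uses above). A small standalone sketch of the non-collision path; the fixed seeds and values are illustrative only, chosen to make the sketch deterministic:

    use std::sync::Arc;
    use ahash::RandomState;
    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;
    use arrow::row::{RowConverter, SortField};

    fn main() -> Result<(), ArrowError> {
        let mut converter = RowConverter::new(vec![SortField::new(DataType::Int64)])?;
        let col: ArrayRef = Arc::new(Int64Array::from(vec![10, 10, 20]));
        let rows = converter.convert_columns(&[col])?;

        // Hash every encoded row, as the non-collision variant above does.
        let random_state = RandomState::with_seeds(0, 0, 0, 0);
        let mut hashes = vec![0u64; rows.num_rows()];
        for (i, hash) in hashes.iter_mut().enumerate() {
            *hash = random_state.hash_one(rows.row(i));
        }

        // Equal group keys encode to identical bytes and so hash identically;
        // distinct keys hash apart (barring a 64-bit collision).
        assert_eq!(hashes[0], hashes[1]);
        assert_ne!(hashes[0], hashes[2]);
        Ok(())
    }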