Skip to content

Commit

Permalink
feat: use arrow row format for hash-group-by
Browse files Browse the repository at this point in the history
For apache#2723.

This has two effects:

- **wider feature support:** We now use the V2 aggregator for all
  group-column types. The arrow row format support is sufficient for
  that. V1 will only be used if the aggregator itself doesn't support V2
  (and these are quite a few at the moment). We'll improve on that front
  in follow-up PRs.
- **more speed:** Turns out the arrow row format is also faster (see
  below).

Perf results (mind the noise in the benchmarks that are actually not
even touched by this code change):

```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue2723a-pre
...
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-fdbe5671f9c3019b)
aggregate_query_no_group_by 15 12
                        time:   [779.28 µs 782.77 µs 786.28 µs]
                        change: [+2.1375% +2.7672% +3.4171%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_no_group_by_min_max_f64
                        time:   [712.96 µs 715.90 µs 719.14 µs]
                        change: [+0.8379% +1.7648% +2.6345%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  3 (3.00%) low mild
  6 (6.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_wide
                        time:   [1.7297 ms 1.7399 ms 1.7503 ms]
                        change: [-34.647% -33.908% -33.165%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.0984 ms 1.1045 ms 1.1115 ms]
                        change: [-38.581% -38.076% -37.569%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild

Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by
                        time:   [1.7810 ms 1.7925 ms 1.8057 ms]
                        change: [-25.252% -24.127% -22.737%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.2068 ms 1.2119 ms 1.2176 ms]
                        change: [+2.2847% +3.0857% +3.8789%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  7 (7.00%) high mild
  2 (2.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.7s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.6762 ms 1.6848 ms 1.6942 ms]
                        change: [-29.598% -28.603% -27.400%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  6 (6.00%) high severe

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.1969 ms 1.2008 ms 1.2049 ms]
                        change: [+1.3015% +2.1513% +3.0016%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low severe
  2 (2.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.797 ms 15.112 ms 15.427 ms]
                        change: [-12.072% -8.7274% -5.3392%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [4.1278 ms 4.1687 ms 4.2098 ms]
                        change: [+0.4851% +1.9525% +3.3676%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.4694 ms 3.4967 ms 3.5245 ms]
                        change: [-1.5467% -0.4432% +0.6609%] (p = 0.43 > 0.05)
                        No change in performance detected.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```
  • Loading branch information
crepererum committed Jan 6, 2023
1 parent 24023b5 commit a3b1065
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 34 deletions.
5 changes: 1 addition & 4 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
use datafusion_physical_expr::equivalence::project_equivalence_properties;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_physical_expr::normalize_out_expr_with_alias_schema;
use datafusion_row::{row_supported, RowType};

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -273,9 +272,7 @@ impl AggregateExec {
}

fn row_aggregate_supported(&self) -> bool {
let group_schema = group_schema(&self.schema, self.group_by.expr.len());
row_supported(&group_schema, RowType::Compact)
&& accumulator_v2_supported(&self.aggr_expr)
accumulator_v2_supported(&self.aggr_expr)
}

fn execute_typed(
Expand Down
57 changes: 27 additions & 30 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::task::{Context, Poll};
use std::vec;

use ahash::RandomState;
use arrow::row::{OwnedRow, RowConverter, SortField};
use datafusion_physical_expr::hash_utils::create_row_hashes_v2;
use futures::stream::BoxStream;
use futures::stream::{Stream, StreamExt};

Expand All @@ -32,7 +34,6 @@ use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, group_schema, AccumulatorItemV2, AggregateMode,
PhysicalGroupBy,
};
use crate::physical_plan::hash_utils::create_row_hashes;
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
Expand All @@ -50,7 +51,6 @@ use datafusion_common::ScalarValue;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::writer::{write_row, RowWriter};
use datafusion_row::{MutableRecordBatch, RowType};
use hashbrown::raw::RawTable;

Expand Down Expand Up @@ -90,7 +90,7 @@ struct GroupedHashAggregateStreamV2Inner {
group_by: PhysicalGroupBy,
accumulators: Vec<AccumulatorItemV2>,

group_schema: SchemaRef,
row_converter: RowConverter,
aggr_schema: SchemaRef,
aggr_layout: Arc<RowLayout>,

Expand Down Expand Up @@ -136,6 +136,13 @@ impl GroupedHashAggregateStreamV2 {
let accumulators = aggregates::create_accumulators_v2(&aggr_expr)?;

let group_schema = group_schema(&schema, group_by.expr.len());
let row_converter = RowConverter::new(
group_schema
.fields()
.iter()
.map(|f| SortField::new(f.data_type().clone()))
.collect(),
)?;
let aggr_schema = aggr_state_schema(&aggr_expr)?;

let aggr_layout = Arc::new(RowLayout::new(&aggr_schema, RowType::WordAligned));
Expand All @@ -157,7 +164,7 @@ impl GroupedHashAggregateStreamV2 {
input,
group_by,
accumulators,
group_schema,
row_converter,
aggr_schema,
aggr_layout,
baseline_metrics,
Expand All @@ -181,7 +188,7 @@ impl GroupedHashAggregateStreamV2 {
&this.random_state,
&this.group_by,
&mut this.accumulators,
&this.group_schema,
&mut this.row_converter,
this.aggr_layout.clone(),
batch,
&mut this.aggr_state,
Expand All @@ -205,7 +212,7 @@ impl GroupedHashAggregateStreamV2 {
let timer = this.baseline_metrics.elapsed_compute().timer();
let result = create_batch_from_map(
&this.mode,
&this.group_schema,
&this.row_converter,
&this.aggr_schema,
this.batch_size,
this.row_group_skip_position,
Expand Down Expand Up @@ -270,7 +277,7 @@ fn group_aggregate_batch(
random_state: &RandomState,
grouping_set: &PhysicalGroupBy,
accumulators: &mut [AccumulatorItemV2],
group_schema: &Schema,
row_converter: &mut RowConverter,
state_layout: Arc<RowLayout>,
batch: RecordBatch,
aggr_state: &mut AggregationState,
Expand All @@ -283,9 +290,10 @@ fn group_aggregate_batch(
map, group_states, ..
} = aggr_state;
let mut allocated = 0usize;
let row_converter_size_pre = row_converter.size();

for group_values in grouping_by_values {
let group_rows: Vec<Vec<u8>> = create_group_rows(group_values, group_schema);
let group_rows = row_converter.convert_columns(&group_values)?;

// evaluate the aggregation expressions.
// We could evaluate them after the `take`, but since we need to evaluate all
Expand All @@ -301,15 +309,15 @@ fn group_aggregate_batch(

// 1.1 Calculate the group keys for the group values
let mut batch_hashes = vec![0; batch.num_rows()];
create_row_hashes(&group_rows, random_state, &mut batch_hashes)?;
create_row_hashes_v2(&group_rows, random_state, &mut batch_hashes)?;

for (row, hash) in batch_hashes.into_iter().enumerate() {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
let group_state = &group_states[*group_idx];
group_rows[row] == group_state.group_by_values
group_rows.row(row) == group_state.group_by_values.row()
});

match entry {
Expand All @@ -330,7 +338,7 @@ fn group_aggregate_batch(
None => {
// Add new entry to group_states and save newly created index
let group_state = RowGroupState {
group_by_values: group_rows[row].clone(),
group_by_values: group_rows.row(row).owned(),
aggregation_buffer: vec![0; state_layout.fixed_part_width()],
indices: vec![row as u32], // 1.3
};
Expand All @@ -339,7 +347,7 @@ fn group_aggregate_batch(
// NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
// `group_states` (see allocation down below)
allocated += (std::mem::size_of::<u8>()
* group_state.group_by_values.capacity())
* group_state.group_by_values.as_ref().len())
+ (std::mem::size_of::<u8>()
* group_state.aggregation_buffer.capacity())
+ (std::mem::size_of::<u32>() * group_state.indices.capacity());
Expand Down Expand Up @@ -438,14 +446,16 @@ fn group_aggregate_batch(
})?;
}

allocated += row_converter.size().saturating_sub(row_converter_size_pre);

Ok(allocated)
}

/// The state that is built for each output group.
#[derive(Debug)]
struct RowGroupState {
/// The actual group by values, stored sequentially
group_by_values: Vec<u8>,
// Group key.
group_by_values: OwnedRow,

// Accumulator state, stored sequentially
aggregation_buffer: Vec<u8>,
Expand Down Expand Up @@ -483,23 +493,11 @@ impl std::fmt::Debug for AggregationState {
}
}

/// Create grouping rows
fn create_group_rows(arrays: Vec<ArrayRef>, schema: &Schema) -> Vec<Vec<u8>> {
let mut writer = RowWriter::new(schema, RowType::Compact);
let mut results = vec![];
for cur_row in 0..arrays[0].len() {
write_row(&mut writer, cur_row, schema, &arrays);
results.push(writer.get_row().to_vec());
writer.reset()
}
results
}

/// Create a RecordBatch with all group keys and accumulator' states or values.
#[allow(clippy::too_many_arguments)]
fn create_batch_from_map(
mode: &AggregateMode,
group_schema: &Schema,
converter: &RowConverter,
aggr_schema: &Schema,
batch_size: usize,
skip_items: usize,
Expand All @@ -524,11 +522,10 @@ fn create_batch_from_map(
.iter()
.skip(skip_items)
.take(batch_size)
.map(|gs| (gs.group_by_values.clone(), gs.aggregation_buffer.clone()))
.map(|gs| (gs.group_by_values.row(), gs.aggregation_buffer.clone()))
.unzip();

let mut columns: Vec<ArrayRef> =
read_as_batch(&group_buffers, group_schema, RowType::Compact);
let mut columns: Vec<ArrayRef> = converter.convert_rows(group_buffers)?;

match mode {
AggregateMode::Partial => columns.extend(read_as_batch(
Expand Down
33 changes: 33 additions & 0 deletions datafusion/physical-expr/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::row::Rows;
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::i256;
use datafusion_common::{
Expand Down Expand Up @@ -249,6 +250,38 @@ pub fn create_hashes<'a>(
Ok(hashes_buffer)
}

/// Test version of `create_row_hashes_v2` that produces the same value for
/// all hashes (to test collisions)
///
/// See comments on `hashes_buffer` for more details
#[cfg(feature = "force_hash_collisions")]
pub fn create_row_hashes_v2<'a>(
_rows: &Rows,
_random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for hash in hashes_buffer.iter_mut() {
*hash = 0
}
Ok(hashes_buffer)
}

/// Creates hash values for every row, based on their raw bytes.
#[cfg(not(feature = "force_hash_collisions"))]
pub fn create_row_hashes_v2<'a>(
rows: &Rows,
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for hash in hashes_buffer.iter_mut() {
*hash = 0
}
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash = random_state.hash_one(rows.row(i));
}
Ok(hashes_buffer)
}

#[cfg(test)]
mod tests {
use crate::from_slice::FromSlice;
Expand Down

0 comments on commit a3b1065

Please sign in to comment.