From a961ad09ff623dc6d0b4ad9760089b7a0cc4e493 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 18 Jul 2023 20:08:09 -0400 Subject: [PATCH] Extract GroupValues (#6969) --- .../src/physical_plan/aggregates/row_hash.rs | 383 +++++++++--------- 1 file changed, 194 insertions(+), 189 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index c57f43632407..701203e4ddcd 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -60,6 +60,151 @@ pub(crate) enum ExecutionState { use super::AggregateExec; +/// An interning store for group keys +trait GroupValues: Send { + /// Calculates the `groups` for each input row of `cols` + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()>; + + /// Returns the number of bytes used by this [`GroupValues`] + fn size(&self) -> usize; + + /// Returns true if this [`GroupValues`] is empty + fn is_empty(&self) -> bool; + + /// The number of values stored in this [`GroupValues`] + fn len(&self) -> usize; + + /// Flushes the unique [`GroupValues`] + fn flush(&mut self) -> Result>; +} + +/// A [`GroupValues`] making use of [`Rows`] +struct GroupValuesRows { + /// Converter for the group values + row_converter: RowConverter, + + /// Logically maps group values to a group_index in + /// [`Self::group_values`] and in each accumulator + /// + /// Uses the raw API of hashbrown to avoid actually storing the + /// keys (group values) in the table + /// + /// keys: u64 hashes of the GroupValue + /// values: (hash, group_index) + map: RawTable<(u64, usize)>, + + /// The size of `map` in bytes + map_size: usize, + + /// The actual group by values, stored in arrow [`Row`] format. + /// `group_values[i]` holds the group value for group_index `i`. + /// + /// The row format is used to compare group keys quickly and store + /// them efficiently in memory. Quick comparison is especially + /// important for multi-column group keys. + /// + /// [`Row`]: arrow::row::Row + group_values: Rows, + + // buffer to be reused to store hashes + hashes_buffer: Vec, + + /// Random state for creating hashes + random_state: RandomState, +} + +impl GroupValuesRows { + fn try_new(schema: SchemaRef) -> Result { + let row_converter = RowConverter::new( + schema + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + + let map = RawTable::with_capacity(0); + let group_values = row_converter.empty_rows(0, 0); + + Ok(Self { + row_converter, + map, + map_size: 0, + group_values, + hashes_buffer: Default::default(), + random_state: Default::default(), + }) + } +} + +impl GroupValues for GroupValuesRows { + fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec) -> Result<()> { + // Convert the group keys into the row format + // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available + let group_rows = self.row_converter.convert_columns(cols)?; + let n_rows = group_rows.num_rows(); + + // tracks to which group each of the input rows belongs + groups.clear(); + + // 1.1 Calculate the group keys for the group values + let batch_hashes = &mut self.hashes_buffer; + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, batch_hashes)?; + + for (row, &hash) in batch_hashes.iter().enumerate() { + let entry = self.map.get_mut(hash, |(_hash, group_idx)| { + // verify that a group that we are inserting with hash is + // actually the same key value as the group in + // existing_idx (aka group_values @ row) + group_rows.row(row) == self.group_values.row(*group_idx) + }); + + let group_idx = match entry { + // Existing group_index for this group value + Some((_hash, group_idx)) => *group_idx, + // 1.2 Need to create new entry for the group + None => { + // Add new entry to aggr_state and save newly created index + let group_idx = self.group_values.num_rows(); + self.group_values.push(group_rows.row(row)); + + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (hash, group_idx), + |(hash, _group_index)| *hash, + &mut self.map_size, + ); + group_idx + } + }; + groups.push(group_idx); + } + + Ok(()) + } + + fn size(&self) -> usize { + self.row_converter.size() + + self.group_values.size() + + self.map_size + + self.hashes_buffer.allocated_size() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn len(&self) -> usize { + self.group_values.num_rows() + } + + fn flush(&mut self) -> Result> { + Ok(self.row_converter.convert_rows(&self.group_values)?) + } +} + /// Hash based Grouping Aggregator /// /// # Design Goals @@ -75,29 +220,29 @@ use super::AggregateExec; /// /// ```text /// -/// stores "group stores group values, internally stores aggregate -/// indexes" in arrow_row format values, for all groups +/// Assigns a consecutive group internally stores aggregate values +/// index for each unique set for all groups +/// of group values /// -/// ┌─────────────┐ ┌────────────┐ ┌──────────────┐ ┌──────────────┐ -/// │ ┌─────┐ │ │ ┌────────┐ │ │┌────────────┐│ │┌────────────┐│ -/// │ │ 5 │ │ ┌────┼▶│ "A" │ │ ││accumulator ││ ││accumulator ││ -/// │ ├─────┤ │ │ │ ├────────┤ │ ││ 0 ││ ││ N ││ -/// │ │ 9 │ │ │ │ │ "Z" │ │ ││ ┌────────┐ ││ ││ ┌────────┐ ││ -/// │ └─────┘ │ │ │ └────────┘ │ ││ │ state │ ││ ││ │ state │ ││ -/// │ ... │ │ │ │ ││ │┌─────┐ │ ││ ... ││ │┌─────┐ │ ││ -/// │ ┌─────┐ │ │ │ ... │ ││ │├─────┤ │ ││ ││ │├─────┤ │ ││ -/// │ │ 1 │───┼─┘ │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ -/// │ ├─────┤ │ │ │ ││ │ │ ││ ││ │ │ ││ -/// │ │ 13 │───┼─┐ │ ┌────────┐ │ ││ │ ... │ ││ ││ │ ... │ ││ -/// │ └─────┘ │ └────┼▶│ "Q" │ │ ││ │ │ ││ ││ │ │ ││ -/// └─────────────┘ │ └────────┘ │ ││ │┌─────┐ │ ││ ││ │┌─────┐ │ ││ -/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ -/// └────────────┘ ││ └────────┘ ││ ││ └────────┘ ││ -/// │└────────────┘│ │└────────────┘│ -/// └──────────────┘ └──────────────┘ +/// ┌────────────┐ ┌──────────────┐ ┌──────────────┐ +/// │ ┌────────┐ │ │┌────────────┐│ │┌────────────┐│ +/// │ │ "A" │ │ ││accumulator ││ ││accumulator ││ +/// │ ├────────┤ │ ││ 0 ││ ││ N ││ +/// │ │ "Z" │ │ ││ ┌────────┐ ││ ││ ┌────────┐ ││ +/// │ └────────┘ │ ││ │ state │ ││ ││ │ state │ ││ +/// │ │ ││ │┌─────┐ │ ││ ... ││ │┌─────┐ │ ││ +/// │ ... │ ││ │├─────┤ │ ││ ││ │├─────┤ │ ││ +/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// │ │ ││ │ │ ││ ││ │ │ ││ +/// │ ┌────────┐ │ ││ │ ... │ ││ ││ │ ... │ ││ +/// │ │ "Q" │ │ ││ │ │ ││ ││ │ │ ││ +/// │ └────────┘ │ ││ │┌─────┐ │ ││ ││ │┌─────┐ │ ││ +/// │ │ ││ │└─────┘ │ ││ ││ │└─────┘ │ ││ +/// └────────────┘ ││ └────────┘ ││ ││ └────────┘ ││ +/// │└────────────┘│ │└────────────┘│ +/// └──────────────┘ └──────────────┘ /// -/// map group_values accumulators -/// (Hash Table) +/// group_values accumulators /// /// ``` /// @@ -109,10 +254,10 @@ use super::AggregateExec; /// /// # Description /// -/// The hash table does not store any aggregate state inline. It only -/// stores "group indices", one for each (distinct) group value. The +/// [`group_values`] does not store any aggregate state inline. It only +/// assigns "group indices", one for each (distinct) group value. The /// accumulators manage the in-progress aggregate state for each -/// group, and the group values themselves are stored in +/// group, with the group values themselves are stored in /// [`group_values`] at the corresponding group index. /// /// The accumulator state (e.g partial sums) is managed by and stored @@ -153,40 +298,18 @@ pub(crate) struct GroupedHashAggregateStream { /// the filter expression is `x > 100`. filter_expressions: Vec>>, - /// Converter for the group values - row_converter: RowConverter, - /// GROUP BY expressions group_by: PhysicalGroupBy, /// The memory reservation for this grouping reservation: MemoryReservation, - /// Logically maps group values to a group_index in - /// [`Self::group_values`] and in each accumulator - /// - /// Uses the raw API of hashbrown to avoid actually storing the - /// keys (group values) in the table - /// - /// keys: u64 hashes of the GroupValue - /// values: (hash, group_index) - map: RawTable<(u64, usize)>, - - /// The actual group by values, stored in arrow [`Row`] format. - /// `group_values[i]` holds the group value for group_index `i`. - /// - /// The row format is used to compare group keys quickly and store - /// them efficiently in memory. Quick comparison is especially - /// important for multi-column group keys. - /// - /// [`Row`]: arrow::row::Row - group_values: Rows, + /// An interning store of group keys + group_values: Box, /// scratch space for the current input [`RecordBatch`] being - /// processed. The reason this is a field is so it can be reused - /// for all input batches, avoiding the need to reallocate Vecs on - /// each input. - scratch_space: ScratchSpace, + /// processed. Reused across batches here to avoid reallocations + current_group_indices: Vec, /// Tracks if this stream is generating input or output exec_state: ExecutionState, @@ -194,9 +317,6 @@ pub(crate) struct GroupedHashAggregateStream { /// Execution metrics baseline_metrics: BaselineMetrics, - /// Random state for creating hashes - random_state: RandomState, - /// max rows in output RecordBatches batch_size: usize, } @@ -245,18 +365,10 @@ impl GroupedHashAggregateStream { .collect::>()?; let group_schema = group_schema(&agg_schema, agg_group_by.expr.len()); - let row_converter = RowConverter::new( - group_schema - .fields() - .iter() - .map(|f| SortField::new(f.data_type().clone())) - .collect(), - )?; let name = format!("GroupedHashAggregateStream[{partition}]"); let reservation = MemoryConsumer::new(name).register(context.memory_pool()); - let map = RawTable::with_capacity(0); - let group_values = row_converter.empty_rows(0, 0); + let group = Box::new(GroupValuesRows::try_new(group_schema)?); timer.done(); @@ -269,15 +381,12 @@ impl GroupedHashAggregateStream { accumulators, aggregate_arguments, filter_expressions, - row_converter, group_by: agg_group_by, reservation, - map, - group_values, - scratch_space: ScratchSpace::new(), + group_values: group, + current_group_indices: Default::default(), exec_state, baseline_metrics, - random_state: Default::default(), batch_size, }) } @@ -330,11 +439,9 @@ impl Stream for GroupedHashAggregateStream { // // Also this means we either store the // whole record batch or not. - let result = result.and_then(|allocated| { - self.reservation.try_grow(allocated) - }); - - if let Err(e) = result { + if let Err(e) = + result.and_then(|_| self.update_memory_reservation()) + { return Poll::Ready(Some(Err(e))); } } @@ -385,102 +492,25 @@ impl RecordBatchStream for GroupedHashAggregateStream { } impl GroupedHashAggregateStream { - /// Calculates the group indices for each input row of - /// `group_values`. - /// - /// At the return of this function, - /// `self.scratch_space.current_group_indices` has the same number - /// of entries as each array in `group_values` and holds the - /// correct group_index for that row. - /// - /// This is one of the core hot loops in the algorithm - fn update_group_state( - &mut self, - group_values: &[ArrayRef], - allocated: &mut usize, - ) -> Result<()> { - // Convert the group keys into the row format - // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available - let group_rows = self.row_converter.convert_columns(group_values)?; - let n_rows = group_rows.num_rows(); - - // track memory used - let group_values_size_pre = self.group_values.size(); - let scratch_size_pre = self.scratch_space.size(); - - // tracks to which group each of the input rows belongs - let group_indices = &mut self.scratch_space.current_group_indices; - group_indices.clear(); - - // 1.1 Calculate the group keys for the group values - let batch_hashes = &mut self.scratch_space.hashes_buffer; - batch_hashes.clear(); - batch_hashes.resize(n_rows, 0); - create_hashes(group_values, &self.random_state, batch_hashes)?; - - for (row, &hash) in batch_hashes.iter().enumerate() { - let entry = self.map.get_mut(hash, |(_hash, group_idx)| { - // verify that a group that we are inserting with hash is - // actually the same key value as the group in - // existing_idx (aka group_values @ row) - group_rows.row(row) == self.group_values.row(*group_idx) - }); - - let group_idx = match entry { - // Existing group_index for this group value - Some((_hash, group_idx)) => *group_idx, - // 1.2 Need to create new entry for the group - None => { - // Add new entry to aggr_state and save newly created index - let group_idx = self.group_values.num_rows(); - self.group_values.push(group_rows.row(row)); - - // for hasher function, use precomputed hash value - self.map.insert_accounted( - (hash, group_idx), - |(hash, _group_index)| *hash, - allocated, - ); - group_idx - } - }; - group_indices.push(group_idx); - } - - // account for memory growth in scratch space - *allocated += self.scratch_space.size(); - *allocated -= scratch_size_pre; // subtract after adding to avoid underflow - - // account for any memory increase used to store group_values - *allocated += self.group_values.size(); - *allocated -= group_values_size_pre; // subtract after adding to avoid underflow - - Ok(()) - } - /// Perform group-by aggregation for the given [`RecordBatch`]. /// /// If successful, returns the additional amount of memory, in /// bytes, that were allocated during this process. - fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result { + fn group_aggregate_batch(&mut self, batch: RecordBatch) -> Result<()> { // Evaluate the grouping expressions let group_by_values = evaluate_group_by(&self.group_by, &batch)?; - // Keep track of memory allocated: - let mut allocated = 0usize; - // Evaluate the aggregation expressions. let input_values = evaluate_many(&self.aggregate_arguments, &batch)?; // Evaluate the filter expressions, if any, against the inputs let filter_values = evaluate_optional(&self.filter_expressions, &batch)?; - let row_converter_size_pre = self.row_converter.size(); - for group_values in &group_by_values { // calculate the group indices for each input row - self.update_group_state(group_values, &mut allocated)?; - let group_indices = &self.scratch_space.current_group_indices; + self.group_values + .intern(group_values, &mut self.current_group_indices)?; + let group_indices = &self.current_group_indices; // Gather the inputs to call the actual accumulator let t = self @@ -489,10 +519,8 @@ impl GroupedHashAggregateStream { .zip(input_values.iter()) .zip(filter_values.iter()); - let total_num_groups = self.group_values.num_rows(); - + let total_num_groups = self.group_values.len(); for ((acc, values), opt_filter) in t { - let acc_size_pre = acc.size(); let opt_filter = opt_filter.as_ref().map(|filter| filter.as_boolean()); // Call the appropriate method on each aggregator with @@ -519,27 +547,28 @@ impl GroupedHashAggregateStream { )?; } } - - allocated += acc.size(); - allocated -= acc_size_pre; } } - allocated += self.row_converter.size(); - allocated -= row_converter_size_pre; - Ok(allocated) + Ok(()) + } + + fn update_memory_reservation(&mut self) -> Result<()> { + let acc = self.accumulators.iter().map(|x| x.size()).sum::(); + self.reservation.try_resize( + acc + self.group_values.size() + self.current_group_indices.allocated_size(), + ) } /// Create an output RecordBatch with all group keys and accumulator states/values fn create_batch_from_map(&mut self) -> Result { - if self.group_values.num_rows() == 0 { + if self.group_values.is_empty() { let schema = self.schema.clone(); return Ok(RecordBatch::new_empty(schema)); } // First output rows are the groups - let groups_rows = self.group_values.iter(); - let mut output: Vec = self.row_converter.convert_rows(groups_rows)?; + let mut output = self.group_values.flush()?; // Next output each aggregate value, from the accumulators for acc in self.accumulators.iter_mut() { @@ -555,27 +584,3 @@ impl GroupedHashAggregateStream { Ok(RecordBatch::try_new(self.schema.clone(), output)?) } } - -/// Holds structures used for the current input [`RecordBatch`] being -/// processed. Reused across batches here to avoid reallocations -#[derive(Debug, Default)] -struct ScratchSpace { - /// scratch space for the current input [`RecordBatch`] being - /// processed. Reused across batches here to avoid reallocations - current_group_indices: Vec, - // buffer to be reused to store hashes - hashes_buffer: Vec, -} - -impl ScratchSpace { - fn new() -> Self { - Default::default() - } - - /// Return the amount of memory alocated by this structure in bytes - fn size(&self) -> usize { - std::mem::size_of_val(self) - + self.current_group_indices.allocated_size() - + self.hashes_buffer.allocated_size() - } -}