From d23e48fc1cd0b988ebfca38b5d70516b33541999 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 10 Jul 2023 10:00:26 -0400 Subject: [PATCH] Minor: Add some more doc comments to `BoundedAggregateStream` (#6881) * Minor: Add docs to BoundedAggregateStream * Update datafusion/core/src/physical_plan/aggregates/mod.rs Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> --------- Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> --- datafusion/common/src/utils.rs | 7 +- .../aggregates/bounded_aggregate_stream.rs | 84 ++++++++++++------- .../core/src/physical_plan/aggregates/mod.rs | 32 +++++-- 3 files changed, 83 insertions(+), 40 deletions(-) diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs index 2edcd07846da..c324e2079dc6 100644 --- a/datafusion/common/src/utils.rs +++ b/datafusion/common/src/utils.rs @@ -177,9 +177,10 @@ where Ok(low) } -/// This function finds the partition points according to `partition_columns`. -/// If there are no sort columns, then the result will be a single element -/// vector containing one partition range spanning all data. +/// Given a list of 0 or more already sorted columns, finds the +/// partition ranges that would partition equally across columns. +/// +/// See [`lexicographical_partition_ranges`] for more details. 
pub fn evaluate_partition_ranges( num_rows: usize, partition_columns: &[SortColumn], diff --git a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs index 4bbac3c4a52a..a89ef3aaffc8 100644 --- a/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs +++ b/datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs @@ -115,7 +115,9 @@ pub(crate) struct BoundedAggregateStream { /// first element in the array corresponds to normal accumulators /// second element in the array corresponds to row accumulators indices: [Vec>; 2], + /// Information on how the input of this group is ordered aggregation_ordering: AggregationOrdering, + /// Has this stream finished producing output is_end: bool, } @@ -275,11 +277,11 @@ impl Stream for BoundedAggregateStream { } // inner had error, return to caller Some(Err(e)) => return Poll::Ready(Some(Err(e))), - // inner is done, producing output + // inner is done, switch to producing output None => { - for element in self.aggr_state.ordered_group_states.iter_mut() - { - element.status = GroupStatus::CanEmit; + let states = self.aggr_state.ordered_group_states.iter_mut(); + for state in states { + state.status = GroupStatus::CanEmit; } self.exec_state = ExecutionState::ProducingOutput; } @@ -297,6 +299,7 @@ impl Stream for BoundedAggregateStream { Ok(Some(result)) => { let batch = result.record_output(&self.baseline_metrics); self.row_group_skip_position += batch.num_rows(); + // try to read more input self.exec_state = ExecutionState::ReadingInput; self.prune(); return Poll::Ready(Some(Ok(batch))); @@ -325,18 +328,22 @@ impl RecordBatchStream for BoundedAggregateStream { /// indices for a group. This information is used when executing streaming /// GROUP BY calculations. 
struct GroupOrderInfo { + /// The group by key owned_row: OwnedRow, + /// the hash value of the group hash: u64, + /// the range of row indices in the input batch that belong to this group range: Range, } impl BoundedAggregateStream { - // Update the aggr_state according to group_by values (result of group_by_expressions) when group by - // expressions are fully ordered. - fn update_ordered_group_state( + /// Update the aggr_state hash table according to group_by values + /// (result of group_by_expressions) when group by expressions are + /// fully ordered. + fn update_fully_ordered_group_state( &mut self, group_values: &[ArrayRef], - per_group_indices: Vec, + per_group_order_info: Vec, allocated: &mut usize, ) -> Result> { // 1.1 construct the key from the group values @@ -348,7 +355,7 @@ impl BoundedAggregateStream { let AggregationState { map: row_map, - ordered_group_states: row_group_states, + ordered_group_states, .. } = &mut self.aggr_state; @@ -356,13 +363,13 @@ impl BoundedAggregateStream { owned_row, hash, range, - } in per_group_indices + } in per_group_order_info { let entry = row_map.get_mut(hash, |(_hash, group_idx)| { // verify that a group that we are inserting with hash is // actually the same key value as the group in // existing_idx (aka group_values @ row) - let ordered_group_state = &row_group_states[*group_idx]; + let ordered_group_state = &ordered_group_states[*group_idx]; let group_state = &ordered_group_state.group_state; owned_row.row() == group_state.group_by_values.row() }); @@ -370,7 +377,7 @@ impl BoundedAggregateStream { match entry { // Existing entry for this group value Some((_hash, group_idx)) => { - let group_state = &mut row_group_states[*group_idx].group_state; + let group_state = &mut ordered_group_states[*group_idx].group_state; // 1.3 if group_state.indices.is_empty() { @@ -385,6 +392,7 @@ impl BoundedAggregateStream { None => { let accumulator_set = aggregates::create_accumulators(&self.normal_aggr_expr)?; + // Save the 
value of the ordering columns as Vec let row = get_row_at_idx(group_values, range.start)?; let ordered_columns = self .aggregation_ordering @@ -392,6 +400,7 @@ impl BoundedAggregateStream { .iter() .map(|idx| row[*idx].clone()) .collect::>(); + // Add new entry to group_states and save newly created index let group_state = GroupState { group_by_values: owned_row, @@ -403,7 +412,7 @@ impl BoundedAggregateStream { indices: (range.start as u32..range.end as u32) .collect::>(), // 1.3 }; - let group_idx = row_group_states.len(); + let group_idx = ordered_group_states.len(); // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by // `group_states` (see allocation down below) @@ -431,10 +440,10 @@ impl BoundedAggregateStream { let ordered_group_state = OrderedGroupState { group_state, ordered_columns, - status: GroupStatus::GroupProgress, + status: GroupStatus::GroupInProgress, hash, }; - row_group_states.push_accounted(ordered_group_state, allocated); + ordered_group_states.push_accounted(ordered_group_state, allocated); groups_with_rows.push(group_idx); } @@ -538,7 +547,7 @@ impl BoundedAggregateStream { let ordered_group_state = OrderedGroupState { group_state, ordered_columns, - status: GroupStatus::GroupProgress, + status: GroupStatus::GroupInProgress, hash, }; group_states.push_accounted(ordered_group_state, allocated); @@ -707,6 +716,7 @@ impl BoundedAggregateStream { let row_converter_size_pre = self.row_converter.size(); for group_values in &group_by_values { + // If the input is fully sorted on its grouping keys let groups_with_rows = if let AggregationOrdering { mode: GroupByOrderMode::FullyOrdered, order_indices, @@ -727,8 +737,9 @@ impl BoundedAggregateStream { }) .collect::>(); let n_rows = group_rows.num_rows(); + // determine the boundaries between groups let ranges = evaluate_partition_ranges(n_rows, &sort_column)?; - let per_group_indices = ranges + let per_group_order_info = ranges .into_iter() .map(|range| 
GroupOrderInfo { owned_row: group_rows.row(range.start).owned(), @@ -736,9 +747,9 @@ impl BoundedAggregateStream { range, }) .collect::>(); - self.update_ordered_group_state( + self.update_fully_ordered_group_state( group_values, - per_group_indices, + per_group_order_info, &mut allocated, )? } else { @@ -835,22 +846,30 @@ impl BoundedAggregateStream { } } +/// Tracks the state of the ordered grouping #[derive(Debug, PartialEq)] enum GroupStatus { - // `GroupProgress` means data for current group is not complete. New data may arrive. - GroupProgress, - // `CanEmit` means data for current group is completed. And its result can emitted. + /// Data for current group is not complete, and new data may yet + /// arrive. + GroupInProgress, + /// Data for current group is completed, and its result can be emitted. CanEmit, - // Emitted means that result for the groups is outputted. Group can be pruned from state. + /// Result for the groups has been successfully emitted, and group + /// state can be pruned. Emitted, } -/// The state that is built for each output group. +/// Information about the order of the state that is built for each +/// output group. #[derive(Debug)] pub struct OrderedGroupState { + /// Aggregate values group_state: GroupState, + /// The actual value of the ordered columns for this group ordered_columns: Vec, + /// Can we emit this group? status: GroupStatus, + /// Hash value of the group hash: u64, } @@ -883,16 +902,23 @@ impl std::fmt::Debug for AggregationState { } impl BoundedAggregateStream { - /// Prune the groups from the `self.aggr_state.group_states` which are in - /// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and - /// we are sure that these groups cannot receive new rows.) status. + /// Prune the groups from `self.aggr_state.ordered_group_states` which are in + /// [`GroupStatus::Emitted`]. 
+ /// + /// Emitted means that the result of this group has already been + /// emitted, and we are sure that these groups cannot receive new + /// rows. fn prune(&mut self) { + // clear out emitted groups let n_partition = self.aggr_state.ordered_group_states.len(); self.aggr_state .ordered_group_states .retain(|elem| elem.status != GroupStatus::Emitted); + let n_partition_new = self.aggr_state.ordered_group_states.len(); let n_pruned = n_partition - n_partition_new; + + // update hash table with the new indexes of the remaining groups self.aggr_state.map.clear(); for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() { self.aggr_state @@ -920,7 +946,9 @@ impl BoundedAggregateStream { ); let group_state_chunk = &self.aggr_state.ordered_group_states[skip_items..end_idx]; - // Consider only the groups that can be emitted. (The ones we are sure that will not receive new entry.) + + // Consider only the groups that can be emitted. (The ones we + // are sure that will not receive new entries.) let group_state_chunk = group_state_chunk .iter() .filter(|item| item.status == GroupStatus::CanEmit) diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index 4bf5f664450e..60d483d8c800 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -81,19 +81,33 @@ pub enum AggregateMode { } /// Group By expression modes +/// +/// `PartiallyOrdered` and `FullyOrdered` are used to reason about +/// when certain group by keys will never again be seen (and thus can +/// be emitted by the grouping operator). 
+/// +/// Specifically, each distinct combination of the relevant columns +/// is contiguous in the input, and once a new combination is seen +/// previous combinations are guaranteed never to appear again #[derive(Debug, Clone, PartialEq, Eq)] pub enum GroupByOrderMode { - /// None of the expressions in the GROUP BY clause have an ordering. + /// The input is not (known to be) ordered by any of the + /// expressions in the GROUP BY clause. None, - /// Some of the expressions in the GROUP BY clause have an ordering. - // For example, if the input is ordered by a, b, c and we group by b, a, d; - // the mode will be `PartiallyOrdered` meaning a subset of group b, a, d - // defines a preset for the existing ordering, e.g a, b defines a preset. + /// The input is known to be ordered by a preset (prefix but + /// possibly reordered) of the expressions in the `GROUP BY` clause. + /// + /// For example, if the input is ordered by `a, b, c` and we group + /// by `b, a, d`, `PartiallyOrdered` means a subset of group `b, + /// a, d` defines a preset for the existing ordering, in this case + /// `a, b`. PartiallyOrdered, - /// All the expressions in the GROUP BY clause have orderings. - // For example, if the input is ordered by a, b, c, d and we group by b, a; - // the mode will be `Ordered` meaning a all of the of group b, d - // defines a preset for the existing ordering, e.g a, b defines a preset. + /// The input is known to be ordered by *all* the expressions in the + /// `GROUP BY` clause. + /// + /// For example, if the input is ordered by `a, b, c, d` and we group by `b, a`, + /// `FullyOrdered` means that all of the group by expressions appear + /// as a preset for the existing ordering, in this case `a, b`. FullyOrdered, }