Skip to content

Commit

Permalink
Minor: Add some more doc comments to BoundedAggregateStream (#6881)
Browse files Browse the repository at this point in the history
* Minor: Add docs to BoundedAggregateStream

* Update datafusion/core/src/physical_plan/aggregates/mod.rs

Co-authored-by: Mustafa Akur <[email protected]>

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
alamb and mustafasrepo authored Jul 10, 2023
1 parent 2585ae6 commit d23e48f
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 40 deletions.
7 changes: 4 additions & 3 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ where
Ok(low)
}

/// This function finds the partition points according to `partition_columns`.
/// If there are no sort columns, then the result will be a single element
/// vector containing one partition range spanning all data.
/// Given a list of 0 or more already sorted columns, finds the
/// partition ranges that would partition equally across columns.
///
/// See [`lexicographical_partition_ranges`] for more details.
pub fn evaluate_partition_ranges(
num_rows: usize,
partition_columns: &[SortColumn],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ pub(crate) struct BoundedAggregateStream {
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
/// Information on how the input of this group is ordered
aggregation_ordering: AggregationOrdering,
/// Has this stream finished producing output
is_end: bool,
}

Expand Down Expand Up @@ -275,11 +277,11 @@ impl Stream for BoundedAggregateStream {
}
// inner had error, return to caller
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
// inner is done, producing output
// inner is done, switch to producing output
None => {
for element in self.aggr_state.ordered_group_states.iter_mut()
{
element.status = GroupStatus::CanEmit;
let states = self.aggr_state.ordered_group_states.iter_mut();
for state in states {
state.status = GroupStatus::CanEmit;
}
self.exec_state = ExecutionState::ProducingOutput;
}
Expand All @@ -297,6 +299,7 @@ impl Stream for BoundedAggregateStream {
Ok(Some(result)) => {
let batch = result.record_output(&self.baseline_metrics);
self.row_group_skip_position += batch.num_rows();
// try to read more input
self.exec_state = ExecutionState::ReadingInput;
self.prune();
return Poll::Ready(Some(Ok(batch)));
Expand Down Expand Up @@ -325,18 +328,22 @@ impl RecordBatchStream for BoundedAggregateStream {
/// indices for a group. This information is used when executing streaming
/// GROUP BY calculations.
struct GroupOrderInfo {
    /// The group by key, stored as an owned row
    owned_row: OwnedRow,
    /// The hash value of the group by key
    hash: u64,
    /// The range of row indices in the input batch that belong to this group
    range: Range<usize>,
}

impl BoundedAggregateStream {
// Update the aggr_state according to group_by values (result of group_by_expressions) when group by
// expressions are fully ordered.
fn update_ordered_group_state(
/// Update the aggr_state hash table according to group_by values
/// (result of group_by_expressions) when group by expressions are
/// fully ordered.
fn update_fully_ordered_group_state(
&mut self,
group_values: &[ArrayRef],
per_group_indices: Vec<GroupOrderInfo>,
per_group_order_info: Vec<GroupOrderInfo>,
allocated: &mut usize,
) -> Result<Vec<usize>> {
// 1.1 construct the key from the group values
Expand All @@ -348,29 +355,29 @@ impl BoundedAggregateStream {

let AggregationState {
map: row_map,
ordered_group_states: row_group_states,
ordered_group_states,
..
} = &mut self.aggr_state;

for GroupOrderInfo {
owned_row,
hash,
range,
} in per_group_indices
} in per_group_order_info
{
let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
let ordered_group_state = &row_group_states[*group_idx];
let ordered_group_state = &ordered_group_states[*group_idx];
let group_state = &ordered_group_state.group_state;
owned_row.row() == group_state.group_by_values.row()
});

match entry {
// Existing entry for this group value
Some((_hash, group_idx)) => {
let group_state = &mut row_group_states[*group_idx].group_state;
let group_state = &mut ordered_group_states[*group_idx].group_state;

// 1.3
if group_state.indices.is_empty() {
Expand All @@ -385,13 +392,15 @@ impl BoundedAggregateStream {
None => {
let accumulator_set =
aggregates::create_accumulators(&self.normal_aggr_expr)?;
// Save the value of the ordering columns as Vec<ScalarValue>
let row = get_row_at_idx(group_values, range.start)?;
let ordered_columns = self
.aggregation_ordering
.order_indices
.iter()
.map(|idx| row[*idx].clone())
.collect::<Vec<_>>();

// Add new entry to group_states and save newly created index
let group_state = GroupState {
group_by_values: owned_row,
Expand All @@ -403,7 +412,7 @@ impl BoundedAggregateStream {
indices: (range.start as u32..range.end as u32)
.collect::<Vec<_>>(), // 1.3
};
let group_idx = row_group_states.len();
let group_idx = ordered_group_states.len();

// NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
// `group_states` (see allocation down below)
Expand Down Expand Up @@ -431,10 +440,10 @@ impl BoundedAggregateStream {
let ordered_group_state = OrderedGroupState {
group_state,
ordered_columns,
status: GroupStatus::GroupProgress,
status: GroupStatus::GroupInProgress,
hash,
};
row_group_states.push_accounted(ordered_group_state, allocated);
ordered_group_states.push_accounted(ordered_group_state, allocated);

groups_with_rows.push(group_idx);
}
Expand Down Expand Up @@ -538,7 +547,7 @@ impl BoundedAggregateStream {
let ordered_group_state = OrderedGroupState {
group_state,
ordered_columns,
status: GroupStatus::GroupProgress,
status: GroupStatus::GroupInProgress,
hash,
};
group_states.push_accounted(ordered_group_state, allocated);
Expand Down Expand Up @@ -707,6 +716,7 @@ impl BoundedAggregateStream {

let row_converter_size_pre = self.row_converter.size();
for group_values in &group_by_values {
// If the input is fully sorted on its grouping keys
let groups_with_rows = if let AggregationOrdering {
mode: GroupByOrderMode::FullyOrdered,
order_indices,
Expand All @@ -727,18 +737,19 @@ impl BoundedAggregateStream {
})
.collect::<Vec<_>>();
let n_rows = group_rows.num_rows();
// determine the boundaries between groups
let ranges = evaluate_partition_ranges(n_rows, &sort_column)?;
let per_group_indices = ranges
let per_group_order_info = ranges
.into_iter()
.map(|range| GroupOrderInfo {
owned_row: group_rows.row(range.start).owned(),
hash: batch_hashes[range.start],
range,
})
.collect::<Vec<_>>();
self.update_ordered_group_state(
self.update_fully_ordered_group_state(
group_values,
per_group_indices,
per_group_order_info,
&mut allocated,
)?
} else {
Expand Down Expand Up @@ -835,22 +846,30 @@ impl BoundedAggregateStream {
}
}

/// Tracks the state of the ordered grouping
#[derive(Debug, PartialEq)]
enum GroupStatus {
// `GroupProgress` means data for current group is not complete. New data may arrive.
GroupProgress,
// `CanEmit` means data for current group is completed. And its result can emitted.
/// Data for current group is not complete, and new data may yet
/// arrive.
GroupInProgress,
/// Data for current group is completed, and its result can be emitted.
CanEmit,
// Emitted means that result for the groups is outputted. Group can be pruned from state.
/// Result for the groups has been successfully emitted, and group
/// state can be pruned.
Emitted,
}

/// The state that is built for each output group.
/// Information about the order of the state that is built for each
/// output group.
#[derive(Debug)]
pub struct OrderedGroupState {
/// Aggregate values
group_state: GroupState,
/// The actual value of the ordered columns for this group
ordered_columns: Vec<ScalarValue>,
/// Can we emit this group?
status: GroupStatus,
/// Hash value of the group
hash: u64,
}

Expand Down Expand Up @@ -883,16 +902,23 @@ impl std::fmt::Debug for AggregationState {
}

impl BoundedAggregateStream {
/// Prune the groups from the `self.aggr_state.group_states` which are in
/// `GroupStatus::Emitted`(this status means that result of this group emitted/outputted already, and
/// we are sure that these groups cannot receive new rows.) status.
/// Prune the groups from [`Self::ordered_group_states`] which are in
/// [`GroupStatus::Emitted`].
///
/// Emitted means that the result of this group has already been
/// emitted, and we are sure that these groups can not receive new
/// rows.
fn prune(&mut self) {
// clear out emitted groups
let n_partition = self.aggr_state.ordered_group_states.len();
self.aggr_state
.ordered_group_states
.retain(|elem| elem.status != GroupStatus::Emitted);

let n_partition_new = self.aggr_state.ordered_group_states.len();
let n_pruned = n_partition - n_partition_new;

// update hash table with the new indexes of the remaining groups
self.aggr_state.map.clear();
for (idx, item) in self.aggr_state.ordered_group_states.iter().enumerate() {
self.aggr_state
Expand Down Expand Up @@ -920,7 +946,9 @@ impl BoundedAggregateStream {
);
let group_state_chunk =
&self.aggr_state.ordered_group_states[skip_items..end_idx];
// Consider only the groups that can be emitted. (The ones we are sure that will not receive new entry.)

// Consider only the groups that can be emitted. (The ones we
// are sure that will not receive new entry.)
let group_state_chunk = group_state_chunk
.iter()
.filter(|item| item.status == GroupStatus::CanEmit)
Expand Down
32 changes: 23 additions & 9 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,33 @@ pub enum AggregateMode {
}

/// Group By expression modes
///
/// `PartiallyOrdered` and `FullyOrdered` are used to reason about
/// when certain group by keys will never again be seen (and thus can
/// be emitted by the grouping operator).
///
/// Specifically, each distinct combination of the relevant columns
/// are contiguous in the input, and once a new combination is seen
/// previous combinations are guaranteed never to appear again
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupByOrderMode {
/// None of the expressions in the GROUP BY clause have an ordering.
/// The input is not (known to be) ordered by any of the
/// expressions in the GROUP BY clause.
None,
/// Some of the expressions in the GROUP BY clause have an ordering.
// For example, if the input is ordered by a, b, c and we group by b, a, d;
// the mode will be `PartiallyOrdered` meaning a subset of group b, a, d
// defines a preset for the existing ordering, e.g a, b defines a preset.
/// The input is known to be ordered by a preset (prefix but
/// possibly reordered) of the expressions in the `GROUP BY` clause.
///
/// For example, if the input is ordered by `a, b, c` and we group
/// by `b, a, d`, `PartiallyOrdered` means a subset of group `b,
/// a, d` defines a preset for the existing ordering, in this case
/// `a, b`.
PartiallyOrdered,
/// All the expressions in the GROUP BY clause have orderings.
// For example, if the input is ordered by a, b, c, d and we group by b, a;
// the mode will be `Ordered` meaning a all of the of group b, d
// defines a preset for the existing ordering, e.g a, b defines a preset.
/// The input is known to be ordered by *all* the expressions in the
/// `GROUP BY` clause.
///
/// For example, if the input is ordered by `a, b, c, d` and we group by b, a,
/// `FullyOrdered` means that all of the group by expressions appear
/// as a preset for the existing ordering, in this case `a, b`.
FullyOrdered,
}

Expand Down

0 comments on commit d23e48f

Please sign in to comment.