Skip to content

Commit

Permalink
Improve aggregatation documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 28, 2024
1 parent 9b7e4c8 commit 1d96e25
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
9 changes: 7 additions & 2 deletions datafusion/expr/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
Expand Down Expand Up @@ -177,7 +180,9 @@ pub trait GroupsAccumulator: Send {
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
5 changes: 4 additions & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// # Arguments:
/// 1. `name`: the name of the expression (e.g. AVG, SUM, etc)
/// 2. `value_type`: Aggregate function output returned by [`Self::return_type`] if defined, otherwise
/// it is equivalent to the data type of the first arguments
/// 3. `ordering_fields`: the fields used to order the input arguments, if any.
/// Empty if no ordering expression is provided.
///
///
/// # Notes:
///
/// The default implementation returns a single state field named `name`
Expand Down Expand Up @@ -384,7 +387,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
18 changes: 15 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ pub use datafusion_expr::AggregateFunction;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;

/// Hash aggregate modes
///
/// See [`Accumulator::state`] for background information on multi-phase
/// aggregation and how these modes are used.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
/// Partial aggregate that can be applied in parallel across input partitions
/// Partial aggregate that can be applied in parallel across input
/// partitions.
///
/// This is the first phase of a multi-phase aggregation.
Partial,
/// Final aggregate that produces a single partition of output
/// Final aggregate that produces a single partition of output by combining
/// the output of multiple partial aggregates.
///
/// This is the second phase of a multi-phase aggregation.
Final,
/// Final aggregate that works on pre-partitioned data.
///
Expand All @@ -75,12 +84,15 @@ pub enum AggregateMode {
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
///
/// This mode requires that the input is a single partition (like Final)
Single,
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
/// This mode requires that the input is partitioned by group key (like FinalPartitioned)
///
/// This mode requires that the input is partitioned by group key (like
/// FinalPartitioned)
SinglePartitioned,
}

Expand Down
14 changes: 12 additions & 2 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl SkipAggregationProbe {
/// of `x` and one accumulator for `SUM(y)`, specialized for the data
/// type of `y`.
///
/// # Description
/// # Discussion
///
/// [`group_values`] does not store any aggregate state inline. It only
/// assigns "group indices", one for each (distinct) group value. The
Expand All @@ -222,7 +222,17 @@ impl SkipAggregationProbe {
///
/// [`group_values`]: Self::group_values
///
/// # Spilling
/// # Partial Aggregate and multi-phase grouping
///
/// As described on [`Accumulator::state`], this operator is used in the context
/// "multi-phase" grouping when the mode is [`AggregateMode::Partial`].
///
/// An important optimization for partial aggregation
///
///
/// [`Accumulator::state`]: datafusion_expr::Accumulator::state
///
/// # Spilling (to disk)
///
/// The sizes of group values and accumulators can become large. Before that causes out of memory,
/// this hash aggregator outputs partial states early for partial aggregation or spills to local
Expand Down

0 comments on commit 1d96e25

Please sign in to comment.