Skip to content

Commit

Permalink
Simplify update_skip_aggregation_probe method
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Sep 5, 2024
1 parent 91b1d2b commit 42215cf
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl GroupsAccumulatorAdapter {

let values_to_accumulate =
slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?;
(f)(state.accumulator.as_mut(), &values_to_accumulate)?;
f(state.accumulator.as_mut(), &values_to_accumulate)?;

// clear out the state so they are empty for next
// iteration
Expand Down
13 changes: 1 addition & 12 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,6 @@ impl SkipAggregationProbe {
self.should_skip
}

/// Provides an ability to externally set `should_skip` flag
/// to `false` and prohibit further state updates
fn forbid_skipping(&mut self) {
self.should_skip = false;
self.is_locked = true;
}

/// Record the number of rows that were output directly without aggregation
fn record_skipped(&mut self, batch: &RecordBatch) {
self.skipped_aggregation_rows.add(batch.num_rows());
Expand Down Expand Up @@ -1024,11 +1017,7 @@ impl GroupedHashAggregateStream {
/// Note: currently spilling is not supported for Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if !self.spill_state.spills.is_empty() {
probe.forbid_skipping();
} else {
probe.update_state(input_rows, self.group_values.len());
}
probe.update_state(input_rows, self.group_values.len());
};
}

Expand Down

0 comments on commit 42215cf

Please sign in to comment.