From 42215cf465915ea7d73afd0b8293ffc8f94eb688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 5 Sep 2024 14:33:24 +0800 Subject: [PATCH] Simplify update_skip_aggregation_probe method --- .../src/aggregate/groups_accumulator.rs | 2 +- datafusion/physical-plan/src/aggregates/row_hash.rs | 13 +------------ 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index 1c97d22ec79c..38c1298847db 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -209,7 +209,7 @@ impl GroupsAccumulatorAdapter { let values_to_accumulate = slice_and_maybe_filter(&values, opt_filter.as_ref(), offsets)?; - (f)(state.accumulator.as_mut(), &values_to_accumulate)?; + f(state.accumulator.as_mut(), &values_to_accumulate)?; // clear out the state so they are empty for next // iteration diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c38137994d44..c749c6b5b179 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -188,13 +188,6 @@ impl SkipAggregationProbe { self.should_skip } - /// Provides an ability to externally set `should_skip` flag - /// to `false` and prohibit further state updates - fn forbid_skipping(&mut self) { - self.should_skip = false; - self.is_locked = true; - } - /// Record the number of rows that were output directly without aggregation fn record_skipped(&mut self, batch: &RecordBatch) { self.skipped_aggregation_rows.add(batch.num_rows()); @@ -1024,11 +1017,7 @@ impl GroupedHashAggregateStream { /// Note: currently spilling is not supported for Partial aggregation fn update_skip_aggregation_probe(&mut self, input_rows: usize) { if let Some(probe) = self.skip_aggregation_probe.as_mut() { - if !self.spill_state.spills.is_empty() { - probe.forbid_skipping(); - } else { - probe.update_state(input_rows, self.group_values.len()); - } + probe.update_state(input_rows, self.group_values.len()); }; }