diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 27870c7865f3..b09554246d6c 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -115,22 +115,14 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option> if !final_agg_exec.mode().is_first_stage() && final_agg_exec.group_expr().is_empty() { - let mut child = Arc::clone(final_agg_exec.input()); - loop { - if let Some(partial_agg_exec) = - child.as_any().downcast_ref::() + let child = final_agg_exec.input(); + if let Some(partial_agg_exec) = child.as_any().downcast_ref::() + { + if partial_agg_exec.mode().is_first_stage() + && partial_agg_exec.group_expr().is_empty() + && partial_agg_exec.filter_expr().iter().all(|e| e.is_none()) { - if partial_agg_exec.mode().is_first_stage() - && partial_agg_exec.group_expr().is_empty() - && partial_agg_exec.filter_expr().iter().all(|e| e.is_none()) - { - return Some(child); - } - } - if let [childrens_child] = child.children().as_slice() { - child = Arc::clone(childrens_child); - } else { - break; + return Some(Arc::clone(child)); } } } @@ -167,7 +159,6 @@ mod tests { use datafusion_expr_common::operator::Operator; use datafusion_physical_plan::aggregates::PhysicalGroupBy; - use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::common; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::memory::MemoryExec; @@ -362,70 +353,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_count_partial_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_star(); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregtator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_count_partial_with_nulls_indirect_child() -> Result<()> { - let source = mock_data()?; - let schema = source.schema(); - let agg = TestAggregate::new_count_column(&schema); - - let partial_agg = AggregateExec::try_new( - AggregateMode::Partial, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - source, - Arc::clone(&schema), - )?; - - // We introduce an intermediate optimization step between the partial and final aggregtator - let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg)); - - let final_agg = AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![Arc::new(agg.count_expr(&schema))], - vec![None], - Arc::new(coalesce), - Arc::clone(&schema), - )?; - - assert_count_optim_success(final_agg, agg).await?; - - Ok(()) - } - #[tokio::test] async fn test_count_inexact_stat() -> Result<()> { let source = mock_data()?;