From e152ce5c4c10ce5925bbe27428be361e0fc78127 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 31 May 2024 15:25:18 +0300 Subject: [PATCH 1/4] Initial commit --- .../physical-expr-common/src/aggregate/mod.rs | 29 +++++++++++++++ .../physical-expr/src/aggregate/count.rs | 15 ++++++++ .../src/window/sliding_aggregate.rs | 25 +++++++++++++ .../physical-expr/src/window/window_expr.rs | 35 +++++++++++++++++++ .../physical-plan/src/aggregates/mod.rs | 10 +++--- 5 files changed, 109 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 503e2d8f9758..6ac60c6b6971 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -185,8 +185,37 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { fn create_sliding_accumulator(&self) -> Result> { not_impl_err!("Retractable Accumulator hasn't been implemented for {self:?} yet") } + + /// Returns all expressions used in the [`AggregateExpr`]. + /// First entry in the tuple corresponds to function arguments + /// Second entry in the tuple corresponds to order by expressions. + fn all_expressions(&self) -> AggregatePhysicalExpressions { + let args = self.expressions(); + let order_bys = self.order_bys().unwrap_or(&[]); + let order_by_exprs = order_bys + .iter() + .map(|sort_expr| sort_expr.expr.clone()) + .collect::>(); + (args, order_by_exprs) + } + + /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`AggregateExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + fn with_new_expressions( + &self, + _args: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } } +/// Tuple contains the all physical expressions used in the aggregate expression +/// each entry corresponds to function arguments, order by expressions respectively +type AggregatePhysicalExpressions = + (Vec>, Vec>); + /// Physical aggregate expression of a UDAF. #[derive(Debug)] pub struct AggregateFunctionExpr { diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index e3660221e61a..aad18a82ab87 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -260,6 +260,21 @@ impl AggregateExpr for Count { // instantiate specialized accumulator Ok(Box::new(CountGroupsAccumulator::new())) } + + fn with_new_expressions( + &self, + args: Vec>, + order_by_exprs: Vec>, + ) -> Option> { + debug_assert_eq!(self.exprs.len(), args.len()); + debug_assert!(order_by_exprs.is_empty()); + Some(Arc::new(Count { + name: self.name.clone(), + data_type: self.data_type.clone(), + nullable: self.nullable, + exprs: args, + })) + } } impl PartialEq for Count { diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 1494129cf897..961f0884dd87 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -141,6 +141,31 @@ impl WindowExpr for SlidingAggregateWindowExpr { fn uses_bounded_memory(&self) -> bool { !self.window_frame.end_bound.is_unbounded() } + + fn with_new_expressions( + &self, + args: Vec>, + partition_bys: Vec>, + order_by_exprs: Vec>, + ) -> Option> { + debug_assert_eq!(self.order_by.len(), order_by_exprs.len()); + + let new_order_by = self + .order_by + .iter() + .zip(order_by_exprs) + .map(|(req, new_expr)| PhysicalSortExpr { + expr: new_expr, + options: req.options, + }) + .collect::>(); + Some(Arc::new(SlidingAggregateWindowExpr { + aggregate: self.aggregate.with_new_expressions(args, vec![])?, + partition_by: partition_bys, + order_by: new_order_by, + window_frame: self.window_frame.clone(), + })) + } } impl AggregateWindowExpr for SlidingAggregateWindowExpr { diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index dd9514c69a45..ed38b824f89d 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -128,8 +128,43 @@ pub trait WindowExpr: Send + Sync + Debug { /// Get the reverse expression of this [WindowExpr]. fn get_reverse_expr(&self) -> Option>; + + /// Returns all expressions used in the [`WindowExpr`]. + /// First entry in the tuple corresponds to function arguments + /// Second entry in the tuple corresponds to partition by expressions. + /// Third entry in the tuple corresponds to order by expressions. + fn all_expressions(&self) -> WindowPhysicalExpressions { + let args = self.expressions(); + let partition_bys = self.partition_by().to_vec(); + let order_by_exprs = self + .order_by() + .iter() + .map(|sort_expr| sort_expr.expr.clone()) + .collect::>(); + (args, partition_bys, order_by_exprs) + } + + /// Rewrites [`WindowExpr`], with new expressions given. The argument should be consistent + /// with the return value of the [`WindowExpr::all_expressions`] method. + /// Returns `Some(Arc)` if re-write is supported, otherwise returns `None`. + fn with_new_expressions( + &self, + _args: Vec>, + _partition_bys: Vec>, + _order_by_exprs: Vec>, + ) -> Option> { + None + } } +/// Triple contains the all physical expressions used in the window expression +/// each entry corresponds to function arguments, partition by expressions, order by expressions respectively +type WindowPhysicalExpressions = ( + Vec>, + Vec>, + Vec>, +); + /// Extension trait that adds common functionality to [`AggregateWindowExpr`]s pub trait AggregateWindowExpr: WindowExpr { /// Get the accumulator for the window expression. Note that distinct diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 2bb95852ff43..e3d6a100eb6f 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -994,13 +994,13 @@ fn aggregate_expressions( | AggregateMode::SinglePartitioned => Ok(aggr_expr .iter() .map(|agg| { - let mut result = agg.expressions(); - // Append ordering requirements to expressions' results. This + let mut result = vec![]; + let (args, order_by_exprs) = agg.all_expressions(); + result.extend(args); + // Append order by expressions to the result. This // way order sensitive aggregators can satisfy requirement // themselves. - if let Some(ordering_req) = agg.order_bys() { - result.extend(ordering_req.iter().map(|item| item.expr.clone())); - } + result.extend(order_by_exprs); result }) .collect()), From bf957a1dd93331df6b2dd48054e9981a2f9a3526 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 31 May 2024 16:12:21 +0300 Subject: [PATCH 2/4] Minor changes --- .../physical-expr-common/src/aggregate/mod.rs | 16 +++++++++---- datafusion/physical-expr/src/lib.rs | 4 +++- .../physical-expr/src/window/window_expr.rs | 24 ++++++++++++------- .../physical-plan/src/aggregates/mod.rs | 10 +++++--- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 6ac60c6b6971..2c8b49d5fea7 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -196,7 +196,10 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { .iter() .map(|sort_expr| sort_expr.expr.clone()) .collect::>(); - (args, order_by_exprs) + AggregatePhysicalExpressions { + args, + order_by_exprs, + } } /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent @@ -211,10 +214,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { } } -/// Tuple contains the all physical expressions used in the aggregate expression -/// each entry corresponds to function arguments, order by expressions respectively -type AggregatePhysicalExpressions = - (Vec>, Vec>); +/// Stores the physical expressions used inside the `AggregateExpr`. +pub struct AggregatePhysicalExpressions { + /// Aggregate function arguments + pub args: Vec>, + /// Order by expressions + pub order_by_exprs: Vec>, +} /// Physical aggregate expression of a UDAF. #[derive(Debug)] diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 1bdf082b2eaf..72f5f2d50cb8 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -41,7 +41,9 @@ pub mod execution_props { pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState}; pub use analysis::{analyze, AnalysisContext, ExprBoundaries}; -pub use datafusion_physical_expr_common::aggregate::AggregateExpr; +pub use datafusion_physical_expr_common::aggregate::{ + AggregateExpr, AggregatePhysicalExpressions, +}; pub use equivalence::EquivalenceProperties; pub use partitioning::{Distribution, Partitioning}; pub use physical_expr::{ diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index ed38b824f89d..d7a41aed3972 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -135,13 +135,17 @@ pub trait WindowExpr: Send + Sync + Debug { /// Third entry in the tuple corresponds to order by expressions. fn all_expressions(&self) -> WindowPhysicalExpressions { let args = self.expressions(); - let partition_bys = self.partition_by().to_vec(); + let partition_by_exprs = self.partition_by().to_vec(); let order_by_exprs = self .order_by() .iter() .map(|sort_expr| sort_expr.expr.clone()) .collect::>(); - (args, partition_bys, order_by_exprs) + WindowPhysicalExpressions { + args, + partition_by_exprs, + order_by_exprs, + } } /// Rewrites [`WindowExpr`], with new expressions given. The argument should be consistent @@ -157,13 +161,15 @@ pub trait WindowExpr: Send + Sync + Debug { } } -/// Triple contains the all physical expressions used in the window expression -/// each entry corresponds to function arguments, partition by expressions, order by expressions respectively -type WindowPhysicalExpressions = ( - Vec>, - Vec>, - Vec>, -); +/// Stores the physical expressions used inside the `WindowExpr`. +pub struct WindowPhysicalExpressions { + /// Window function arguments + pub args: Vec>, + /// PARTITION BY expressions + pub partition_by_exprs: Vec>, + /// ORDER BY expressions + pub order_by_exprs: Vec>, +} /// Extension trait that adds common functionality to [`AggregateWindowExpr`]s pub trait AggregateWindowExpr: WindowExpr { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e3d6a100eb6f..e5d36bb0cb48 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -42,8 +42,9 @@ use datafusion_expr::Accumulator; use datafusion_physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, Max, Min, UnKnownColumn}, - physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, - LexRequirement, PhysicalExpr, PhysicalSortRequirement, + physical_exprs_contains, AggregateExpr, AggregatePhysicalExpressions, + EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, + PhysicalSortRequirement, }; use itertools::Itertools; @@ -995,7 +996,10 @@ fn aggregate_expressions( .iter() .map(|agg| { let mut result = vec![]; - let (args, order_by_exprs) = agg.all_expressions(); + let AggregatePhysicalExpressions { + args, + order_by_exprs, + } = agg.all_expressions(); result.extend(args); // Append order by expressions to the result. This // way order sensitive aggregators can satisfy requirement From cb21ae229548f9ead4839feb532d075df0e4dcf2 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 31 May 2024 16:14:36 +0300 Subject: [PATCH 3/4] Minor changes --- datafusion/physical-plan/src/aggregates/mod.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index e5d36bb0cb48..2bb95852ff43 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -42,9 +42,8 @@ use datafusion_expr::Accumulator; use datafusion_physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::{Column, Max, Min, UnKnownColumn}, - physical_exprs_contains, AggregateExpr, AggregatePhysicalExpressions, - EquivalenceProperties, LexOrdering, LexRequirement, PhysicalExpr, - PhysicalSortRequirement, + physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering, + LexRequirement, PhysicalExpr, PhysicalSortRequirement, }; use itertools::Itertools; @@ -995,16 +994,13 @@ fn aggregate_expressions( | AggregateMode::SinglePartitioned => Ok(aggr_expr .iter() .map(|agg| { - let mut result = vec![]; - let AggregatePhysicalExpressions { - args, - order_by_exprs, - } = agg.all_expressions(); - result.extend(args); - // Append order by expressions to the result. This + let mut result = agg.expressions(); + // Append ordering requirements to expressions' results. This // way order sensitive aggregators can satisfy requirement // themselves. - result.extend(order_by_exprs); + if let Some(ordering_req) = agg.order_bys() { + result.extend(ordering_req.iter().map(|item| item.expr.clone())); + } result }) .collect()), From 7b640db084807ba7720bac031ada035963c916fd Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 31 May 2024 18:00:55 +0300 Subject: [PATCH 4/4] Update comments --- datafusion/physical-expr-common/src/aggregate/mod.rs | 3 +-- datafusion/physical-expr/src/window/window_expr.rs | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 2c8b49d5fea7..78c7d40b87f5 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -187,8 +187,7 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq { } /// Returns all expressions used in the [`AggregateExpr`]. - /// First entry in the tuple corresponds to function arguments - /// Second entry in the tuple corresponds to order by expressions. + /// These expressions are (1)function arguments, (2) order by expressions. fn all_expressions(&self) -> AggregatePhysicalExpressions { let args = self.expressions(); let order_bys = self.order_bys().unwrap_or(&[]); diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index d7a41aed3972..065371d9e43e 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -130,9 +130,7 @@ pub trait WindowExpr: Send + Sync + Debug { fn get_reverse_expr(&self) -> Option>; /// Returns all expressions used in the [`WindowExpr`]. - /// First entry in the tuple corresponds to function arguments - /// Second entry in the tuple corresponds to partition by expressions. - /// Third entry in the tuple corresponds to order by expressions. + /// These expressions are (1) function arguments, (2) partition by expressions, (3) order by expressions. fn all_expressions(&self) -> WindowPhysicalExpressions { let args = self.expressions(); let partition_by_exprs = self.partition_by().to_vec();