From 8f52701becebb5e560d1acb40cc638d6d4f39125 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 11 Jul 2024 08:23:01 +0800 Subject: [PATCH 1/2] impl the short term solution. --- .../aggregate_statistics.rs | 130 +++++++++++------- 1 file changed, 79 insertions(+), 51 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 7e9aec9e5e4c..8ab7725334f4 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -140,31 +140,29 @@ fn take_optimizable_column_and_table_count( stats: &Statistics, ) -> Option<(ScalarValue, String)> { let col_stats = &stats.column_statistics; - if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { - if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { - if let Precision::Exact(num_rows) = stats.num_rows { - let exprs = agg_expr.expressions(); - if exprs.len() == 1 { - // TODO optimize with exprs other than Column - if let Some(col_expr) = - exprs[0].as_any().downcast_ref::() - { - let current_val = &col_stats[col_expr.index()].null_count; - if let &Precision::Exact(val) = current_val { - return Some(( - ScalarValue::Int64(Some((num_rows - val) as i64)), - agg_expr.name().to_string(), - )); - } - } else if let Some(lit_expr) = - exprs[0].as_any().downcast_ref::() - { - if lit_expr.value() == &COUNT_STAR_EXPANSION { - return Some(( - ScalarValue::Int64(Some(num_rows as i64)), - agg_expr.name().to_string(), - )); - } + if is_non_distinct_count(agg_expr) { + if let Precision::Exact(num_rows) = stats.num_rows { + let exprs = agg_expr.expressions(); + if exprs.len() == 1 { + // TODO optimize with exprs other than Column + if let Some(col_expr) = + exprs[0].as_any().downcast_ref::() + { + let current_val = &col_stats[col_expr.index()].null_count; + if let &Precision::Exact(val) = current_val { + return Some(( + ScalarValue::Int64(Some((num_rows - val) as i64)), + agg_expr.name().to_string(), + )); + } + } else if let Some(lit_expr) = + exprs[0].as_any().downcast_ref::() + { + if lit_expr.value() == &COUNT_STAR_EXPANSION { + return Some(( + ScalarValue::Int64(Some(num_rows as i64)), + agg_expr.name().to_string(), + )); } } } @@ -182,26 +180,22 @@ fn take_optimizable_min( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if let Some(casted_expr) = - agg_expr.as_any().downcast_ref::() - { + if is_min(agg_expr) { if let Ok(min_data_type) = - ScalarValue::try_from(casted_expr.field().unwrap().data_type()) + ScalarValue::try_from(agg_expr.field().unwrap().data_type()) { - return Some((min_data_type, casted_expr.name().to_string())); + return Some((min_data_type, agg_expr.name().to_string())); } } } value if value > 0 => { let col_stats = &stats.column_statistics; - if let Some(casted_expr) = - agg_expr.as_any().downcast_ref::() - { - if casted_expr.expressions().len() == 1 { + if is_min(agg_expr) { + let exprs = agg_expr.expressions(); + if exprs.len() == 1 { // TODO optimize with exprs other than Column - if let Some(col_expr) = casted_expr.expressions()[0] - .as_any() - .downcast_ref::() + if let Some(col_expr) = + exprs[0].as_any().downcast_ref::() { if let Precision::Exact(val) = &col_stats[col_expr.index()].min_value @@ -209,7 +203,7 @@ fn take_optimizable_min( if !val.is_null() { return Some(( val.clone(), - casted_expr.name().to_string(), + agg_expr.name().to_string(), )); } } @@ -232,26 +226,22 @@ fn take_optimizable_max( match *num_rows { 0 => { // MIN/MAX with 0 rows is always null - if let Some(casted_expr) = - agg_expr.as_any().downcast_ref::() - { + if is_max(agg_expr) { if let Ok(max_data_type) = - ScalarValue::try_from(casted_expr.field().unwrap().data_type()) + ScalarValue::try_from(agg_expr.field().unwrap().data_type()) { - return Some((max_data_type, casted_expr.name().to_string())); + return Some((max_data_type, agg_expr.name().to_string())); } } } value if value > 0 => { let col_stats = &stats.column_statistics; - if let Some(casted_expr) = - agg_expr.as_any().downcast_ref::() - { - if casted_expr.expressions().len() == 1 { + if is_max(agg_expr) { + let exprs = agg_expr.expressions(); + if exprs.len() == 1 { // TODO optimize with exprs other than Column - if let Some(col_expr) = casted_expr.expressions()[0] - .as_any() - .downcast_ref::() + if let Some(col_expr) = + exprs[0].as_any().downcast_ref::() { if let Precision::Exact(val) = &col_stats[col_expr.index()].max_value @@ -259,7 +249,7 @@ fn take_optimizable_max( if !val.is_null() { return Some(( val.clone(), - casted_expr.name().to_string(), + agg_expr.name().to_string(), )); } } @@ -273,6 +263,44 @@ fn take_optimizable_max( None } +fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool { + if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { + if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { + return true; + } + } + + false +} + +fn is_min(agg_expr: &dyn AggregateExpr) -> bool { + if agg_expr.as_any().is::() { + return true; + } + + if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { + if agg_expr.fun().name() == "min" { + return true; + } + } + + false +} + +fn is_max(agg_expr: &dyn AggregateExpr) -> bool { + if agg_expr.as_any().is::() { + return true; + } + + if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { + if agg_expr.fun().name() == "max" { + return true; + } + } + + false +} + #[cfg(test)] pub(crate) mod tests { use super::*; From 0a3affe56c1d7f961c74d842b2b519bf6c07e26c Mon Sep 17 00:00:00 2001 From: kamille Date: Fri, 12 Jul 2024 22:12:03 +0800 Subject: [PATCH 2/2] add todos. --- .../core/src/physical_optimizer/aggregate_statistics.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index 8ab7725334f4..66067d8cb5c4 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -263,6 +263,8 @@ fn take_optimizable_max( None } +// TODO: Move this check into AggregateUDFImpl +// https://github.com/apache/datafusion/issues/11153 fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool { if let Some(agg_expr) = agg_expr.as_any().downcast_ref::() { if agg_expr.fun().name() == "count" && !agg_expr.is_distinct() { @@ -273,6 +275,8 @@ fn is_non_distinct_count(agg_expr: &dyn AggregateExpr) -> bool { false } +// TODO: Move this check into AggregateUDFImpl +// https://github.com/apache/datafusion/issues/11153 fn is_min(agg_expr: &dyn AggregateExpr) -> bool { if agg_expr.as_any().is::() { return true; @@ -287,6 +291,8 @@ fn is_min(agg_expr: &dyn AggregateExpr) -> bool { false } +// TODO: Move this check into AggregateUDFImpl +// https://github.com/apache/datafusion/issues/11153 fn is_max(agg_expr: &dyn AggregateExpr) -> bool { if agg_expr.as_any().is::() { return true;