From 3b3f58e6eee611f60cee78e44d08f8c62fe858e0 Mon Sep 17 00:00:00 2001 From: sgrebnov Date: Tue, 5 Nov 2024 13:50:31 -0800 Subject: [PATCH] Add test for optimize_projections with preserve_projections option enabled --- datafusion/execution/src/config.rs | 3 +- .../optimizer/src/optimize_projections/mod.rs | 63 +++++++++++++++++-- datafusion/optimizer/src/optimizer.rs | 10 +++ datafusion/optimizer/src/test/mod.rs | 13 +++- 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/datafusion/execution/src/config.rs b/datafusion/execution/src/config.rs index 78b4bac6bc176..40bb3fbda48ee 100644 --- a/datafusion/execution/src/config.rs +++ b/datafusion/execution/src/config.rs @@ -339,7 +339,8 @@ impl SessionConfig { } /// When set to true, the `optimize_projections` rule will not attempt to move, add, or remove existing projections. - /// This is useful when optimization is used alongside unparsing logic to preserve the original layout and simplify the overall query structure. + /// This flag helps maintain the original structure of the `LogicalPlan` when converting it back into SQL via the `unparser` module. + /// It ensures the query layout remains simple and readable, relying on the underlying SQL engine to apply its own optimizations during execution. /// /// [optimize_projections_preserve_existing_projections]: datafusion_common::config::OptimizerOptions::optimize_projections_preserve_existing_projections pub fn with_optimize_projections_preserve_existing_projections(mut self, enabled: bool) -> Self { diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 7b2fc216c4369..e2a333c024fe6 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -785,12 +785,19 @@ fn rewrite_projection_given_requirements( /// - `optimize_projections_preserve_existing_projections` optimizer config is false, and /// - input schema of the projection, output schema of the projection are same, and /// - all projection expressions are either Column or Literal -fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result { -!config +fn is_projection_unnecessary( + input: &LogicalPlan, + proj_exprs: &[Expr], + config: &dyn OptimizerConfig, +) -> Result { + if config .options() .optimizer .optimize_projections_preserve_existing_projections - + { + return Ok(false); + } + let proj_schema = projection_schema(input, proj_exprs)?; Ok(&proj_schema == input.schema() && proj_exprs.iter().all(is_expr_trivial)) } @@ -807,11 +814,12 @@ mod tests { use crate::optimize_projections::OptimizeProjections; use crate::optimizer::Optimizer; use crate::test::{ - assert_fields_eq, assert_optimized_plan_eq, scan_empty, test_table_scan, - test_table_scan_fields, test_table_scan_with_name, + assert_fields_eq, assert_optimized_plan_eq, assert_optimized_plan_with_config_eq, + scan_empty, test_table_scan, test_table_scan_fields, test_table_scan_with_name, }; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::config::ConfigOptions; use datafusion_common::{ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference, }; @@ -1998,4 +2006,49 @@ mod tests { optimizer.optimize(plan, &OptimizerContext::new(), observe)?; Ok(optimized_plan) } + + #[test] + fn aggregate_filter_pushdown_preserve_projections() -> Result<()> { + let table_scan = test_table_scan()?; + let aggr_with_filter = count_udaf() + .call(vec![col("b")]) + .filter(col("c").gt(lit(42))) + .build()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate( + vec![col("a")], + vec![count(col("b")), aggr_with_filter.alias("count2")], + )? + .project(vec![col("a"), col("count(test.b)"), col("count2")])? + .build()?; + + let expected_default = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\ + \n TableScan: test projection=[a, b, c]"; + + let expected_preserve_projections = "Projection: test.a, count(test.b), count2\ + \n Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\ + \n TableScan: test projection=[a, b, c]"; + + let scenarios = [ + (false, expected_default), + (true, expected_preserve_projections), + ]; + + for (preserve_projections, expected_plan) in scenarios.into_iter() { + let mut config = ConfigOptions::new(); + config + .optimizer + .optimize_projections_preserve_existing_projections = + preserve_projections; + let optimizer_context = OptimizerContext::new_with_options(config); + assert_optimized_plan_with_config_eq( + Arc::new(OptimizeProjections::new()), + plan.clone(), + expected_plan, + &optimizer_context, + )?; + } + + Ok(()) + } } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 975150cd61220..b3e5a51e8132e 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -145,6 +145,7 @@ pub struct OptimizerContext { /// Alias generator used to generate unique aliases for subqueries alias_generator: Arc, + /// Configuration options for the optimizer options: ConfigOptions, } @@ -161,6 +162,15 @@ impl OptimizerContext { } } + /// Create optimizer config with the given configuration options + pub fn new_with_options(options: ConfigOptions) -> Self { + Self { + query_execution_start_time: Utc::now(), + alias_generator: Arc::new(AliasGenerator::new()), + options, + } + } + /// Specify whether to enable the filter_null_keys rule pub fn filter_null_keys(mut self, filter_null_keys: bool) -> Self { self.options.optimizer.filter_null_join_keys = filter_null_keys; diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 94d07a0791b3b..3330e1f24c4d4 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -17,7 +17,7 @@ use crate::analyzer::{Analyzer, AnalyzerRule}; use crate::optimizer::Optimizer; -use crate::{OptimizerContext, OptimizerRule}; +use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, Result}; @@ -173,8 +173,17 @@ pub fn assert_optimized_plan_eq( // Apply the rule once let opt_context = OptimizerContext::new().with_max_passes(1); + assert_optimized_plan_with_config_eq(rule, plan, expected, &opt_context) +} + +pub fn assert_optimized_plan_with_config_eq( + rule: Arc, + plan: LogicalPlan, + expected: &str, + config: &dyn OptimizerConfig, +) -> Result<()> { let optimizer = Optimizer::with_rules(vec![Arc::clone(&rule)]); - let optimized_plan = optimizer.optimize(plan, &opt_context, observe)?; + let optimized_plan = optimizer.optimize(plan, config, observe)?; let formatted_plan = format!("{optimized_plan}"); assert_eq!(formatted_plan, expected);