From 894be6719373be85fa777028fe3ec534536660e3 Mon Sep 17 00:00:00 2001 From: jakevin <30525741+jackwener@users.noreply.github.com> Date: Thu, 26 May 2022 05:15:53 +0800 Subject: [PATCH] refactor: add optimzer struct (#2616) --- datafusion/core/src/execution/context.rs | 59 +++++++--------------- datafusion/core/src/optimizer/optimizer.rs | 43 ++++++++++++++++ 2 files changed, 62 insertions(+), 40 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 619ac13b1365..272cdc6da892 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -33,14 +33,15 @@ use crate::{ MemTable, ViewTable, }, logical_plan::{PlanType, ToStringifiedPlan}, - optimizer::eliminate_filter::EliminateFilter, - optimizer::eliminate_limit::EliminateLimit, + optimizer::{ + eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit, + optimizer::Optimizer, + }, physical_optimizer::{ aggregate_statistics::AggregateStatistics, hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule, }, }; -use log::{debug, trace}; use parking_lot::RwLock; use std::string::String; use std::sync::Arc; @@ -1189,7 +1190,7 @@ pub struct SessionState { /// Uuid for the session pub session_id: String, /// Responsible for optimizing a logical plan - pub optimizers: Vec>, + pub optimizer: Optimizer, /// Responsible for optimizing a physical execution plan pub physical_optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` @@ -1255,7 +1256,7 @@ impl SessionState { SessionState { session_id, - optimizers: vec![ + optimizer: Optimizer::new(vec![ // Simplify expressions first to maximize the chance // of applying other optimizations Arc::new(SimplifyExpressions::new()), @@ -1267,7 +1268,7 @@ impl SessionState { Arc::new(FilterPushDown::new()), Arc::new(LimitPushDown::new()), Arc::new(SingleDistinctToGroupBy::new()), - ], + ]), physical_optimizers: vec![ Arc::new(AggregateStatistics::new()), Arc::new(HashBuildProbeOrder::new()), @@ -1328,9 +1329,9 @@ impl SessionState { /// Replace the optimizer rules pub fn with_optimizer_rules( mut self, - optimizers: Vec>, + rules: Vec>, ) -> Self { - self.optimizers = optimizers; + self.optimizer = Optimizer::new(rules); self } @@ -1348,7 +1349,7 @@ impl SessionState { mut self, optimizer_rule: Arc, ) -> Self { - self.optimizers.push(optimizer_rule); + self.optimizer.rules.push(optimizer_rule); self } @@ -1363,16 +1364,21 @@ impl SessionState { /// Optimizes the logical plan by applying optimizer rules. pub fn optimize(&self, plan: &LogicalPlan) -> Result { + let execution_props = &mut self.execution_props.clone(); + if let LogicalPlan::Explain(e) = plan { let mut stringified_plans = e.stringified_plans.clone(); // optimize the child plan, capturing the output of each optimizer - let plan = - self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| { + let plan = self.optimizer.optimize( + e.plan.as_ref(), + execution_props, + |optimized_plan, optimizer| { let optimizer_name = optimizer.name().to_string(); let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name }; stringified_plans.push(optimized_plan.to_stringified(plan_type)); - })?; + }, + )?; Ok(LogicalPlan::Explain(Explain { verbose: e.verbose, @@ -1381,35 +1387,8 @@ impl SessionState { schema: e.schema.clone(), })) } else { - self.optimize_internal(plan, |_, _| {}) - } - } - - /// Optimizes the logical plan by applying optimizer rules, and - /// invoking observer function after each call - fn optimize_internal( - &self, - plan: &LogicalPlan, - mut observer: F, - ) -> Result - where - F: FnMut(&LogicalPlan, &dyn OptimizerRule), - { - let execution_props = &mut self.execution_props.clone(); - let optimizers = &self.optimizers; - - let execution_props = execution_props.start_execution(); - - let mut new_plan = plan.clone(); - debug!("Input logical plan:\n{}\n", plan.display_indent()); - trace!("Full input logical plan:\n{:?}", plan); - for optimizer in optimizers { - new_plan = optimizer.optimize(&new_plan, execution_props)?; - observer(&new_plan, optimizer.as_ref()); + self.optimizer.optimize(plan, execution_props, |_, _| {}) } - debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); - trace!("Full Optimized logical plan:\n {:?}", plan); - Ok(new_plan) } /// Creates a physical plan from a logical plan. diff --git a/datafusion/core/src/optimizer/optimizer.rs b/datafusion/core/src/optimizer/optimizer.rs index 5cf404794704..cc00eb8f830c 100644 --- a/datafusion/core/src/optimizer/optimizer.rs +++ b/datafusion/core/src/optimizer/optimizer.rs @@ -17,6 +17,10 @@ //! Query optimizer traits +use std::sync::Arc; + +use log::{debug, trace}; + use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::LogicalPlan; @@ -35,3 +39,42 @@ pub trait OptimizerRule { /// A human readable name for this optimizer rule fn name(&self) -> &str; } + +/// A rule-based optimizer. +#[derive(Clone)] +pub struct Optimizer { + /// All rules to apply + pub rules: Vec>, +} + +impl Optimizer { + /// Create a new optimizer with the given rules + pub fn new(rules: Vec>) -> Self { + Self { rules } + } + + /// Optimizes the logical plan by applying optimizer rules, and + /// invoking observer function after each call + pub fn optimize( + &self, + plan: &LogicalPlan, + execution_props: &mut ExecutionProps, + mut observer: F, + ) -> Result + where + F: FnMut(&LogicalPlan, &dyn OptimizerRule), + { + let execution_props = execution_props.start_execution(); + + let mut new_plan = plan.clone(); + debug!("Input logical plan:\n{}\n", plan.display_indent()); + trace!("Full input logical plan:\n{:?}", plan); + for rule in &self.rules { + new_plan = rule.optimize(&new_plan, execution_props)?; + observer(&new_plan, rule.as_ref()); + } + debug!("Optimized logical plan:\n{}\n", new_plan.display_indent()); + trace!("Full Optimized logical plan:\n {:?}", plan); + Ok(new_plan) + } +}