Skip to content

Commit

Permalink
refactor: add optimzer struct (#2616)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored May 25, 2022
1 parent de314e7 commit 894be67
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 40 deletions.
59 changes: 19 additions & 40 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ use crate::{
MemTable, ViewTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::eliminate_filter::EliminateFilter,
optimizer::eliminate_limit::EliminateLimit,
optimizer::{
eliminate_filter::EliminateFilter, eliminate_limit::EliminateLimit,
optimizer::Optimizer,
},
physical_optimizer::{
aggregate_statistics::AggregateStatistics,
hash_build_probe_order::HashBuildProbeOrder, optimizer::PhysicalOptimizerRule,
},
};
use log::{debug, trace};
use parking_lot::RwLock;
use std::string::String;
use std::sync::Arc;
Expand Down Expand Up @@ -1189,7 +1190,7 @@ pub struct SessionState {
/// Uuid for the session
pub session_id: String,
/// Responsible for optimizing a logical plan
pub optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
pub optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
Expand Down Expand Up @@ -1255,7 +1256,7 @@ impl SessionState {

SessionState {
session_id,
optimizers: vec![
optimizer: Optimizer::new(vec![
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Expand All @@ -1267,7 +1268,7 @@ impl SessionState {
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
Arc::new(SingleDistinctToGroupBy::new()),
],
]),
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
Expand Down Expand Up @@ -1328,9 +1329,9 @@ impl SessionState {
/// Replace the optimizer rules
pub fn with_optimizer_rules(
mut self,
optimizers: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
) -> Self {
self.optimizers = optimizers;
self.optimizer = Optimizer::new(rules);
self
}

Expand All @@ -1348,7 +1349,7 @@ impl SessionState {
mut self,
optimizer_rule: Arc<dyn OptimizerRule + Send + Sync>,
) -> Self {
self.optimizers.push(optimizer_rule);
self.optimizer.rules.push(optimizer_rule);
self
}

Expand All @@ -1363,16 +1364,21 @@ impl SessionState {

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
let execution_props = &mut self.execution_props.clone();

if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan =
self.optimize_internal(e.plan.as_ref(), |optimized_plan, optimizer| {
let plan = self.optimizer.optimize(
e.plan.as_ref(),
execution_props,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
})?;
},
)?;

Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
Expand All @@ -1381,35 +1387,8 @@ impl SessionState {
schema: e.schema.clone(),
}))
} else {
self.optimize_internal(plan, |_, _| {})
}
}

/// Optimizes the logical plan by applying optimizer rules, and
/// invoking observer function after each call
fn optimize_internal<F>(
&self,
plan: &LogicalPlan,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let execution_props = &mut self.execution_props.clone();
let optimizers = &self.optimizers;

let execution_props = execution_props.start_execution();

let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for optimizer in optimizers {
new_plan = optimizer.optimize(&new_plan, execution_props)?;
observer(&new_plan, optimizer.as_ref());
self.optimizer.optimize(plan, execution_props, |_, _| {})
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", plan);
Ok(new_plan)
}

/// Creates a physical plan from a logical plan.
Expand Down
43 changes: 43 additions & 0 deletions datafusion/core/src/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

//! Query optimizer traits
use std::sync::Arc;

use log::{debug, trace};

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
Expand All @@ -35,3 +39,42 @@ pub trait OptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
}

/// A rule-based optimizer.
#[derive(Clone)]
pub struct Optimizer {
/// All rules to apply
pub rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
}

impl Optimizer {
/// Create a new optimizer with the given rules
pub fn new(rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>) -> Self {
Self { rules }
}

/// Optimizes the logical plan by applying optimizer rules, and
/// invoking observer function after each call
pub fn optimize<F>(
&self,
plan: &LogicalPlan,
execution_props: &mut ExecutionProps,
mut observer: F,
) -> Result<LogicalPlan>
where
F: FnMut(&LogicalPlan, &dyn OptimizerRule),
{
let execution_props = execution_props.start_execution();

let mut new_plan = plan.clone();
debug!("Input logical plan:\n{}\n", plan.display_indent());
trace!("Full input logical plan:\n{:?}", plan);
for rule in &self.rules {
new_plan = rule.optimize(&new_plan, execution_props)?;
observer(&new_plan, rule.as_ref());
}
debug!("Optimized logical plan:\n{}\n", new_plan.display_indent());
trace!("Full Optimized logical plan:\n {:?}", plan);
Ok(new_plan)
}
}

0 comments on commit 894be67

Please sign in to comment.