Skip to content

Commit

Permalink
refactor push_down_filter to fix dead-loop and use optimizer_recurs…
Browse files Browse the repository at this point in the history
…e. (#5337)
  • Loading branch information
jackwener authored Feb 25, 2023
1 parent 8b92b9b commit 0b77ec2
Showing 1 changed file with 38 additions and 37 deletions.
75 changes: 38 additions & 37 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan
use crate::optimizer::ApplyOrder;
use crate::utils::{conjunction, split_conjunction};
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::{Column, DFSchema, DataFusionError, Result};
Expand All @@ -29,7 +30,6 @@ use datafusion_expr::{
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;
use utils::optimize_children;

/// Push Down Filter optimizer rule pushes filter clauses down the plan
/// # Introduction
Expand Down Expand Up @@ -511,25 +511,20 @@ impl OptimizerRule for PushDownFilter {
"push_down_filter"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let filter = match plan {
LogicalPlan::Filter(filter) => filter,
// we also need to pushdown filter in Join.
LogicalPlan::Join(join) => {
let optimized_plan = push_down_join(plan, join, None)?;
return match optimized_plan {
Some(optimized_plan) => Ok(Some(
optimize_children(self, &optimized_plan, config)?
.unwrap_or(optimized_plan),
)),
None => optimize_children(self, plan, config),
};
}
_ => return optimize_children(self, plan, config),
LogicalPlan::Join(join) => return push_down_join(plan, join, None),
_ => return Ok(None),
};

let child_plan = filter.input.as_ref();
Expand All @@ -550,11 +545,12 @@ impl OptimizerRule for PushDownFilter {
let new_predicate = conjunction(new_predicates).ok_or_else(|| {
DataFusionError::Plan("at least one expression exists".to_string())
})?;
let new_plan = LogicalPlan::Filter(Filter::try_new(
let new_filter = LogicalPlan::Filter(Filter::try_new(
new_predicate,
child_filter.input.clone(),
)?);
return self.try_optimize(&new_plan, config);
self.try_optimize(&new_filter, _config)?
.unwrap_or(new_filter)
}
LogicalPlan::Repartition(_)
| LogicalPlan::Distinct(_)
Expand Down Expand Up @@ -691,7 +687,7 @@ impl OptimizerRule for PushDownFilter {
LogicalPlan::Join(join) => {
match push_down_join(&filter.input, join, Some(&filter.predicate))? {
Some(optimized_plan) => optimized_plan,
None => plan.clone(),
None => return Ok(None),
}
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
Expand Down Expand Up @@ -741,12 +737,9 @@ impl OptimizerRule for PushDownFilter {
None => new_scan,
}
}
_ => plan.clone(),
_ => return Ok(None),
};

Ok(Some(
optimize_children(self, &new_plan, config)?.unwrap_or(new_plan),
))
Ok(Some(new_plan))
}
}

Expand Down Expand Up @@ -777,6 +770,7 @@ pub fn replace_cols_by_name(
#[cfg(test)]
mod tests {
use super::*;
use crate::optimizer::Optimizer;
use crate::rewrite_disjunctive_predicate::RewriteDisjunctivePredicate;
use crate::test::*;
use crate::OptimizerContext;
Expand All @@ -791,28 +785,35 @@ mod tests {
use std::sync::Arc;

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = PushDownFilter::new()
.try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(plan.schema(), optimized_plan.schema());
assert_eq!(expected, formatted_plan);
Ok(())
crate::test::assert_optimized_plan_eq(
Arc::new(PushDownFilter::new()),
plan,
expected,
)
}

fn assert_optimized_plan_eq_with_rewrite_predicate(
plan: &LogicalPlan,
expected: &str,
) -> Result<()> {
let mut optimized_plan = RewriteDisjunctivePredicate::new()
.try_optimize(plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
optimized_plan = PushDownFilter::new()
.try_optimize(&optimized_plan, &OptimizerContext::new())
.unwrap()
.expect("failed to optimize plan");
let optimizer = Optimizer::with_rules(vec![
Arc::new(RewriteDisjunctivePredicate::new()),
Arc::new(PushDownFilter::new()),
]);
let mut optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(0).unwrap(),
plan,
&OptimizerContext::new(),
)?
.unwrap_or_else(|| plan.clone());
optimized_plan = optimizer
.optimize_recursively(
optimizer.rules.get(1).unwrap(),
&optimized_plan,
&OptimizerContext::new(),
)?
.unwrap_or_else(|| plan.clone());
let formatted_plan = format!("{optimized_plan:?}");
assert_eq!(plan.schema(), optimized_plan.schema());
assert_eq!(expected, formatted_plan);
Expand Down

0 comments on commit 0b77ec2

Please sign in to comment.