Skip to content

Commit

Permalink
refactor: reduce allocations in push down filter (#10567)
Browse files Browse the repository at this point in the history
  • Loading branch information
erratic-pattern authored May 21, 2024
1 parent 96e0ee6 commit 4b2652f
Showing 1 changed file with 23 additions and 27 deletions.
50 changes: 23 additions & 27 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,19 +654,20 @@ impl OptimizerRule for PushDownFilter {
}
LogicalPlan::Repartition(repartition) => {
let new_filter =
Filter::try_new(filter.predicate, repartition.input.clone())
Filter::try_new(filter.predicate, Arc::clone(&repartition.input))
.map(LogicalPlan::Filter)?;
insert_below(LogicalPlan::Repartition(repartition), new_filter)
}
LogicalPlan::Distinct(distinct) => {
let new_filter =
Filter::try_new(filter.predicate, distinct.input().clone())
Filter::try_new(filter.predicate, Arc::clone(distinct.input()))
.map(LogicalPlan::Filter)?;
insert_below(LogicalPlan::Distinct(distinct), new_filter)
}
LogicalPlan::Sort(sort) => {
let new_filter = Filter::try_new(filter.predicate, sort.input.clone())
.map(LogicalPlan::Filter)?;
let new_filter =
Filter::try_new(filter.predicate, Arc::clone(&sort.input))
.map(LogicalPlan::Filter)?;
insert_below(LogicalPlan::Sort(sort), new_filter)
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
Expand All @@ -685,7 +686,7 @@ impl OptimizerRule for PushDownFilter {

let new_filter = LogicalPlan::Filter(Filter::try_new(
new_predicate,
subquery_alias.input.clone(),
Arc::clone(&subquery_alias.input),
)?);
insert_below(LogicalPlan::SubqueryAlias(subquery_alias), new_filter)
}
Expand All @@ -697,19 +698,18 @@ impl OptimizerRule for PushDownFilter {
projection
.schema
.iter()
.enumerate()
.map(|(i, (qualifier, field))| {
.zip(projection.expr.iter())
.map(|((qualifier, field), expr)| {
// strip alias, as they should not be part of filters
let expr = projection.expr[i].clone().unalias();
let expr = expr.clone().unalias();

(qualified_name(qualifier, field.name()), expr)
})
.partition(|(_, value)| value.is_volatile().unwrap_or(true));

let mut push_predicates = vec![];
let mut keep_predicates = vec![];
for expr in split_conjunction_owned(filter.predicate.clone()).into_iter()
{
for expr in split_conjunction_owned(filter.predicate.clone()) {
if contain(&expr, &volatile_map) {
keep_predicates.push(expr);
} else {
Expand All @@ -723,7 +723,7 @@ impl OptimizerRule for PushDownFilter {
// E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
let new_filter = LogicalPlan::Filter(Filter::try_new(
replace_cols_by_name(expr, &non_volatile_map)?,
projection.input.clone(),
Arc::clone(&projection.input),
)?);

match conjunction(keep_predicates) {
Expand Down Expand Up @@ -764,12 +764,12 @@ impl OptimizerRule for PushDownFilter {
replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new(
push_predicate,
input.clone(),
Arc::clone(input),
)?)))
}
Ok(Transformed::yes(LogicalPlan::Union(Union {
inputs,
schema: plan_schema.clone(),
schema: Arc::clone(&plan_schema),
})))
}
LogicalPlan::Aggregate(agg) => {
Expand Down Expand Up @@ -801,11 +801,11 @@ impl OptimizerRule for PushDownFilter {
replace_map.insert(expr.display_name()?, expr.clone());
}
let replaced_push_predicates = push_predicates
.iter()
.map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
.into_iter()
.map(|expr| replace_cols_by_name(expr, &replace_map))
.collect::<Result<Vec<_>>>()?;

let agg_input = agg.input.clone();
let agg_input = Arc::clone(&agg.input);
Transformed::yes(LogicalPlan::Aggregate(agg))
.transform_data(|new_plan| {
// If we have a filter to push, we push it down to the input of the aggregate
Expand All @@ -828,12 +828,12 @@ impl OptimizerRule for PushDownFilter {
}
LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)),
LogicalPlan::CrossJoin(cross_join) => {
let predicates = split_conjunction_owned(filter.predicate.clone());
let predicates = split_conjunction_owned(filter.predicate);
let join = convert_cross_join_to_inner_join(cross_join)?;
let plan = push_down_all_join(predicates, vec![], join, vec![])?;
convert_to_cross_join_if_beneficial(plan.data)
}
LogicalPlan::TableScan(ref scan) => {
LogicalPlan::TableScan(scan) => {
let filter_predicates = split_conjunction(&filter.predicate);
let results = scan
.source
Expand All @@ -845,12 +845,12 @@ impl OptimizerRule for PushDownFilter {
filter_predicates.len());
}

let zip = filter_predicates.iter().zip(results);
let zip = filter_predicates.into_iter().zip(results);

let new_scan_filters = zip
.clone()
.filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
.map(|(pred, _)| *pred);
.map(|(pred, _)| pred);
let new_scan_filters: Vec<Expr> = scan
.filters
.iter()
Expand All @@ -860,16 +860,12 @@ impl OptimizerRule for PushDownFilter {
.collect();
let new_predicate: Vec<Expr> = zip
.filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
.map(|(pred, _)| (*pred).clone())
.map(|(pred, _)| pred.clone())
.collect();

let new_scan = LogicalPlan::TableScan(TableScan {
source: scan.source.clone(),
projection: scan.projection.clone(),
projected_schema: scan.projected_schema.clone(),
table_name: scan.table_name.clone(),
filters: new_scan_filters,
fetch: scan.fetch,
..scan
});

Transformed::yes(new_scan).transform_data(|new_scan| {
Expand Down Expand Up @@ -1371,7 +1367,7 @@ mod tests {
fn from_template(&self, _exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
Self {
input: inputs.to_vec(),
schema: self.schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
Expand Down

0 comments on commit 4b2652f

Please sign in to comment.