Skip to content

Commit

Permalink
add with_new_inputs (apache#4393)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Nov 28, 2022
1 parent 9139183 commit dafd957
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
9 changes: 8 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::logical_plan::builder::validate_unique_names;
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
use crate::logical_plan::extension::UserDefinedLogicalNode;
use crate::utils::{
exprlist_to_fields, grouping_set_expr_count, grouping_set_to_exprlist,
exprlist_to_fields, from_plan, grouping_set_expr_count, grouping_set_to_exprlist,
};
use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -349,6 +349,13 @@ impl LogicalPlan {
self.accept(&mut visitor)?;
Ok(visitor.using_columns)
}

pub fn with_new_inputs(
&self,
inputs: &[LogicalPlan],
) -> Result<LogicalPlan, DataFusionError> {
from_plan(self, &self.expressions(), inputs)
}
}

/// Trait that implements the [Visitor
Expand Down
15 changes: 7 additions & 8 deletions datafusion/optimizer/src/limit_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
//! It will push down through projection, limits (taking the smaller limit)
use crate::{utils, OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::utils::from_plan;
use datafusion_expr::{
logical_plan::{
Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
Expand Down Expand Up @@ -131,7 +130,7 @@ impl OptimizerRule for LimitPushDown {
fetch: scan.fetch.map(|x| std::cmp::min(x, limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
from_plan(plan, &plan.expressions(), &[new_input])?
plan.with_new_inputs(&[new_input])?
}

LogicalPlan::Projection(projection) => {
Expand Down Expand Up @@ -164,7 +163,7 @@ impl OptimizerRule for LimitPushDown {
inputs: new_inputs,
schema: union.schema.clone(),
});
from_plan(plan, &plan.expressions(), &[union])?
plan.with_new_inputs(&[union])?
}

LogicalPlan::CrossJoin(cross_join) => {
Expand All @@ -180,12 +179,12 @@ impl OptimizerRule for LimitPushDown {
fetch: Some(fetch + skip),
input: Arc::new(right.clone()),
});
let new_input = LogicalPlan::CrossJoin(CrossJoin {
let new_cross_join = LogicalPlan::CrossJoin(CrossJoin {
left: Arc::new(new_left),
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
from_plan(plan, &plan.expressions(), &[new_input])?
plan.with_new_inputs(&[new_cross_join])?
}

LogicalPlan::Join(join) => {
Expand All @@ -195,19 +194,19 @@ impl OptimizerRule for LimitPushDown {
JoinType::Right => push_down_join(join, None, Some(limit)),
_ => push_down_join(join, None, None),
};
from_plan(plan, &plan.expressions(), &[new_join])?
plan.with_new_inputs(&[new_join])?
}

LogicalPlan::Sort(sort) => {
let sort_fetch = skip + fetch;
let new_input = LogicalPlan::Sort(Sort {
let new_sort = LogicalPlan::Sort(Sort {
expr: sort.expr.clone(),
input: Arc::new((*sort.input).clone()),
fetch: Some(
sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch),
),
});
from_plan(plan, &plan.expressions(), &[new_input])?
plan.with_new_inputs(&[new_sort])?
}
_ => plan.clone(),
};
Expand Down

0 comments on commit dafd957

Please sign in to comment.