implement rewrite for ExtractEquijoinPredicate and avoid clone in filter #10165

Merged
merged 3 commits into from
Apr 22, 2024
Changes from 2 commits
130 changes: 91 additions & 39 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -18,9 +18,11 @@
//! [`ExtractEquijoinPredicate`] identifies equality join (equijoin) predicates
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::DFSchema;
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair, split_conjunction};
use datafusion_common::{internal_err, DFSchema};
use datafusion_expr::expr::Alias;
use datafusion_expr::utils::{can_hash, find_valid_equijoin_key_pair};
use datafusion_expr::{BinaryExpr, Expr, ExprSchemable, Join, LogicalPlan, Operator};
use std::sync::Arc;

@@ -51,14 +53,33 @@ impl ExtractEquijoinPredicate {
impl OptimizerRule for ExtractEquijoinPredicate {
fn try_optimize(
&self,
plan: &LogicalPlan,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called ExtractEquijoinPredicate::rewrite")
}
fn supports_rewrite(&self) -> bool {
true
}

fn name(&self) -> &str {
"extract_equijoin_predicate"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Join(Join {
left,
right,
on,
mut on,
filter,
join_type,
join_constraint,
@@ -67,66 +88,97 @@ impl OptimizerRule for ExtractEquijoinPredicate {
}) => {
Contributor

You might be able to avoid a level of indenting if you did the entire match here, like:

 match plan {
            LogicalPlan::Join(Join {
                left,
                right,
                mut on,
                filter: Some(expr),
                join_type,
                join_constraint,
            }) => {

Then you could avoid reassembling LogicalPlan::Join as well
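
The suggestion above can be sketched outside DataFusion. This is a minimal illustration, not the PR's code: `Plan`, `Join`, and the string-based "a = b" rule are hypothetical stand-ins, and the real `Join` carries more fields. It only shows how matching `filter: Some(expr)` in the outer pattern removes the inner `if let` and lets every other plan fall through unchanged.

```rust
// Toy stand-ins for the DataFusion types (assumption: only the shape of the
// match matters here, so the real fields are omitted).
#[derive(Debug, Clone, PartialEq)]
struct Join {
    on: Vec<(String, String)>,
    filter: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Join(Join),
    Scan(String),
}

/// Returns the (possibly rewritten) plan and a flag saying whether it changed.
fn rewrite(plan: Plan) -> (Plan, bool) {
    match plan {
        // Matching `filter: Some(expr)` directly avoids a nested `if let`.
        Plan::Join(Join { mut on, filter: Some(expr) }) => {
            // Hypothetical rule: a filter of the form "a = b" becomes a join key.
            let pair = expr
                .split_once(" = ")
                .map(|(l, r)| (l.to_string(), r.to_string()));
            match pair {
                Some(key) => {
                    on.push(key);
                    (Plan::Join(Join { on, filter: None }), true)
                }
                None => (Plan::Join(Join { on, filter: Some(expr) }), false),
            }
        }
        // Joins without a filter and all other nodes are returned as they
        // came in, so nothing is reassembled for them.
        other => (other, false),
    }
}
```

With this shape, a join whose filter is `None` is handled by the catch-all arm instead of being rebuilt field by field.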

let left_schema = left.schema();
let right_schema = right.schema();

filter.as_ref().map_or(Result::Ok(None), |expr| {
if let Some(expr) = filter {
let (equijoin_predicates, non_equijoin_expr) =
split_eq_and_noneq_join_predicate(
expr,
left_schema,
right_schema,
)?;

let optimized_plan = (!equijoin_predicates.is_empty()).then(|| {
let mut new_on = on.clone();
Contributor Author

This change avoids cloning the `on` vector.

new_on.extend(equijoin_predicates);

LogicalPlan::Join(Join {
left: left.clone(),
right: right.clone(),
on: new_on,
Comment on lines -84 to -86
Contributor Author

This change avoids cloning the `Arc`s (`left` and `right`).

if !equijoin_predicates.is_empty() {
on.extend(equijoin_predicates);
Ok(Transformed::yes(LogicalPlan::Join(Join {
left,
right,
on,
filter: non_equijoin_expr,
join_type,
join_constraint,
schema,
null_equals_null,
})))
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
left,
right,
on,
filter: non_equijoin_expr,
join_type: *join_type,
join_constraint: *join_constraint,
schema: schema.clone(),
null_equals_null: *null_equals_null,
})
});

Ok(optimized_plan)
})
join_type,
join_constraint,
schema,
null_equals_null,
})))
}
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
left,
right,
on,
filter,
join_type,
join_constraint,
schema,
null_equals_null,
})))
}
}
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}
}

fn name(&self) -> &str {
"extract_equijoin_predicate"
}
/// split with ownership
fn split_conjunction_own(expr: Expr) -> Vec<Expr> {
Contributor Author

@Lordworms Lordworms Apr 21, 2024

This implements an owning version of `split_conjunction`, so the sub-expressions are moved out instead of being cloned from a reference.

Contributor


Contributor Author

OK, got it.

split_conjunction_own_impl(expr, vec![])
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
fn split_conjunction_own_impl(expr: Expr, mut exprs: Vec<Expr>) -> Vec<Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
right,
op: Operator::And,
left,
}) => {
let mut left_exprs = split_conjunction_own_impl(*left, vec![]);
let mut right_exprs = split_conjunction_own_impl(*right, vec![]);
left_exprs.append(&mut right_exprs);
left_exprs
}
Expr::Alias(Alias { expr, .. }) => split_conjunction_own_impl(*expr, exprs),
other => {
exprs.push(other);
exprs
}
}
}
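
An owning split of a conjunction can be sketched on a toy expression type. This is an illustration under assumptions, not the PR's implementation: the `Expr` enum and the names `split_and_owned` and `split_into` are hypothetical. It threads a single output vector through the recursion, so no per-branch vectors are allocated and no expression is cloned.

```rust
// Toy expression tree (assumption: a stand-in for the real `Expr` type).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    And(Box<Expr>, Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
    Alias(Box<Expr>, String),
}

/// Splits `a AND b AND c` into `[a, b, c]`, consuming the input expression.
fn split_and_owned(expr: Expr) -> Vec<Expr> {
    let mut out = Vec::new();
    split_into(expr, &mut out);
    out
}

/// Recursive helper: moves every non-AND leaf into `out`, left to right.
fn split_into(expr: Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            // Unbox and recurse; the boxed children are moved, not cloned.
            split_into(*left, out);
            split_into(*right, out);
        }
        // Look through aliases, mirroring the alias arm in the diff.
        Expr::Alias(inner, _) => split_into(*inner, out),
        other => out.push(other),
    }
}
```

Because the caller gives up ownership of the filter, each conjunct ends up in the result vector without a copy.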

fn split_eq_and_noneq_join_predicate(
filter: &Expr,
Contributor Author

The filter is now taken by value and split directly, which avoids further clones.

filter: Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction(filter);
let exprs = split_conjunction_own(filter);

let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
for expr in exprs {
match expr {
Expr::BinaryExpr(BinaryExpr {
left,
ref left,
op: Operator::Eq,
right,
ref right,
}) => {
let left = left.as_ref();
let right = right.as_ref();

let join_key_pair = find_valid_equijoin_key_pair(
left,
right,
@@ -141,13 +193,13 @@ fn split_eq_and_noneq_join_predicate(
if can_hash(&left_expr_type) && can_hash(&right_expr_type) {
accum_join_keys.push((left_expr, right_expr));
} else {
accum_filters.push(expr.clone());
Contributor Author

Pushing the owned expression avoids a clone.

accum_filters.push(expr);
}
} else {
accum_filters.push(expr.clone());
accum_filters.push(expr);
}
}
_ => accum_filters.push(expr.clone()),
_ => accum_filters.push(expr),
}
}
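
The `ref left` / `ref right` bindings followed by `accum_filters.push(expr)` rely on a borrow ending before the value is moved. A minimal sketch of that pattern, assuming a toy `Pred` type and a hypothetical validity check (`l != r`) in place of the real key-pair lookup:

```rust
// Toy predicate type (assumption: a stand-in for the real expression type).
#[derive(Debug, Clone, PartialEq)]
enum Pred {
    Eq(String, String),
    Other(String),
}

/// Splits predicates into join keys and leftover filters without cloning
/// the predicates that are rejected.
fn partition(preds: Vec<Pred>) -> (Vec<(String, String)>, Vec<Pred>) {
    let mut keys = Vec::new();
    let mut rest = Vec::new();
    for pred in preds {
        match pred {
            // `ref` borrows the fields, so `pred` itself is not moved by the match.
            Pred::Eq(ref l, ref r) => {
                // Hypothetical validity check standing in for the key-pair lookup.
                if l != r {
                    keys.push((l.clone(), r.clone()));
                } else {
                    // The borrows `l` and `r` are no longer used here, so the
                    // whole predicate can be moved into `rest`.
                    rest.push(pred);
                }
            }
            _ => rest.push(pred),
        }
    }
    (keys, rest)
}
```

Only the accepted key pair is copied out; every predicate that stays a filter is moved as a whole.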
