Skip to content

Commit

Permalink
WIP: Simplify join logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 13, 2023
1 parent 3f2d442 commit dcef37f
Showing 1 changed file with 14 additions and 70 deletions.
84 changes: 14 additions & 70 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,51 +130,6 @@ fn scalar_function_type_from_str(name: &str) -> Result<ScalarFunctionType> {
}
}

fn split_eq_and_noneq_join_predicate_with_nulls_equality(
filter: &Expr,
) -> (Vec<(Column, Column)>, bool, Option<Expr>) {
let exprs = split_conjunction(filter);

let mut accum_join_keys: Vec<(Column, Column)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
let mut nulls_equal_nulls = false;

for expr in exprs {
match expr {
Expr::BinaryExpr(binary_expr) => match binary_expr {
x @ (BinaryExpr {
left,
op: Operator::Eq,
right,
}
| BinaryExpr {
left,
op: Operator::IsNotDistinctFrom,
right,
}) => {
nulls_equal_nulls = match x.op {
Operator::Eq => false,
Operator::IsNotDistinctFrom => true,
_ => unreachable!(),
};

match (left.as_ref(), right.as_ref()) {
(Expr::Column(l), Expr::Column(r)) => {
accum_join_keys.push((l.clone(), r.clone()));
}
_ => accum_filters.push(expr.clone()),
}
}
_ => accum_filters.push(expr.clone()),
},
_ => accum_filters.push(expr.clone()),
}
}

let join_filter = accum_filters.into_iter().reduce(Expr::and);
(accum_join_keys, nulls_equal_nulls, join_filter)
}

/// Convert Substrait Plan to DataFusion DataFrame
pub async fn from_substrait_plan(
ctx: &mut SessionContext,
Expand Down Expand Up @@ -395,36 +350,25 @@ pub async fn from_substrait_rel(
from_substrait_rel(ctx, join.right.as_ref().unwrap(), extensions).await?,
);
let join_type = from_substrait_jointype(join.r#type)?;
// The join condition expression needs full input schema and not the output schema from join since we lose columns from
// certain join types such as semi and anti joins

// The join condition expression needs full input schema and not the
// output schema from join since we lose columns from certain join
// types such as semi and anti joins
let in_join_schema = left.schema().join(right.schema())?;

// If join expression exists, parse the `on` condition expression, build join and return
// Otherwise, build join with only the filter, without join keys
match &join.expression.as_ref() {
// Use the join expression as the `on` expression. DataFusion's optimizer will
// split the predicates into equijoins if needed.
let join_expression = match join.expression.as_ref() {
Some(expr) => {
let on =
let expr =
from_substrait_rex(expr, &in_join_schema, extensions).await?;
// The join expression can contain both equal and non-equal ops.
// As of datafusion 31.0.0, the equal and non equal join conditions are in separate fields.
// So we extract each part as follows:
// - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector
// - Otherwise we add the expression to join_filter (use conjunction if filter already exists)
let (join_ons, nulls_equal_nulls, join_filter) =
split_eq_and_noneq_join_predicate_with_nulls_equality(&on);
let (left_cols, right_cols): (Vec<_>, Vec<_>) =
itertools::multiunzip(join_ons);
left.join_detailed(
right.build()?,
join_type,
(left_cols, right_cols),
join_filter,
nulls_equal_nulls,
)?
.build()
Some(expr.as_ref().clone())
}
None => plan_err!("JoinRel without join condition is not allowed"),
}
None => None,
};

left.join_on(right.build()?, join_type, join_expression)?
.build()
}
Some(RelType::Read(read)) => match &read.as_ref().read_type {
Some(ReadType::NamedTable(nt)) => {
Expand Down

0 comments on commit dcef37f

Please sign in to comment.