From dcef37fcf22514bd55710c0665dce696a3f25893 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 Oct 2023 14:10:29 -0400 Subject: [PATCH] WIP: Simplify join logic --- .../substrait/src/logical_plan/consumer.rs | 84 ++++--------------- 1 file changed, 14 insertions(+), 70 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 82e457767bb43..d00f2d3308883 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -130,51 +130,6 @@ fn scalar_function_type_from_str(name: &str) -> Result { } } -fn split_eq_and_noneq_join_predicate_with_nulls_equality( - filter: &Expr, -) -> (Vec<(Column, Column)>, bool, Option) { - let exprs = split_conjunction(filter); - - let mut accum_join_keys: Vec<(Column, Column)> = vec![]; - let mut accum_filters: Vec = vec![]; - let mut nulls_equal_nulls = false; - - for expr in exprs { - match expr { - Expr::BinaryExpr(binary_expr) => match binary_expr { - x @ (BinaryExpr { - left, - op: Operator::Eq, - right, - } - | BinaryExpr { - left, - op: Operator::IsNotDistinctFrom, - right, - }) => { - nulls_equal_nulls = match x.op { - Operator::Eq => false, - Operator::IsNotDistinctFrom => true, - _ => unreachable!(), - }; - - match (left.as_ref(), right.as_ref()) { - (Expr::Column(l), Expr::Column(r)) => { - accum_join_keys.push((l.clone(), r.clone())); - } - _ => accum_filters.push(expr.clone()), - } - } - _ => accum_filters.push(expr.clone()), - }, - _ => accum_filters.push(expr.clone()), - } - } - - let join_filter = accum_filters.into_iter().reduce(Expr::and); - (accum_join_keys, nulls_equal_nulls, join_filter) -} - /// Convert Substrait Plan to DataFusion DataFrame pub async fn from_substrait_plan( ctx: &mut SessionContext, @@ -395,36 +350,25 @@ pub async fn from_substrait_rel( from_substrait_rel(ctx, join.right.as_ref().unwrap(), extensions).await?, ); let join_type = from_substrait_jointype(join.r#type)?; - // The join condition expression needs full input schema and not the output schema from join since we lose columns from - // certain join types such as semi and anti joins + + // The join condition expression needs full input schema and not the + // output schema from join since we lose columns from certain join + // types such as semi and anti joins let in_join_schema = left.schema().join(right.schema())?; - // If join expression exists, parse the `on` condition expression, build join and return - // Otherwise, build join with only the filter, without join keys - match &join.expression.as_ref() { + // Use the join expression as the `on` expression. DataFusion's optimizer will + // split the predicates into equijoins if needed. + let join_expression = match join.expression.as_ref() { Some(expr) => { - let on = + let expr = from_substrait_rex(expr, &in_join_schema, extensions).await?; - // The join expression can contain both equal and non-equal ops. - // As of datafusion 31.0.0, the equal and non equal join conditions are in separate fields. - // So we extract each part as follows: - // - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector - // - Otherwise we add the expression to join_filter (use conjunction if filter already exists) - let (join_ons, nulls_equal_nulls, join_filter) = - split_eq_and_noneq_join_predicate_with_nulls_equality(&on); - let (left_cols, right_cols): (Vec<_>, Vec<_>) = - itertools::multiunzip(join_ons); - left.join_detailed( - right.build()?, - join_type, - (left_cols, right_cols), - join_filter, - nulls_equal_nulls, - )? - .build() + Some(expr.as_ref().clone()) } - None => plan_err!("JoinRel without join condition is not allowed"), - } + None => None, + }; + + left.join_on(right.build()?, join_type, join_expression)? + .build() } Some(RelType::Read(read)) => match &read.as_ref().read_type { Some(ReadType::NamedTable(nt)) => {