Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encode all join conditions in a single expression field #7612

Merged
merged 6 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 55 additions & 35 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ fn scalar_function_type_from_str(name: &str) -> Result<ScalarFunctionType> {
}
}

#[inline(always)]
fn make_mixed_join_condition(
join_filter: Option<Expr>,
predicate: &Expr,
) -> Option<Expr> {
match &join_filter {
Some(filter) => Some(filter.clone().and(predicate.clone())),
None => Some(predicate.clone()),
}
}

/// Convert Substrait Plan to DataFusion DataFrame
pub async fn from_substrait_plan(
ctx: &mut SessionContext,
Expand Down Expand Up @@ -331,6 +342,12 @@ pub async fn from_substrait_rel(
}
}
Some(RelType::Join(join)) => {
if join.post_join_filter.is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return not_impl_err!(
"JoinRel with post_join_filter is not yet supported"
);
}

let left = LogicalPlanBuilder::from(
from_substrait_rel(ctx, join.left.as_ref().unwrap(), extensions).await?,
);
Expand All @@ -341,65 +358,68 @@ pub async fn from_substrait_rel(
// The join condition expression needs full input schema and not the output schema from join since we lose columns from
// certain join types such as semi and anti joins
let in_join_schema = left.schema().join(right.schema())?;
// Parse post join filter if exists
let join_filter = match &join.post_join_filter {
Some(filter) => {
let parsed_filter =
from_substrait_rex(filter, &in_join_schema, extensions).await?;
Some(parsed_filter.as_ref().clone())
}
None => None,
};
let mut join_filter: Option<Expr> = None;
// If join expression exists, parse the `on` condition expression, build join and return
// Otherwise, build join with koin filter, without join keys
// Otherwise, build join with only the filter, without join keys
match &join.expression.as_ref() {
Some(expr) => {
let on =
from_substrait_rex(expr, &in_join_schema, extensions).await?;
let predicates = split_conjunction(&on);
// TODO: collect only one null_eq_null
let join_exprs: Vec<(Column, Column, bool)> = predicates
.iter()
.map(|p| match p {
// The predicates can contain both equal and non-equal ops.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is a good idea, it seems to make things more complex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dandandan Thank you for pointing this out. I agree that this makes things more complicated, and my apologies for the inaccurate explanation of the issue. I added a comment to correct my description of the issue.

The high-level idea is that join filter and post_join_filter do not have the same meaning semantically. The former is for filtering input during/pre join, the latter is for filtering the output of the join (post-join). Currently in datafusion, we do not have a field in join relation that represents a post-join predicate (the parser/logical optimizer takes care of creating an appropriate filter relation if necessary). So the producer should only generate plans with None as post_join_filer.

I'll modify the consumer to throw an error for now if there's a post_join_filter. Later, we can wrap the join relation with a filter relation if we want to support post_join_filter, or if the later version of datafusion supports post_join_filter directly in the join relation, then we can also add the support in both producer and consumer as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into the substrait spec, and it doesn't really talk about what semantics of the post_join_filter is: https://substrait.io/relations/logical_relations/#join-operation
https://github.com/search?q=repo%3Asubstrait-io%2Fsubstrait%20post_join_filter&type=code

There is a subtle distinction between non equality filters applied during the join (in the ON clause) and applied post join for non-INNER joins: for non inner joins the filters during the join don't filter out input rows (they still come out, just with NULL matches)

So the producer should only generate plans with None as post_join_filer.

This makes sense to me

For the consumer, there is already code in DataFusion that breaks up an arbitrary Expr into equality predicates and others. This is how the SQL frontend creates a Join (a single expr):

https://github.com/apache/arrow-datafusion/blob/a514b6752b063a5a3006aa114297520a933339b0/datafusion/sql/src/relation/join.rs#L134-L141

I think we could do the same here in the subtrait consumer which would be much simpler, and would let the normal DataFusion optimization machinery work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb for the pointer! I'll look into this and update the consumer.

// Since as of datafusion 31.0.0, the equal and non equal join conditions are in separate fields,
// we extract each part as follows:
// - If an equal op is encountered, add the left column, right column and is_null_equal_nulls to `join_on` vector
// - Otherwise we add the expression to join_filter (use conjunction if filter already exists)
let mut join_on: Vec<(Column, Column, bool)> = vec![];
for p in predicates {
match p {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
match (left.as_ref(), right.as_ref()) {
(Expr::Column(l), Expr::Column(r)) => match op {
Operator::Eq => Ok((l.clone(), r.clone(), false)),
Operator::Eq => {
join_on.push((l.clone(), r.clone(), false))
}
Operator::IsNotDistinctFrom => {
Ok((l.clone(), r.clone(), true))
join_on.push((l.clone(), r.clone(), true))
}
_ => {
// If the operator is not a form of an equal operator, add this expression to filter
join_filter =
make_mixed_join_condition(join_filter, p);
}
_ => plan_err!("invalid join condition op"),
},
_ => plan_err!("invalid join condition expression"),
_ => {
// If this is not a `col op col` expression, then `and` the expression to filter
join_filter =
make_mixed_join_condition(join_filter, p);
}
}
}
_ => plan_err!(
"Non-binary expression is not supported in join condition"
),
})
.collect::<Result<Vec<_>>>()?;
_ => {
// If this is not a binary expression, then `and` the expression to filter
join_filter = make_mixed_join_condition(join_filter, p);
}
}
}

let (left_cols, right_cols, null_eq_nulls): (Vec<_>, Vec<_>, Vec<_>) =
itertools::multiunzip(join_exprs);
itertools::multiunzip(join_on);
left.join_detailed(
right.build()?,
join_type,
(left_cols, right_cols),
join_filter,
null_eq_nulls[0],
if null_eq_nulls.is_empty() {
false // if no equal condition, default null_eq_nulls to false
} else {
null_eq_nulls[0]
},
)?
.build()
}
None => match &join_filter {
Some(_) => left
.join(
right.build()?,
join_type,
(Vec::<Column>::new(), Vec::<Column>::new()),
join_filter,
)?
.build(),
None => plan_err!("Join without join keys require a valid filter"),
},
None => plan_err!("JoinRel without join condition is not allowed"),
}
}
Some(RelType::Read(read)) => match &read.as_ref().read_type {
Expand Down
31 changes: 24 additions & 7 deletions datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,30 +277,47 @@ pub fn to_substrait_rel(
// parse filter if exists
let in_join_schema = join.left.schema().join(join.right.schema())?;
let join_filter = match &join.filter {
Some(filter) => Some(Box::new(to_substrait_rex(
Some(filter) => Some(to_substrait_rex(
filter,
&Arc::new(in_join_schema),
0,
extension_info,
)?)),
)?),
None => None,
};

// map the left and right columns to binary expressions in the form `l = r`
// build a single expression for the ON condition, such as `l.a = r.a AND l.b = r.b`
let eq_op = if join.null_equals_null {
Operator::IsNotDistinctFrom
} else {
Operator::Eq
};

let join_expr = to_substrait_join_expr(
let join_on = to_substrait_join_expr(
&join.on,
eq_op,
join.left.schema(),
join.right.schema(),
extension_info,
)?
.map(Box::new);
)?;

// create conjunction between `join_on` and `join_filter` to embed all join conditions,
// whether equal or non-equal in a single expression
let join_expr = match &join_on {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you could use conjunction here and simplify the code https://docs.rs/datafusion/latest/datafusion/optimizer/utils/fn.conjunction.html

Some(on_expr) => match &join_filter {
Some(filter) => Some(Box::new(make_binary_op_scalar_func(
on_expr,
filter,
Operator::And,
extension_info,
))),
None => join_on.map(Box::new), // the join expression will only contain `join_on` if filter doesn't exist
},
None => match &join_filter {
Some(_) => join_filter.map(Box::new), // the join expression will only contain `join_file` if the `on` condition doesn't exist
None => None,
},
};

Ok(Box::new(Rel {
rel_type: Some(RelType::Join(Box::new(JoinRel {
Expand All @@ -309,7 +326,7 @@ pub fn to_substrait_rel(
right: Some(right),
r#type: join_type as i32,
expression: join_expr,
post_join_filter: join_filter,
post_join_filter: None,
advanced_extension: None,
}))),
}))
Expand Down