Improve push down filter of join #13184

Merged

@JasonLi-cn JasonLi-cn commented Oct 30, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

In the current version, filter pushdown through a Join is incomplete. For example:

> CREATE TABLE t1 (c0 CHAR, c1 INT);
0 row(s) fetched.
Elapsed 0.006 seconds.

> CREATE TABLE t2 (c0 CHAR, c1 INT);
0 row(s) fetched.
Elapsed 0.005 seconds.

> EXPLAIN SELECT * FROM t1 LEFT JOIN t2 ON t1.c0 = t2.c0 WHERE abs(t2.c1) > 5;
+---------------+---------------------------------------------------------------------------+
| plan_type     | plan                                                                      |
+---------------+---------------------------------------------------------------------------+
| logical_plan  | Filter: abs(t2.c1) > Int32(5)                                             |
|               |   Left Join: t1.c0 = t2.c0                                                |
|               |     TableScan: t1 projection=[c0, c1]                                     |
|               |     TableScan: t2 projection=[c0, c1]                                     |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                               |
|               |   FilterExec: abs(c1@3) > 5                                               |
|               |     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 |
|               |       CoalesceBatchesExec: target_batch_size=8192                         |
|               |         HashJoinExec: mode=Partitioned, join_type=Left, on=[(c0@0, c0@0)] |
|               |           MemoryExec: partitions=1, partition_sizes=[0]                   |
|               |           MemoryExec: partitions=1, partition_sizes=[0]                   |
|               |                                                                           |
+---------------+---------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.012 seconds.

Expected:

> EXPLAIN SELECT * FROM t1 LEFT JOIN t2 ON t1.c0 = t2.c0 WHERE abs(t2.c1) > 5;
+---------------+-------------------------------------------------------------------------------+
| plan_type     | plan                                                                          |
+---------------+-------------------------------------------------------------------------------+
| logical_plan  | Inner Join: t1.c0 = t2.c0                                                     |
|               |   TableScan: t1 projection=[c0, c1]                                           |
|               |   Filter: abs(t2.c1) > Int32(5)                                               |
|               |     TableScan: t2 projection=[c0, c1]                                         |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                   |
|               |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c0@0, c0@0)]          |
|               |     CoalesceBatchesExec: target_batch_size=8192                               |
|               |       RepartitionExec: partitioning=Hash([c0@0], 16), input_partitions=1      |
|               |         MemoryExec: partitions=1, partition_sizes=[0]                         |
|               |     CoalesceBatchesExec: target_batch_size=8192                               |
|               |       RepartitionExec: partitioning=Hash([c0@0], 16), input_partitions=16     |
|               |         RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 |
|               |           CoalesceBatchesExec: target_batch_size=8192                         |
|               |             FilterExec: abs(c1@1) > 5                                         |
|               |               MemoryExec: partitions=1, partition_sizes=[0]                   |
|               |                                                                               |
+---------------+-------------------------------------------------------------------------------+

BTW: if eliminate_outer_join could convert the Left Join to an Inner Join in this query, the filter abs(t2.c1) > Int32(5) could also be pushed down. So the next step is to improve the eliminate_outer_join rule, which is a little difficult.

What changes are included in this PR?

Modify the `push_down_all_join` function in the `push_down_filter` rule.

Are these changes tested?

yes

Are there any user-facing changes?

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Oct 30, 2024
@comphead comphead merged commit 7ae1ccb into apache:main Oct 31, 2024
25 checks passed
@comphead

Thanks everyone

@eejbyfeldt eejbyfeldt left a comment

I think this PR has correctness issues; see #13211.

99 NULL NULL NULL

query ITIT
SELECT * FROM t1 LEFT JOIN t2 ON t1.t1_id = t2.t2_id AND t2.t2_name = 'z' ORDER BY t1_id, t1_name, t2_id, t2_name

I think the predicates here are "too simple": many of the outer joins here will be converted to inner joins by the eliminate_outer_join rule, so these tests do not provide the desired coverage.

Comment on lines +439 to +450
+    let columns = predicate.column_refs();
+    macro_rules! restrict_null {
+        () => {{
+            let predicate_cloned = predicate.clone();
+            let cols = columns.iter().cloned();
+            is_restrict_null_predicate(predicate_cloned, cols).unwrap_or(false)
+        }};
+    }
+
+    if checker.left_only(&columns) && (left_preserved || restrict_null!()) {
+        left_push.push(predicate);
-    } else if right_preserved && checker.is_right_only(&predicate) {
+    } else if checker.right_only(&columns) && (right_preserved || restrict_null!()) {

This seems incorrect. Why would a predicate being null-restricting allow us to push it down? The fact that it filters out nulls seems to be an argument against pushing it down.

Contributor Author

Yes, this is wrong. I neglected to modify JoinType here. Thank you @eejbyfeldt for your help in finding this problem.
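
To make the problem concrete, here is a minimal sketch in plain Rust (no DataFusion types) of why pushing a null-restricting filter below a left join is only safe if the join type changes as well. Everything in it is a toy assumption for illustration: rows are `(c0, c1)` integer pairs, `None` stands for the NULL padding of an unmatched row, and the function names are hypothetical.

```rust
// Toy model, not DataFusion code: a row is (c0, c1), and `None` on the right
// side of a joined row stands for the NULL padding of an unmatched left row.
type Row = (i32, i32);
type Joined = (Row, Option<Row>);

/// LEFT JOIN on c0: unmatched left rows are kept and padded with NULL (None).
fn left_join(left: &[Row], right: &[Row]) -> Vec<Joined> {
    let mut out = Vec::new();
    for &l in left {
        let matches: Vec<Row> = right.iter().copied().filter(|r| r.0 == l.0).collect();
        if matches.is_empty() {
            out.push((l, None));
        } else {
            out.extend(matches.into_iter().map(|r| (l, Some(r))));
        }
    }
    out
}

/// INNER JOIN on c0: unmatched left rows are dropped.
fn inner_join(left: &[Row], right: &[Row]) -> Vec<Joined> {
    left_join(left, right)
        .into_iter()
        .filter(|(_, r)| r.is_some())
        .collect()
}

/// abs(t2.c1) > 5. A NULL input makes the predicate NULL, which WHERE treats as false.
fn abs_c1_gt_5(right: Option<Row>) -> bool {
    right.map_or(false, |(_, c1)| c1.abs() > 5)
}

/// Keep only the t2 rows that satisfy the predicate (the pushed-down filter).
fn filter_t2(t2: &[Row]) -> Vec<Row> {
    t2.iter().copied().filter(|r| abs_c1_gt_5(Some(*r))).collect()
}

/// Original plan: Filter above the Left Join.
fn filter_above_left_join(t1: &[Row], t2: &[Row]) -> Vec<Joined> {
    left_join(t1, t2)
        .into_iter()
        .filter(|(_, r)| abs_c1_gt_5(*r))
        .collect()
}

/// Unsound rewrite: push the filter below the join but keep the Left Join.
fn pushed_keeping_left_join(t1: &[Row], t2: &[Row]) -> Vec<Joined> {
    left_join(t1, &filter_t2(t2))
}

/// Sound rewrite: push the filter below the join and switch to an Inner Join.
fn pushed_with_inner_join(t1: &[Row], t2: &[Row]) -> Vec<Joined> {
    inner_join(t1, &filter_t2(t2))
}
```

With `t1 = [(1, 10), (2, 20)]` and `t2 = [(1, 7), (2, 3)]`, the original plan and the inner-join rewrite both return only the row for key 1, while the rewrite that keeps the Left Join also returns `(2, 20)` padded with NULL.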

eejbyfeldt added a commit to eejbyfeldt/datafusion that referenced this pull request Nov 2, 2024
alamb pushed a commit that referenced this pull request Nov 2, 2024
@alamb

alamb commented Nov 2, 2024

For anyone following along, this PR appears to have had some correctness issues, so @eejbyfeldt reverted it in #13229.

@JasonLi-cn are you willing to create a new PR so that we can work through the correctness issues?

@Dandandan

Dandandan commented Nov 2, 2024

I think for this pushdown to be correct, the join type needs to be changed.

I think this can be done in two phases:

  1. If there is a filter on the right side of a left join (or on the left side of a right join, etc.), the join can be transformed into an inner join, as the filter will remove all rows without a match.
  2. The existing filter pushdown will then push the filter down below the inner join.
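
The two phases above can be sketched on a toy plan representation. This is only an illustration under stated assumptions: `JoinPlan`, `Predicate`, and the two functions below are hypothetical stand-ins that borrow the rule names from this thread, not the DataFusion API, and the `null_rejecting` flag is assumed to be supplied by some earlier analysis.

```rust
// Hypothetical toy plan, not DataFusion types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Side {
    Left,
    Right,
}

#[derive(Debug, Clone, PartialEq)]
struct Predicate {
    text: &'static str,
    side: Side,
    // Assumed to be known: true if the predicate cannot be true on a NULL input.
    null_rejecting: bool,
}

#[derive(Debug, Clone, PartialEq)]
struct JoinPlan {
    join_type: JoinType,
    filters_above: Vec<Predicate>,
    left_filters: Vec<Predicate>,
    right_filters: Vec<Predicate>,
}

/// Phase 1: a null-rejecting filter on the right side of a Left Join removes
/// every NULL-padded row, so the join can become an Inner Join.
fn eliminate_outer_join(mut plan: JoinPlan) -> JoinPlan {
    let rejects_padding = plan
        .filters_above
        .iter()
        .any(|p| p.side == Side::Right && p.null_rejecting);
    if plan.join_type == JoinType::Left && rejects_padding {
        plan.join_type = JoinType::Inner;
    }
    plan
}

/// Phase 2: push filters below the join where the join type allows it.
fn push_down_filter(mut plan: JoinPlan) -> JoinPlan {
    let mut kept = Vec::new();
    for p in std::mem::take(&mut plan.filters_above) {
        match (plan.join_type, p.side) {
            // The left side is preserved by both Inner and Left joins.
            (_, Side::Left) => plan.left_filters.push(p),
            (JoinType::Inner, Side::Right) => plan.right_filters.push(p),
            // Right side of a Left Join: must stay above the join.
            (JoinType::Left, Side::Right) => kept.push(p),
        }
    }
    plan.filters_above = kept;
    plan
}

/// Run both phases in order.
fn optimize(plan: JoinPlan) -> JoinPlan {
    push_down_filter(eliminate_outer_join(plan))
}

/// Build `t1 LEFT JOIN t2 WHERE <predicate>` in the toy representation.
fn left_join_with_filter(predicate: Predicate) -> JoinPlan {
    JoinPlan {
        join_type: JoinType::Left,
        filters_above: vec![predicate],
        left_filters: Vec::new(),
        right_filters: Vec::new(),
    }
}
```

In this sketch, a filter such as `abs(t2.c1) > 5` marked as null-rejecting ends up below an Inner Join, while a filter such as `t2.c1 IS NULL` leaves the Left Join untouched and stays above it.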

@Dandandan

> I think for this pushdown to be correct, the join type needs to be changed.
>
> I think this can be done in two phases:
>
>   1. If there is a filter on the right side of a left join (or on the left side of a right join, etc.), the join can be transformed into an inner join, as the filter will remove all rows without a match.
>   2. The existing filter pushdown will then push the filter down below the inner join.

Just adding to it, I think supporting this would be relatively simple. Currently eliminate_outer_join only supports columns that are non-nullable; we could also support expressions that filter out any nulls, as most binary operators do.

fn extract_non_nullable_columns(

@Dandandan

Hm, actually the transformation is already supported by EliminateOuterJoin; the only thing that's necessary is adding support for more expressions, such as abs as used in the example.
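
As a rough illustration of what "supporting more expressions" could mean, the sketch below checks whether a predicate is null-rejecting by evaluating it with the column set to NULL under SQL's three-valued logic. It is a toy evaluator under stated assumptions: the `Expr` and `Value` types and `is_null_rejecting` are hypothetical and only model a single integer column; they are not DataFusion's expression types or the way EliminateOuterJoin is actually implemented.

```rust
// Hypothetical toy expression language over a single integer column.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Value {
    Null,
    Int(i64),
    Bool(bool),
}

#[derive(Debug, Clone)]
enum Expr {
    Column,
    Literal(i64),
    Abs(Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
    IsNull(Box<Expr>),
}

/// Evaluate `expr` with the column bound to `column`, propagating NULL the
/// way SQL does for abs and comparison operators.
fn eval(expr: &Expr, column: Value) -> Value {
    match expr {
        Expr::Column => column,
        Expr::Literal(v) => Value::Int(*v),
        Expr::Abs(inner) => match eval(inner, column) {
            Value::Int(v) => Value::Int(v.abs()),
            _ => Value::Null,
        },
        Expr::Gt(l, r) => match (eval(l, column), eval(r, column)) {
            (Value::Int(a), Value::Int(b)) => Value::Bool(a > b),
            _ => Value::Null,
        },
        // IS NULL never returns NULL itself.
        Expr::IsNull(inner) => Value::Bool(eval(inner, column) == Value::Null),
    }
}

/// A predicate is null-rejecting if it is not true when the column is NULL,
/// i.e. a WHERE clause using it would drop every NULL-padded row.
fn is_null_rejecting(predicate: &Expr) -> bool {
    eval(predicate, Value::Null) != Value::Bool(true)
}
```

Under this model `abs(c1) > 5` evaluates to NULL on a NULL input and is therefore null-rejecting, whereas `c1 IS NULL` evaluates to true and is not.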

@Dandandan

I am tracking this in #13232.

@JasonLi-cn

> For anyone following along, this PR appears to have had some correctness issues, so @eejbyfeldt reverted it in #13229.
>
> @JasonLi-cn are you willing to create a new PR so that we can work through the correctness issues?

I'm sorry that this PR introduced bugs. I will continue to follow up on this issue; please continue to help with the review. 🙏

@alamb alamb mentioned this pull request Nov 5, 2024
3 tasks