Skip to content

Commit

Permalink
fix: Incorrect LEFT JOIN evaluation result on OR conditions (apache#1…
Browse files Browse the repository at this point in the history
…1203)

* fix: Incorrect LEFT JOIN evaluation result on OR conditions

* Add a few more test cases

* Don't push join filter predicates into join_conditions

* Add test case and fix typo

* Add test case

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and findepi committed Jul 16, 2024
1 parent c812f37 commit 94324ee
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 2 deletions.
22 changes: 20 additions & 2 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,10 @@ fn push_down_all_join(
}
}

let mut on_filter_join_conditions = vec![];
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?;

if !on_filter.is_empty() {
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type)?;
for on in on_filter {
if on_left_preserved && can_pushdown_join_predicate(&on, left_schema)? {
left_push.push(on)
Expand All @@ -434,7 +436,7 @@ fn push_down_all_join(
{
right_push.push(on)
} else {
join_conditions.push(on)
on_filter_join_conditions.push(on)
}
}
}
Expand All @@ -450,6 +452,21 @@ fn push_down_all_join(
right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema));
}

// For predicates from join filter, we should check with if a join side is preserved
// in term of join filtering.
if on_left_preserved {
left_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
left_schema,
));
}
if on_right_preserved {
right_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
right_schema,
));
}

if let Some(predicate) = conjunction(left_push) {
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?));
}
Expand All @@ -459,6 +476,7 @@ fn push_down_all_join(
}

// Add any new join conditions as the non join predicates
join_conditions.extend(on_filter_join_conditions);
join.filter = conjunction(join_conditions);

// wrap the join on the filter whose predicates must be kept, if any
Expand Down
193 changes: 193 additions & 0 deletions datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -793,3 +793,196 @@ DROP TABLE companies

statement ok
DROP TABLE leads

####
## Test ON clause predicates are not pushed past join for OUTER JOINs
####


# create tables
statement ok
CREATE TABLE employees(emp_id INT, name VARCHAR);

statement ok
CREATE TABLE department(emp_id INT, dept_name VARCHAR);

statement ok
INSERT INTO employees (emp_id, name) VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carol');

statement ok
INSERT INTO department (emp_id, dept_name) VALUES (1, 'HR'), (3, 'Engineering'), (4, 'Sales');

# Can not push the ON filter below an OUTER JOIN
query TT
EXPLAIN SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON (e.name = 'Alice' OR e.name = 'Bob');
----
logical_plan
01)Left Join: Filter: e.name = Utf8("Alice") OR e.name = Utf8("Bob")
02)--SubqueryAlias: e
03)----TableScan: employees projection=[emp_id, name]
04)--SubqueryAlias: d
05)----TableScan: department projection=[dept_name]
physical_plan
01)ProjectionExec: expr=[emp_id@1 as emp_id, name@2 as name, dept_name@0 as dept_name]
02)--NestedLoopJoinExec: join_type=Right, filter=name@0 = Alice OR name@0 = Bob
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]

query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON (e.name = 'Alice' OR e.name = 'Bob');
----
1 Alice HR
2 Bob HR
1 Alice Engineering
2 Bob Engineering
1 Alice Sales
2 Bob Sales
3 Carol NULL

# neither RIGHT OUTER JOIN
query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM department AS d
RIGHT JOIN employees AS e
ON (e.name = 'Alice' OR e.name = 'Bob');
----
1 Alice HR
2 Bob HR
1 Alice Engineering
2 Bob Engineering
1 Alice Sales
2 Bob Sales
3 Carol NULL

# neither FULL OUTER JOIN
query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM department AS d
FULL JOIN employees AS e
ON (e.name = 'Alice' OR e.name = 'Bob');
----
1 Alice HR
2 Bob HR
1 Alice Engineering
2 Bob Engineering
1 Alice Sales
2 Bob Sales
3 Carol NULL

query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees e
LEFT JOIN department d
ON (e.name = 'NotExist1' OR e.name = 'NotExist2');
----
1 Alice NULL
2 Bob NULL
3 Carol NULL

query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees e
LEFT JOIN department d
ON (e.name = 'Alice' OR e.name = 'NotExist');
----
1 Alice HR
1 Alice Engineering
1 Alice Sales
2 Bob NULL
3 Carol NULL

# Can push the ON filter below the JOIN for INNER JOIN (expect to see a filter below the join)
query TT
EXPLAIN SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
JOIN department AS d
ON (e.name = 'Alice' OR e.name = 'Bob');
----
logical_plan
01)CrossJoin:
02)--SubqueryAlias: e
03)----Filter: employees.name = Utf8("Alice") OR employees.name = Utf8("Bob")
04)------TableScan: employees projection=[emp_id, name]
05)--SubqueryAlias: d
06)----TableScan: department projection=[dept_name]
physical_plan
01)CrossJoinExec
02)--CoalesceBatchesExec: target_batch_size=8192
03)----FilterExec: name@1 = Alice OR name@1 = Bob
04)------MemoryExec: partitions=1, partition_sizes=[1]
05)--MemoryExec: partitions=1, partition_sizes=[1]

# expect no row for Carol
query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
JOIN department AS d
ON (e.name = 'Alice' OR e.name = 'Bob');
----
1 Alice HR
1 Alice Engineering
1 Alice Sales
2 Bob HR
2 Bob Engineering
2 Bob Sales

# OR conditions on Filter (not join filter)
query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON e.emp_id = d.emp_id
WHERE (e.name = 'Alice' OR e.name = 'Carol');
----
1 Alice HR
3 Carol Engineering

# Push down OR conditions on Filter through LEFT JOIN if possible
query TT
EXPLAIN SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON e.emp_id = d.emp_id
WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol'));
----
logical_plan
01)Filter: d.dept_name != Utf8("Engineering") AND e.name = Utf8("Alice") OR e.name != Utf8("Alice") AND e.name = Utf8("Carol")
02)--Projection: e.emp_id, e.name, d.dept_name
03)----Left Join: e.emp_id = d.emp_id
04)------SubqueryAlias: e
05)--------Filter: employees.name = Utf8("Alice") OR employees.name != Utf8("Alice") AND employees.name = Utf8("Carol")
06)----------TableScan: employees projection=[emp_id, name]
07)------SubqueryAlias: d
08)--------TableScan: department projection=[emp_id, dept_name]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: dept_name@2 != Engineering AND name@1 = Alice OR name@1 != Alice AND name@1 = Carol
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------HashJoinExec: mode=CollectLeft, join_type=Left, on=[(emp_id@0, emp_id@0)], projection=[emp_id@0, name@1, dept_name@3]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------FilterExec: name@1 = Alice OR name@1 != Alice AND name@1 = Carol
08)--------------MemoryExec: partitions=1, partition_sizes=[1]
09)----------MemoryExec: partitions=1, partition_sizes=[1]

query ITT
SELECT e.emp_id, e.name, d.dept_name
FROM employees AS e
LEFT JOIN department AS d
ON e.emp_id = d.emp_id
WHERE ((dept_name != 'Engineering' AND e.name = 'Alice') OR (name != 'Alice' AND e.name = 'Carol'));
----
1 Alice HR
3 Carol Engineering

statement ok
DROP TABLE employees

statement ok
DROP TABLE department

0 comments on commit 94324ee

Please sign in to comment.