-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use prep_null_mask_filter to handle nulls in selection mask #9163
Conversation
@@ -1209,7 +1209,14 @@ impl SMJStream { | |||
) { | |||
// The reverse of the selection mask. For the rows not pass join filter above, | |||
// we need to join them (left or right) with null rows for outer joins. | |||
let not_mask = compute::not(mask)?; | |||
let not_mask = if mask.null_count() > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I re-checked the results in sort_merge_join.slt
and it should be correct (as it is same as join.slt
which is produced by hash join operator).
// If the mask contains nulls, we need to use `prep_null_mask_filter` to | ||
// handle the nulls in the mask as false. | ||
compute::not(&compute::prep_null_mask_filter(mask))? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the test in sort_merge_join.slt
as example:
CREATE TABLE IF NOT EXISTS t1(t1_id INT, t1_name TEXT, t1_int INT) AS VALUES
(11, 'a', 1),
(22, 'b', 2),
(33, 'c', 3),
(44, 'd', 4);
CREATE TABLE IF NOT EXISTS t2(t2_id INT, t2_name TEXT, t2_int INT) AS VALUES
(11, 'z', 3),
(22, 'y', 1),
(44, 'x', 3),
(55, 'w', 3);
For the query SELECT t1_id, t1_int, t2_int FROM t1 LEFT JOIN t2 ON t1_id = t2_id AND t1_int >= t2_int
, 33 3 NULL
is one row to output without join filter. Join filter returns null on this row, so its value in mask selection array is null and the row is not picked.
Here we take reverse mask. As it is null, its reverse mask is still false, but this row should be selected here actually. So we need to call prep_null_mask_filter
to process the mask before taking reverse array.
// Handle not mask for buffered side further. | ||
// For buffered side, we want to output the rows that are not null joined with | ||
// the streamed side. i.e. the rows that are not null in the `buffered_indices`. | ||
let not_mask = if buffered_indices.null_count() > 0 { | ||
let nulls = buffered_indices.nulls().unwrap(); | ||
let mask = not_mask.values() & nulls.inner(); | ||
BooleanArray::new(mask, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For full outer join, we need to output buffered rows that fail join filter. But in the output_batch
batch, we only care about the rows with buffered_indices
not null. Other rows with null indices are rows failed with equijoin predicates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good to me, but I am worried that we have no test coverage for this
I double checked and sort_merge_join.slt
seems to be running in CI. For example https://github.com/apache/arrow-datafusion/actions/runs/7848003246/job/21418418648#step:5:5748
Maybe it is time to increase the priority of running the CI tests on a mac: #9117
// Handle not mask for buffered side further. | ||
// For buffered side, we want to output the rows that are not null joined with | ||
// the streamed side. i.e. the rows that are not null in the `buffered_indices`. | ||
let not_mask = if buffered_indices.null_count() > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might be able to avoid the unwrap by using
let not_mask = if let Some(nulls) = if buffered_indices.nulls() {
let mask = not_mask.values() & nulls.inner();
BooleanArray::new(mask, None)
} else {
not_mask
};
Co-authored-by: Andrew Lamb <[email protected]>
…pache#9163)" This reverts commit b2ff63f.
Which issue does this PR close?
Follow up of #9080.
Close #9179
Rationale for this change
When applying join filter, we take filter evaluation array as selection mask to filter output batch. Next, we take the reverse mask to select (streamed) rows to join null rows.
With outer join cases, if filter evaluation has null values resulted in the selection mask, their reverse values are still nulls. Which makes some rows missed in (streamed) rows joined null rows.
We should use
prep_null_mask_filter
to handle nulls in selection mask as false before taking reverse mask from it.What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?