Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support IGNORE NULLS for LEAD window function #9419

Merged
merged 4 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl BuiltInWindowFunctionExpr for WindowShift {
default_value: self.default_value.clone(),
ignore_nulls: self.ignore_nulls,
non_null_offsets: VecDeque::new(),
curr_valid_idx: 0,
}))
}

Expand All @@ -143,6 +144,8 @@ pub(crate) struct WindowShiftEvaluator {
ignore_nulls: bool,
// VecDeque contains offset values that between non-null entries
non_null_offsets: VecDeque<usize>,
// Current idx pointing to non null value
curr_valid_idx: i64,
}

impl WindowShiftEvaluator {
Expand Down Expand Up @@ -270,10 +273,22 @@ impl PartitionEvaluator for WindowShiftEvaluator {
self.non_null_offsets[end_idx] += 1;
}
} else if self.ignore_nulls && !self.is_lag() {
// IGNORE NULLS and LEAD mode.
return Err(exec_datafusion_err!(
"IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec"
));
if self.non_null_offsets.is_empty() {
self.non_null_offsets
.extend(array.nulls().unwrap().valid_indices().collect::<Vec<_>>());
}
if range.start == 0 && array.is_valid(0) {
self.curr_valid_idx = -self.shift_offset - 1;
}
let idx0 = self.non_null_offsets.get(self.curr_valid_idx as usize);
if idx0.is_some() && range.start == *idx0.unwrap() {
self.curr_valid_idx += 1;
}
idx = self
.non_null_offsets
.get((self.curr_valid_idx - self.shift_offset - 1) as usize)
.map(|v| *v as i64)
.unwrap_or(-1);
}

// Set the default value if
Expand Down
78 changes: 69 additions & 9 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4169,36 +4169,96 @@ select lag(a, 2, null) ignore nulls over (order by id desc) as x1,
sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')

# LEAD window function IGNORE/RESPECT NULLS support with ascending order and default offset 1
query TTTTTT
select lead(a) ignore nulls over (order by id) as x,
lead(a, 1, null) ignore nulls over (order by id) as x1,
lead(a, 1, 'def') ignore nulls over (order by id) as x2,
lead(a) respect nulls over (order by id) as x3,
lead(a, 1, null) respect nulls over (order by id) as x4,
lead(a, 1, 'def') respect nulls over (order by id) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
b b b b b b
x x x NULL NULL NULL
x x x x x x
NULL NULL def NULL NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and default offset 1
query TTTTTT
select lead(a) ignore nulls over (order by id desc) as x,
lead(a, 1, null) ignore nulls over (order by id desc) as x1,
lead(a, 1, 'def') ignore nulls over (order by id desc) as x2,
lead(a) respect nulls over (order by id desc) as x3,
lead(a, 1, null) respect nulls over (order by id desc) as x4,
lead(a, 1, 'def') respect nulls over (order by id desc) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
b b b NULL NULL NULL
b b b b b b
NULL NULL def NULL NULL NULL
NULL NULL def NULL NULL def

# LEAD window function IGNORE/RESPECT NULLS support with ascending order and nondefault offset
query TTTT
select lead(a, 2, null) ignore nulls over (order by id) as x1,
lead(a, 2, 'def') ignore nulls over (order by id) as x2,
lead(a, 2, null) respect nulls over (order by id) as x4,
lead(a, 2, 'def') respect nulls over (order by id) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
x x NULL NULL
NULL def x x
NULL def NULL def
NULL def NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset
statement error Execution error: IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec
query TTTT
select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lead(a, 2, null) respect nulls over (order by id desc) as x4,
lead(a, 2, 'def') respect nulls over (order by id desc) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
NULL def b b
NULL def NULL NULL
NULL def NULL def
NULL def NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset.
# To trigger WindowAggExec, we added a sum window function with all of the ranges.
statement error Execution error: IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec
select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lead(a, 2, null) respect nulls over (order by id desc) as x4,
lead(a, 2, 'def') respect nulls over (order by id desc) as x5,
sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')

statement ok
set datafusion.execution.batch_size = 1000;

query I
SELECT LAG(c1, 2) IGNORE NULLS OVER()
query II
SELECT LAG(c1, 2) IGNORE NULLS OVER(),
LEAD(c1, 2) IGNORE NULLS OVER()
FROM null_cases
ORDER BY c2
LIMIT 5;
----
78
63
3
24
14
78 50
63 38
3 53
24 31
14 94

# result should be same with above, when lag algorithm work with pruned data.
# decreasing batch size, causes data to be produced in smaller chunks at the source.
# Hence sliding window algorithm is used during calculations.
# LEAD function to be added, there is some bug with incoming data array when LEAD is called
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll address this in following PR, what I found is the incoming data is incomplete for LEAD.
so for set datafusion.execution.batch_size = 8000;
the incoming array is complete

[datafusion/physical-expr/src/window/lead_lag.rs:280:17] &array = StringArray
[
  "x",
  null,
  "b",
  null,
]

and for set datafusion.execution.batch_size = 1;
the data is partially missed

[datafusion/physical-expr/src/window/lead_lag.rs:280:17] &array = StringArray
[
  "x",
  null,
  "b"
]

Probably the error is in built_in.rs I'll check it in following PR

statement ok
set datafusion.execution.batch_size = 1;

query I
query II
SELECT LAG(c1, 2) IGNORE NULLS OVER()
FROM null_cases
ORDER BY c2
Expand Down
Loading