Skip to content

Commit

Permalink
Support IGNORE NULLS for LEAD window function (#9419)
Browse files Browse the repository at this point in the history
* IGNORE NULLS support for LEAD

* fix

* fix

* fix
  • Loading branch information
comphead authored Mar 4, 2024
1 parent 1a4dc00 commit dd1cf01
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 12 deletions.
23 changes: 19 additions & 4 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl BuiltInWindowFunctionExpr for WindowShift {
default_value: self.default_value.clone(),
ignore_nulls: self.ignore_nulls,
non_null_offsets: VecDeque::new(),
curr_valid_idx: 0,
}))
}

Expand All @@ -143,6 +144,8 @@ pub(crate) struct WindowShiftEvaluator {
ignore_nulls: bool,
// VecDeque contains offset values that between non-null entries
non_null_offsets: VecDeque<usize>,
// Current idx pointing to non null value
curr_valid_idx: i64,
}

impl WindowShiftEvaluator {
Expand Down Expand Up @@ -270,10 +273,22 @@ impl PartitionEvaluator for WindowShiftEvaluator {
self.non_null_offsets[end_idx] += 1;
}
} else if self.ignore_nulls && !self.is_lag() {
// IGNORE NULLS and LEAD mode.
return Err(exec_datafusion_err!(
"IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec"
));
if self.non_null_offsets.is_empty() {
self.non_null_offsets
.extend(array.nulls().unwrap().valid_indices().collect::<Vec<_>>());
}
if range.start == 0 && array.is_valid(0) {
self.curr_valid_idx = -self.shift_offset - 1;
}
let idx0 = self.non_null_offsets.get(self.curr_valid_idx as usize);
if idx0.is_some() && range.start == *idx0.unwrap() {
self.curr_valid_idx += 1;
}
idx = self
.non_null_offsets
.get((self.curr_valid_idx - self.shift_offset - 1) as usize)
.map(|v| *v as i64)
.unwrap_or(-1);
}

// Set the default value if
Expand Down
76 changes: 68 additions & 8 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4169,32 +4169,92 @@ select lag(a, 2, null) ignore nulls over (order by id desc) as x1,
sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')

# LEAD window function IGNORE/RESPECT NULLS support with ascending order and default offset 1
query TTTTTT
select lead(a) ignore nulls over (order by id) as x,
lead(a, 1, null) ignore nulls over (order by id) as x1,
lead(a, 1, 'def') ignore nulls over (order by id) as x2,
lead(a) respect nulls over (order by id) as x3,
lead(a, 1, null) respect nulls over (order by id) as x4,
lead(a, 1, 'def') respect nulls over (order by id) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
b b b b b b
x x x NULL NULL NULL
x x x x x x
NULL NULL def NULL NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and default offset 1
query TTTTTT
select lead(a) ignore nulls over (order by id desc) as x,
lead(a, 1, null) ignore nulls over (order by id desc) as x1,
lead(a, 1, 'def') ignore nulls over (order by id desc) as x2,
lead(a) respect nulls over (order by id desc) as x3,
lead(a, 1, null) respect nulls over (order by id desc) as x4,
lead(a, 1, 'def') respect nulls over (order by id desc) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
b b b NULL NULL NULL
b b b b b b
NULL NULL def NULL NULL NULL
NULL NULL def NULL NULL def

# LEAD window function IGNORE/RESPECT NULLS support with ascending order and nondefault offset
query TTTT
select lead(a, 2, null) ignore nulls over (order by id) as x1,
lead(a, 2, 'def') ignore nulls over (order by id) as x2,
lead(a, 2, null) respect nulls over (order by id) as x4,
lead(a, 2, 'def') respect nulls over (order by id) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
x x NULL NULL
NULL def x x
NULL def NULL def
NULL def NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset
statement error Execution error: IGNORE NULLS mode for LEAD is not supported for BoundedWindowAggExec
query TTTT
select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lead(a, 2, null) respect nulls over (order by id desc) as x4,
lead(a, 2, 'def') respect nulls over (order by id desc) as x5
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')
----
NULL def b b
NULL def NULL NULL
NULL def NULL def
NULL def NULL def

# LEAD window function IGNORE/RESPECT NULLS support with descending order and nondefault offset.
# To trigger WindowAggExec, we added a sum window function with all of the ranges.
statement error Execution error: IGNORE NULLS mode for LAG and LEAD is not supported for WindowAggExec
select lead(a, 2, null) ignore nulls over (order by id desc) as x1,
lead(a, 2, 'def') ignore nulls over (order by id desc) as x2,
lead(a, 2, null) respect nulls over (order by id desc) as x4,
lead(a, 2, 'def') respect nulls over (order by id desc) as x5,
sum(id) over (order by id desc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as sum_id
from (select 2 id, 'b' a union all select 1 id, null a union all select 3 id, null union all select 4 id, 'x')

statement ok
set datafusion.execution.batch_size = 1000;

query I
SELECT LAG(c1, 2) IGNORE NULLS OVER()
query II
SELECT LAG(c1, 2) IGNORE NULLS OVER(),
LEAD(c1, 2) IGNORE NULLS OVER()
FROM null_cases
ORDER BY c2
LIMIT 5;
----
78
63
3
24
14
78 50
63 38
3 53
24 31
14 94

# result should be same with above, when lag algorithm work with pruned data.
# decreasing batch size, causes data to be produced in smaller chunks at the source.
# Hence sliding window algorithm is used during calculations.
# LEAD function to be added, there is some bug with incoming data array when LEAD is called
statement ok
set datafusion.execution.batch_size = 1;

Expand Down

0 comments on commit dd1cf01

Please sign in to comment.