Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore null LEAD support for small batch sizes. #9445

Merged
merged 8 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 55 additions & 19 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
default_value: self.default_value.clone(),
ignore_nulls: self.ignore_nulls,
non_null_offsets: VecDeque::new(),
curr_valid_idx: 0,
}))
}

Expand All @@ -144,8 +143,6 @@ pub(crate) struct WindowShiftEvaluator {
ignore_nulls: bool,
// VecDeque contains offset values that between non-null entries
non_null_offsets: VecDeque<usize>,
// Current idx pointing to non null value
curr_valid_idx: i64,
}

impl WindowShiftEvaluator {
Expand Down Expand Up @@ -209,6 +206,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
if self.is_lag() {
let start = if self.non_null_offsets.len() == self.shift_offset as usize {
// How many rows needed previous than the current row to get necessary lag result
let offset: usize = self.non_null_offsets.iter().sum();
idx.saturating_sub(offset + 1)
} else {
Expand All @@ -217,8 +215,13 @@ impl PartitionEvaluator for WindowShiftEvaluator {
let end = idx + 1;
Ok(Range { start, end })
} else {
let offset = (-self.shift_offset) as usize;
let end = min(idx + offset, n_rows);
let end = if self.non_null_offsets.len() == (-self.shift_offset) as usize {
// How many rows needed further than the current row to get necessary lead result
let offset: usize = self.non_null_offsets.iter().sum();
min(idx + offset + 1, n_rows)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

} else {
n_rows
};
Ok(Range { start: idx, end })
}
}
Expand Down Expand Up @@ -247,10 +250,10 @@ impl PartitionEvaluator for WindowShiftEvaluator {
range.start as i64 - self.shift_offset
};

// Support LAG only for now, as LEAD requires some brainstorm first
// LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows
// If current row index points to NULL value the row is NOT counted
if self.ignore_nulls && self.is_lag() {
// LAG when NULLS are ignored.
// Find the nonNULL row index that shifted by offset comparing to current row index
idx = if self.non_null_offsets.len() == self.shift_offset as usize {
let total_offset: usize = self.non_null_offsets.iter().sum();
Expand All @@ -273,22 +276,55 @@ impl PartitionEvaluator for WindowShiftEvaluator {
self.non_null_offsets[end_idx] += 1;
}
} else if self.ignore_nulls && !self.is_lag() {
// LEAD when NULLS are ignored.
// Stores the necessary non-null entry number further than the current row.
let non_null_row_count = (-self.shift_offset) as usize;

if self.non_null_offsets.is_empty() {
self.non_null_offsets
.extend(array.nulls().unwrap().valid_indices().collect::<Vec<_>>());
}
if range.start == 0 && array.is_valid(0) {
self.curr_valid_idx = -self.shift_offset - 1;
// When empty, fill non_null offsets with the data further than the current row.
let mut offset_val = 1;
for idx in range.start + 1..range.end {
if array.is_valid(idx) {
self.non_null_offsets.push_back(offset_val);
offset_val = 1;
} else {
offset_val += 1;
}
// It is enough to keep track of `non_null_row_count + 1` non-null offset.
// further data is unnecessary for the result.
if self.non_null_offsets.len() == non_null_row_count + 1 {
break;
}
}
} else if range.end < len as usize && array.is_valid(range.end) {
// Update `non_null_offsets` with the new end data.
if array.is_valid(range.end) {
// When non-null, append a new offset.
self.non_null_offsets.push_back(1);
} else {
// When null, increment offset count of the last entry
let last_idx = self.non_null_offsets.len() - 1;
self.non_null_offsets[last_idx] += 1;
}
}
let idx0 = self.non_null_offsets.get(self.curr_valid_idx as usize);
if idx0.is_some() && range.start == *idx0.unwrap() {
self.curr_valid_idx += 1;

// Find the nonNULL row index that shifted by offset comparing to current row index
idx = if self.non_null_offsets.len() >= non_null_row_count {
let total_offset: usize =
self.non_null_offsets.iter().take(non_null_row_count).sum();
(range.start + total_offset) as i64
} else {
-1
};
// Prune `self.non_null_offsets` from the start. so that at next iteration
// start of the `self.non_null_offsets` matches with current row.
if !self.non_null_offsets.is_empty() {
self.non_null_offsets[0] -= 1;
if self.non_null_offsets[0] == 0 {
// When offset is 0. Remove it.
self.non_null_offsets.pop_front();
}
}
idx = self
.non_null_offsets
.get((self.curr_valid_idx - self.shift_offset - 1) as usize)
.map(|v| *v as i64)
.unwrap_or(-1);
}

// Set the default value if
Expand Down
15 changes: 8 additions & 7 deletions datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4258,14 +4258,15 @@ LIMIT 5;
statement ok
set datafusion.execution.batch_size = 1;

query I
SELECT LAG(c1, 2) IGNORE NULLS OVER()
query II
SELECT LAG(c1, 2) IGNORE NULLS OVER(),
LEAD(c1, 2) IGNORE NULLS OVER()
FROM null_cases
ORDER BY c2
LIMIT 5;
----
78
63
3
24
14
78 50
63 38
3 53
24 31
14 94
Loading