Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce casts for LEAD/LAG #9468

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,30 +236,31 @@ impl PartitionEvaluator for WindowShiftEvaluator {
values: &[ArrayRef],
range: &Range<usize>,
) -> Result<ScalarValue> {
// TODO: try to get rid of i64 usize conversion
// TODO: do not recalculate default value every call
// TODO: support LEAD mode for IGNORE NULLS
let array = &values[0];
let dtype = array.data_type();
let len = array.len() as i64;
let len = array.len();

// LAG mode
let mut idx = if self.is_lag() {
range.end as i64 - self.shift_offset - 1
let i = if self.is_lag() {
(range.end as i64 - self.shift_offset - 1) as usize
} else {
// LEAD mode
range.start as i64 - self.shift_offset
(range.start as i64 - self.shift_offset) as usize
};

let mut idx: Option<usize> = if i < len { Some(i) } else { None };

// LAG with IGNORE NULLS is calculated as the current row index - offset, but only over non-NULL rows.
// If the current row index points to a NULL value, that row is NOT counted.
if self.ignore_nulls && self.is_lag() {
// LAG when NULLS are ignored.
// Find the non-NULL row index that is shifted by `offset` relative to the current row index
idx = if self.non_null_offsets.len() == self.shift_offset as usize {
let total_offset: usize = self.non_null_offsets.iter().sum();
(range.end - 1 - total_offset) as i64
Some(range.end - 1 - total_offset)
} else {
-1
None
};

// Keep track of offset values between non-null entries
Expand Down Expand Up @@ -296,7 +297,7 @@ impl PartitionEvaluator for WindowShiftEvaluator {
break;
}
}
} else if range.end < len as usize && array.is_valid(range.end) {
} else if range.end < len && array.is_valid(range.end) {
// Update `non_null_offsets` with the new end data.
if array.is_valid(range.end) {
// When non-null, append a new offset.
Expand All @@ -312,9 +313,9 @@ impl PartitionEvaluator for WindowShiftEvaluator {
idx = if self.non_null_offsets.len() >= non_null_row_count {
let total_offset: usize =
self.non_null_offsets.iter().take(non_null_row_count).sum();
(range.start + total_offset) as i64
Some(range.start + total_offset)
} else {
-1
None
};
// Prune `self.non_null_offsets` from the start, so that at the next iteration
// the start of `self.non_null_offsets` matches the current row.
Expand All @@ -331,10 +332,12 @@ impl PartitionEvaluator for WindowShiftEvaluator {
// - index is out of window bounds
// OR
// - ignore nulls mode and current value is null and is within window bounds
if idx < 0 || idx >= len || (self.ignore_nulls && array.is_null(idx as usize)) {
// `.unwrap()` is safe here: the `idx.is_none()` check short-circuits before it is reached
#[allow(clippy::unnecessary_unwrap)]
if idx.is_none() || (self.ignore_nulls && array.is_null(idx.unwrap())) {
get_default_value(self.default_value.as_ref(), dtype)
} else {
ScalarValue::try_from_array(array, idx as usize)
ScalarValue::try_from_array(array, idx.unwrap())
}
}

Expand Down
Loading