diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 2f93c4afaf0f..1d6cfc6b0418 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -121,7 +121,6 @@ impl BuiltInWindowFunctionExpr for WindowShift { default_value: self.default_value.clone(), ignore_nulls: self.ignore_nulls, non_null_offsets: VecDeque::new(), - curr_valid_idx: 0, })) } @@ -144,8 +143,6 @@ pub(crate) struct WindowShiftEvaluator { ignore_nulls: bool, // VecDeque contains offset values that between non-null entries non_null_offsets: VecDeque, - // Current idx pointing to non null value - curr_valid_idx: i64, } impl WindowShiftEvaluator { @@ -209,6 +206,7 @@ impl PartitionEvaluator for WindowShiftEvaluator { fn get_range(&self, idx: usize, n_rows: usize) -> Result> { if self.is_lag() { let start = if self.non_null_offsets.len() == self.shift_offset as usize { + // Number of rows before the current row that are needed to compute the lag result let offset: usize = self.non_null_offsets.iter().sum(); idx.saturating_sub(offset + 1) } else { @@ -217,8 +215,13 @@ impl PartitionEvaluator for WindowShiftEvaluator { let end = idx + 1; Ok(Range { start, end }) } else { - let offset = (-self.shift_offset) as usize; - let end = min(idx + offset, n_rows); + let end = if self.non_null_offsets.len() == (-self.shift_offset) as usize { + // Number of rows after the current row that are needed to compute the lead result + let offset: usize = self.non_null_offsets.iter().sum(); + min(idx + offset + 1, n_rows) + } else { + n_rows + }; Ok(Range { start: idx, end }) } } @@ -247,10 +250,10 @@ impl PartitionEvaluator for WindowShiftEvaluator { range.start as i64 - self.shift_offset }; - // Support LAG only for now, as LEAD requires some brainstorm first // LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows // If current row index points to NULL value the row 
is NOT counted if self.ignore_nulls && self.is_lag() { + // LAG when NULLS are ignored. // Find the nonNULL row index that shifted by offset comparing to current row index idx = if self.non_null_offsets.len() == self.shift_offset as usize { let total_offset: usize = self.non_null_offsets.iter().sum(); @@ -273,22 +276,55 @@ impl PartitionEvaluator for WindowShiftEvaluator { self.non_null_offsets[end_idx] += 1; } } else if self.ignore_nulls && !self.is_lag() { + // LEAD when NULLS are ignored. + // Number of non-null entries required after the current row. + let non_null_row_count = (-self.shift_offset) as usize; + if self.non_null_offsets.is_empty() { - self.non_null_offsets - .extend(array.nulls().unwrap().valid_indices().collect::>()); - } - if range.start == 0 && array.is_valid(0) { - self.curr_valid_idx = -self.shift_offset - 1; + // When empty, fill `non_null_offsets` from the rows after the current row. + let mut offset_val = 1; + for idx in range.start + 1..range.end { + if array.is_valid(idx) { + self.non_null_offsets.push_back(offset_val); + offset_val = 1; + } else { + offset_val += 1; + } + // It is enough to keep track of `non_null_row_count + 1` non-null offsets; + // further data is unnecessary for the result. + if self.non_null_offsets.len() == non_null_row_count + 1 { + break; + } + } + } else if range.end < len as usize && array.is_valid(range.end) { + // Update `non_null_offsets` with the new end data. NOTE(review): the enclosing condition already requires `array.is_valid(range.end)`, so the null arm of the check below is unreachable; confirm the intended condition. + if array.is_valid(range.end) { + // When non-null, append a new offset. 
+ self.non_null_offsets.push_back(1); + } else { + // When null, increment the offset count of the last entry + let last_idx = self.non_null_offsets.len() - 1; + self.non_null_offsets[last_idx] += 1; + } } - let idx0 = self.non_null_offsets.get(self.curr_valid_idx as usize); - if idx0.is_some() && range.start == *idx0.unwrap() { - self.curr_valid_idx += 1; + + // Find the index of the non-NULL row that is shifted by the offset relative to the current row index + idx = if self.non_null_offsets.len() >= non_null_row_count { + let total_offset: usize = + self.non_null_offsets.iter().take(non_null_row_count).sum(); + (range.start + total_offset) as i64 + } else { + -1 + }; + // Prune `self.non_null_offsets` from the start, so that at the next iteration + // the start of `self.non_null_offsets` matches the current row. + if !self.non_null_offsets.is_empty() { + self.non_null_offsets[0] -= 1; + if self.non_null_offsets[0] == 0 { + // When the offset reaches 0, remove it. + self.non_null_offsets.pop_front(); + } } - idx = self - .non_null_offsets - .get((self.curr_valid_idx - self.shift_offset - 1) as usize) - .map(|v| *v as i64) - .unwrap_or(-1); } // Set the default value if diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 788d4a744f41..6994de3f92d5 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4258,14 +4258,15 @@ LIMIT 5; statement ok set datafusion.execution.batch_size = 1; -query I -SELECT LAG(c1, 2) IGNORE NULLS OVER() +query II +SELECT LAG(c1, 2) IGNORE NULLS OVER(), + LEAD(c1, 2) IGNORE NULLS OVER() FROM null_cases ORDER BY c2 LIMIT 5; ---- -78 -63 -3 -24 -14 +78 50 +63 38 +3 53 +24 31 +14 94