From acf5d9db84d179a3e217d6f2836d480a8f2bfe46 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 5 Mar 2024 10:06:51 -0800 Subject: [PATCH 01/11] Support IGNORE NULLS for FIRST/LAST window function --- datafusion/common/src/scalar/mod.rs | 20 ++++++++++++++++ .../physical-expr/src/window/nth_value.rs | 24 +++++++++++++++---- .../src/windows/bounded_window_agg_exec.rs | 6 ++--- datafusion/physical-plan/src/windows/mod.rs | 6 ++--- .../tests/cases/roundtrip_physical_plan.rs | 1 + 5 files changed, 47 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 5ace44f24b69..7b7d97889f7c 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2225,6 +2225,26 @@ impl ScalarValue { } } + /// Converts a value in `array` at `index` into a ScalarValue + pub fn try_from_array_ignore_nulls_first(array: &dyn Array, mut index: usize, ignore_nulls: bool) -> Result { + if ignore_nulls { + while index < (array.len() - 1) && !array.is_valid(index.clone()) { + index += 1; + } + } + Self::try_from_array(array, index) + } + + pub fn try_from_array_ignore_nulls_last(array: &dyn Array, mut index: usize, ignore_nulls: bool) -> Result { + // If ignoring nulls, find the next non-null index. + if ignore_nulls { + while index > 0 && !array.is_valid(index.clone()) { + index -= 1; + } + } + Self::try_from_array(array, index) + } + /// Converts a value in `array` at `index` into a ScalarValue pub fn try_from_array(array: &dyn Array, index: usize) -> Result { // handle NULL value diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index a7bb31b6e109..222acfab570d 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -29,7 +29,7 @@ use crate::PhysicalExpr; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field}; -use datafusion_common::Result; +use datafusion_common::{not_impl_err, Result}; use datafusion_common::{exec_err, ScalarValue}; use datafusion_expr::window_state::WindowAggState; use datafusion_expr::PartitionEvaluator; @@ -42,6 +42,7 @@ pub struct NthValue { /// Output data type data_type: DataType, kind: NthValueKind, + ignore_nulls: bool, } impl NthValue { @@ -50,12 +51,14 @@ impl NthValue { name: impl Into, expr: Arc, data_type: DataType, + ignore_nulls: bool, ) -> Self { Self { name: name.into(), expr, data_type, kind: NthValueKind::First, + ignore_nulls, } } @@ -64,12 +67,14 @@ impl NthValue { name: impl Into, expr: Arc, data_type: DataType, + ignore_nulls: bool, ) -> Self { Self { name: name.into(), expr, data_type, kind: NthValueKind::Last, + ignore_nulls, } } @@ -79,7 +84,11 @@ impl NthValue { expr: Arc, data_type: DataType, n: u32, + ignore_nulls: bool, ) -> Result { + if ignore_nulls { + return exec_err!("NTH_VALUE ignore_nulls is not supported yet"); + } match n { 0 => exec_err!("NTH_VALUE expects n to be non-zero"), _ => Ok(Self { @@ -87,6 +96,7 @@ impl NthValue { expr, data_type, kind: NthValueKind::Nth(n as i64), + ignore_nulls, }), } } @@ -122,7 +132,7 @@ impl BuiltInWindowFunctionExpr for NthValue { finalized_result: None, kind: self.kind, }; - Ok(Box::new(NthValueEvaluator { state })) + Ok(Box::new(NthValueEvaluator { state, ignore_nulls: self.ignore_nulls })) } fn reverse_expr(&self) -> Option> { @@ -136,6 +146,7 @@ impl BuiltInWindowFunctionExpr for NthValue { expr: self.expr.clone(), data_type: self.data_type.clone(), kind: reversed_kind, + ignore_nulls: self.ignore_nulls.clone(), })) } } @@ -144,6 +155,7 @@ impl BuiltInWindowFunctionExpr for NthValue { #[derive(Debug)] pub(crate) struct NthValueEvaluator { state: NthValueState, + ignore_nulls: bool, } impl PartitionEvaluator for NthValueEvaluator { @@ -211,8 +223,8 @@ impl PartitionEvaluator for NthValueEvaluator { return ScalarValue::try_from(arr.data_type()); } match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array(arr, range.start), - NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), + NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first(arr, range.start, self.ignore_nulls), + NthValueKind::Last => ScalarValue::try_from_array_ignore_nulls_last(arr, range.end - 1, self.ignore_nulls), NthValueKind::Nth(n) => { match n.cmp(&0) { Ordering::Greater => { @@ -295,6 +307,7 @@ mod tests { "first_value".to_owned(), Arc::new(Column::new("arr", 0)), DataType::Int32, + false, ); test_i32_result(first_value, Int32Array::from(vec![1; 8]))?; Ok(()) @@ -306,6 +319,7 @@ mod tests { "last_value".to_owned(), Arc::new(Column::new("arr", 0)), DataType::Int32, + false, ); test_i32_result( last_value, @@ -330,6 +344,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, 1, + false, )?; test_i32_result(nth_value, Int32Array::from(vec![1; 8]))?; Ok(()) @@ -342,6 +357,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, 2, + false, )?; test_i32_result( nth_value, diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 4cba571054de..aacac2f7d6b6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1179,15 +1179,15 @@ mod tests { .map(|e| Arc::new(e) as Arc)?; let col_a = col("a", &schema)?; let nth_value_func1 = - NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1)? + NthValue::nth("nth_value(-1)", col_a.clone(), DataType::Int32, 1, false)? .reverse_expr() .unwrap(); let nth_value_func2 = - NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2)? + NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2, false)? .reverse_expr() .unwrap(); let last_value_func = - Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32)) as _; + Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32, false)) as _; let window_exprs = vec![ // LAST_VALUE(a) Arc::new(BuiltInWindowExpr::new( diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index f91b525d6090..3b7c86303976 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -250,15 +250,15 @@ fn create_built_in_window_expr( .try_into() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; let n: u32 = n as u32; - Arc::new(NthValue::nth(name, arg, data_type.clone(), n)?) + Arc::new(NthValue::nth(name, arg, data_type.clone(), n, ignore_nulls)?) } BuiltInWindowFunction::FirstValue => { let arg = args[0].clone(); - Arc::new(NthValue::first(name, arg, data_type.clone())) + Arc::new(NthValue::first(name, arg, data_type.clone(), ignore_nulls)) } BuiltInWindowFunction::LastValue => { let arg = args[0].clone(); - Arc::new(NthValue::last(name, arg, data_type.clone())) + Arc::new(NthValue::last(name, arg, data_type.clone(), ignore_nulls)) } }) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a3c0b3eccd3c..004261eff595 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -271,6 +271,7 @@ fn roundtrip_window() -> Result<()> { "FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", col("a", &schema)?, DataType::Int64, + false, )), &[col("b", &schema)?], &[PhysicalSortExpr { From 656aec4a8c9e3166920760abc829ed57b034b436 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 5 Mar 2024 12:53:50 -0800 Subject: [PATCH 02/11] fix error --- datafusion/common/src/scalar/mod.rs | 17 ++++++++++++----- .../physical-expr/src/window/nth_value.rs | 19 +++++++++++++++---- .../src/windows/bounded_window_agg_exec.rs | 8 ++++++-- datafusion/physical-plan/src/windows/mod.rs | 8 +++++++- 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 7b7d97889f7c..8704f3a46294 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2225,20 +2225,27 @@ impl ScalarValue { } } - /// Converts a value in `array` at `index` into a ScalarValue - pub fn try_from_array_ignore_nulls_first(array: &dyn Array, mut index: usize, ignore_nulls: bool) -> Result { + pub fn try_from_array_ignore_nulls_first( + array: &dyn Array, + mut index: usize, + ignore_nulls: bool, + ) -> Result { if ignore_nulls { - while index < (array.len() - 1) && !array.is_valid(index.clone()) { + while index < (array.len() - 1) && !array.is_valid(index) { index += 1; } } Self::try_from_array(array, index) } - pub fn try_from_array_ignore_nulls_last(array: &dyn Array, mut index: usize, ignore_nulls: bool) -> Result { + pub fn try_from_array_ignore_nulls_last( + array: &dyn Array, + mut index: usize, + ignore_nulls: bool, + ) -> Result { // If ignoring nulls, find the next non-null index. if ignore_nulls { - while index > 0 && !array.is_valid(index.clone()) { + while index > 0 && !array.is_valid(index) { index -= 1; } } diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 222acfab570d..746945da278e 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -29,7 +29,7 @@ use crate::PhysicalExpr; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field}; -use datafusion_common::{not_impl_err, Result}; +use datafusion_common::Result; use datafusion_common::{exec_err, ScalarValue}; use datafusion_expr::window_state::WindowAggState; use datafusion_expr::PartitionEvaluator; @@ -132,7 +132,10 @@ impl BuiltInWindowFunctionExpr for NthValue { finalized_result: None, kind: self.kind, }; - Ok(Box::new(NthValueEvaluator { state, ignore_nulls: self.ignore_nulls })) + Ok(Box::new(NthValueEvaluator { + state, + ignore_nulls: self.ignore_nulls, + })) } fn reverse_expr(&self) -> Option> { @@ -223,8 +226,16 @@ impl PartitionEvaluator for NthValueEvaluator { return ScalarValue::try_from(arr.data_type()); } match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first(arr, range.start, self.ignore_nulls), - NthValueKind::Last => ScalarValue::try_from_array_ignore_nulls_last(arr, range.end - 1, self.ignore_nulls), + NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first( + arr, + range.start, + self.ignore_nulls, + ), + NthValueKind::Last => ScalarValue::try_from_array_ignore_nulls_last( + arr, + range.end - 1, + self.ignore_nulls, + ), NthValueKind::Nth(n) => { match n.cmp(&0) { Ordering::Greater => { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index aacac2f7d6b6..0349f8f1eeec 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1186,8 +1186,12 @@ mod tests { NthValue::nth("nth_value(-2)", col_a.clone(), DataType::Int32, 2, false)? .reverse_expr() .unwrap(); - let last_value_func = - Arc::new(NthValue::last("last", col_a.clone(), DataType::Int32, false)) as _; + let last_value_func = Arc::new(NthValue::last( + "last", + col_a.clone(), + DataType::Int32, + false, + )) as _; let window_exprs = vec![ // LAST_VALUE(a) Arc::new(BuiltInWindowExpr::new( diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 3b7c86303976..6712bc855ffd 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -250,7 +250,13 @@ fn create_built_in_window_expr( .try_into() .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; let n: u32 = n as u32; - Arc::new(NthValue::nth(name, arg, data_type.clone(), n, ignore_nulls)?) + Arc::new(NthValue::nth( + name, + arg, + data_type.clone(), + n, + ignore_nulls, + )?) } BuiltInWindowFunction::FirstValue => { let arg = args[0].clone(); From 720a0a7faa0af11f8536f15cffde3f87bd9bceb1 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 5 Mar 2024 13:34:32 -0800 Subject: [PATCH 03/11] fix style error --- datafusion/common/src/scalar/mod.rs | 4 ++-- datafusion/physical-expr/src/window/nth_value.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 8704f3a46294..c60b83dec5c3 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2225,11 +2225,11 @@ impl ScalarValue { } } - pub fn try_from_array_ignore_nulls_first( + pub fn try_from_array_ignore_nulls_first( array: &dyn Array, mut index: usize, ignore_nulls: bool, - ) -> Result { + ) -> Result { if ignore_nulls { while index < (array.len() - 1) && !array.is_valid(index) { index += 1; diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 746945da278e..024f2a9adb69 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -226,11 +226,11 @@ impl PartitionEvaluator for NthValueEvaluator { return ScalarValue::try_from(arr.data_type()); } match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first( + NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first( arr, range.start, self.ignore_nulls, - ), + ), NthValueKind::Last => ScalarValue::try_from_array_ignore_nulls_last( arr, range.end - 1, From 60ff3faa9288a99b74a0c9280b84f9d12f3a59c1 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 5 Mar 2024 13:55:11 -0800 Subject: [PATCH 04/11] fix clippy error --- datafusion/physical-expr/src/window/nth_value.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 024f2a9adb69..58a1ebc74813 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -149,7 +149,7 @@ impl BuiltInWindowFunctionExpr for NthValue { expr: self.expr.clone(), data_type: self.data_type.clone(), kind: reversed_kind, - ignore_nulls: self.ignore_nulls.clone(), + ignore_nulls: self.ignore_nulls, })) } } From ced95830063507d0ea215f52d7e6673a8899d05c Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 5 Mar 2024 15:33:52 -0800 Subject: [PATCH 05/11] add tests for all NULL values --- datafusion/common/src/scalar/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index c60b83dec5c3..7a396f1402ff 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2230,6 +2230,7 @@ impl ScalarValue { mut index: usize, ignore_nulls: bool, ) -> Result { + // If ignoring nulls, find the next non-null index. if ignore_nulls { while index < (array.len() - 1) && !array.is_valid(index) { index += 1; From 30ef3d216f0e97bf66a0282a34802f91c5e5237d Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 6 Mar 2024 17:50:40 -0800 Subject: [PATCH 06/11] address comments --- datafusion/common/src/scalar/mod.rs | 28 --------------- .../physical-expr/src/window/nth_value.rs | 34 +++++++++++++------ 2 files changed, 24 insertions(+), 38 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 7a396f1402ff..5ace44f24b69 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -2225,34 +2225,6 @@ impl ScalarValue { } } - pub fn try_from_array_ignore_nulls_first( - array: &dyn Array, - mut index: usize, - ignore_nulls: bool, - ) -> Result { - // If ignoring nulls, find the next non-null index. - if ignore_nulls { - while index < (array.len() - 1) && !array.is_valid(index) { - index += 1; - } - } - Self::try_from_array(array, index) - } - - pub fn try_from_array_ignore_nulls_last( - array: &dyn Array, - mut index: usize, - ignore_nulls: bool, - ) -> Result { - // If ignoring nulls, find the next non-null index. - if ignore_nulls { - while index > 0 && !array.is_valid(index) { - index -= 1; - } - } - Self::try_from_array(array, index) - } - /// Converts a value in `array` at `index` into a ScalarValue pub fn try_from_array(array: &dyn Array, index: usize) -> Result { // handle NULL value diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 58a1ebc74813..5e6630e732ac 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -225,17 +225,31 @@ impl PartitionEvaluator for NthValueEvaluator { // We produce None if the window is empty. return ScalarValue::try_from(arr.data_type()); } + + let mut valid_indices = Vec::new(); + if self.ignore_nulls { + valid_indices = arr.nulls().unwrap().valid_indices().collect::>(); + if valid_indices.is_empty() { + return ScalarValue::try_from(arr.data_type()); + } + } match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array_ignore_nulls_first( - arr, - range.start, - self.ignore_nulls, - ), - NthValueKind::Last => ScalarValue::try_from_array_ignore_nulls_last( - arr, - range.end - 1, - self.ignore_nulls, - ), + NthValueKind::First => { + let index: usize = if self.ignore_nulls { + valid_indices[0] + } else { + range.start + }; + ScalarValue::try_from_array(arr, index) + }, + NthValueKind::Last => { + let index = if self.ignore_nulls { + valid_indices[valid_indices.len() - 1] + } else { + range.end - 1 + }; + ScalarValue::try_from_array(arr, index) + }, NthValueKind::Nth(n) => { match n.cmp(&0) { Ordering::Greater => { From cc9f3b4b6848c4daec7f90bc1bb683b12433d855 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 6 Mar 2024 17:53:59 -0800 Subject: [PATCH 07/11] fix format --- datafusion/physical-expr/src/window/nth_value.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 5e6630e732ac..647409b08982 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -226,7 +226,7 @@ impl PartitionEvaluator for NthValueEvaluator { return ScalarValue::try_from(arr.data_type()); } - let mut valid_indices = Vec::new(); + let mut valid_indices = Vec::new(); if self.ignore_nulls { valid_indices = arr.nulls().unwrap().valid_indices().collect::>(); if valid_indices.is_empty() { @@ -241,7 +241,7 @@ impl PartitionEvaluator for NthValueEvaluator { range.start }; ScalarValue::try_from_array(arr, index) - }, + } NthValueKind::Last => { let index = if self.ignore_nulls { valid_indices[valid_indices.len() - 1] @@ -249,7 +249,7 @@ impl PartitionEvaluator for NthValueEvaluator { range.end - 1 }; ScalarValue::try_from_array(arr, index) - }, + } NthValueKind::Nth(n) => { match n.cmp(&0) { Ordering::Greater => { From 7a15263d0b5d06840aba78a6f79e379db8250cc0 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 9 Mar 2024 17:23:20 -0800 Subject: [PATCH 08/11] address comments --- .../physical-expr/src/window/nth_value.rs | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 647409b08982..5e64c96e5d1f 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -226,29 +226,33 @@ impl PartitionEvaluator for NthValueEvaluator { return ScalarValue::try_from(arr.data_type()); } - let mut valid_indices = Vec::new(); - if self.ignore_nulls { - valid_indices = arr.nulls().unwrap().valid_indices().collect::>(); + // Extract valid indices if ignoring nulls. + let (slice, valid_indices) = if self.ignore_nulls { + let slice = arr.slice(range.start, n_range); + let valid_indices = slice.nulls().unwrap().valid_indices().collect::>(); if valid_indices.is_empty() { return ScalarValue::try_from(arr.data_type()); } - } + (Some(slice), Some(valid_indices)) + } else { + (None, None) + }; match self.state.kind { NthValueKind::First => { - let index: usize = if self.ignore_nulls { - valid_indices[0] + if let Some(slice) = &slice { + let valid_indices = valid_indices.unwrap(); + ScalarValue::try_from_array(slice, valid_indices[0]) } else { - range.start - }; - ScalarValue::try_from_array(arr, index) + ScalarValue::try_from_array(arr, range.start) + } } NthValueKind::Last => { - let index = if self.ignore_nulls { - valid_indices[valid_indices.len() - 1] + if let Some(slice) = &slice { + let valid_indices = valid_indices.unwrap(); + ScalarValue::try_from_array(slice, valid_indices[valid_indices.len() - 1]) } else { - range.end - 1 - }; - ScalarValue::try_from_array(arr, index) + ScalarValue::try_from_array(arr, range.end - 1) + } } NthValueKind::Nth(n) => { match n.cmp(&0) { From 49abb8ab65eb43b280fe04fcdb8eeceedf5f7b5c Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 9 Mar 2024 18:30:11 -0800 Subject: [PATCH 09/11] fix format --- datafusion/physical-expr/src/window/nth_value.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index 5e64c96e5d1f..eed733c19b51 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -229,7 +229,8 @@ impl PartitionEvaluator for NthValueEvaluator { // Extract valid indices if ignoring nulls. let (slice, valid_indices) = if self.ignore_nulls { let slice = arr.slice(range.start, n_range); - let valid_indices = slice.nulls().unwrap().valid_indices().collect::>(); + let valid_indices = + slice.nulls().unwrap().valid_indices().collect::>(); if valid_indices.is_empty() { return ScalarValue::try_from(arr.data_type()); } @@ -249,7 +250,10 @@ impl PartitionEvaluator for NthValueEvaluator { NthValueKind::Last => { if let Some(slice) = &slice { let valid_indices = valid_indices.unwrap(); - ScalarValue::try_from_array(slice, valid_indices[valid_indices.len() - 1]) + ScalarValue::try_from_array( + slice, + valid_indices[valid_indices.len() - 1], + ) } else { ScalarValue::try_from_array(arr, range.end - 1) } From 73a9924f0d31ce905ddfc83b2162489883ae05e3 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 11 Mar 2024 14:36:47 +0300 Subject: [PATCH 10/11] Fix commented test case --- datafusion/physical-expr/src/window/nth_value.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index eed733c19b51..5c7c891f92d2 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -199,7 +199,8 @@ impl PartitionEvaluator for NthValueEvaluator { } } }; - if is_prunable { + // Do not memoize results when nulls are ignored. + if is_prunable && !self.ignore_nulls { if self.state.finalized_result.is_none() && !is_reverse_direction { let result = ScalarValue::try_from_array(out, size - 1)?; self.state.finalized_result = Some(result); From 7c7aa8b7deb2ec041dc106fd282f9dcc65525919 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 11 Mar 2024 17:40:12 -0700 Subject: [PATCH 11/11] resolve conflicts --- datafusion/sqllogictest/test_files/window.slt | 272 ++++++++++++++++++ 1 file changed, 272 insertions(+) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index cce67d898d37..39c105a4dcce 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -4307,3 +4307,275 @@ select lag(a) over (order by a ASC NULLS FIRST) as x1 NULL NULL NULL + +# Test for ignore nulls in FIRST_VALUE +statement ok +CREATE TABLE t AS VALUES (null::bigint), (3), (4); + +query I +SELECT FIRST_VALUE(column1) OVER() FROM t; +---- +NULL +NULL +NULL + +query I +SELECT FIRST_VALUE(column1) RESPECT NULLS OVER() FROM t; +---- +NULL +NULL +NULL + +query I +SELECT FIRST_VALUE(column1) IGNORE NULLS OVER() FROM t; +---- +3 +3 +3 + +statement ok +DROP TABLE t; + +# Test for ignore nulls with ORDER BY in FIRST_VALUE +statement ok +CREATE TABLE t AS VALUES (3, 4), (4, 3), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6); + +query II +SELECT column1, column2 FROM t ORDER BY column2; +---- +NULL 1 +NULL 2 +4 3 +3 4 +5 5 +6 6 + +query II +SELECT FIRST_VALUE(column1) OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +NULL 4 +NULL 5 +NULL 6 + +query II +SELECT FIRST_VALUE(column1) RESPECT NULLS OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +NULL 4 +NULL 5 +NULL 6 + +query II +SELECT FIRST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +4 3 +4 4 +4 5 +4 6 + +query II +SELECT FIRST_VALUE(column1)OVER(ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +4 4 +3 5 +5 6 + +query II +SELECT FIRST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2 RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), column2 FROM t; +---- +NULL 1 +4 2 +4 3 +4 4 +3 5 +5 6 + +statement ok +DROP TABLE t; + +# Test for ignore nulls with ORDER BY in FIRST_VALUE with all NULL values +statement ok +CREATE TABLE t AS VALUES (null::bigint, 4), (null::bigint, 3), (null::bigint, 1), (null::bigint, 2); + +query II +SELECT FIRST_VALUE(column1) OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +NULL 4 + +query II +SELECT FIRST_VALUE(column1) RESPECT NULLS OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +NULL 4 + +query II +SELECT FIRST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2), column2 FROM t; +---- +NULL 1 +NULL 2 +NULL 3 +NULL 4 + +statement ok +DROP TABLE t; + +# Test for ignore nulls in LAST_VALUE +statement ok +CREATE TABLE t AS VALUES (1), (3), (null::bigint); + +query I +SELECT LAST_VALUE(column1) OVER() FROM t; +---- +NULL +NULL +NULL + +query I +SELECT LAST_VALUE(column1) RESPECT NULLS OVER() FROM t; +---- +NULL +NULL +NULL + +query I +SELECT LAST_VALUE(column1) IGNORE NULLS OVER() FROM t; +---- +3 +3 +3 + +statement ok +DROP TABLE t; + +# Test for ignore nulls with ORDER BY in LAST_VALUE +statement ok +CREATE TABLE t AS VALUES (3, 4), (4, 3), (null::bigint, 1), (null::bigint, 2), (5, 5), (6, 6); + +query II +SELECT column1, column2 FROM t ORDER BY column2 DESC NULLS LAST; +---- +6 6 +5 5 +3 4 +4 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) OVER(ORDER BY column2 DESC NULLS LAST), column2 FROM t; +---- +6 6 +5 5 +3 4 +4 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST), column2 FROM t; +---- +6 6 +5 5 +3 4 +4 3 +4 2 +4 1 + +query II +SELECT LAST_VALUE(column1) OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 6 +NULL 5 +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) RESPECT NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 6 +NULL 5 +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +4 6 +4 5 +4 4 +4 3 +4 2 +4 1 + +query II +SELECT LAST_VALUE(column1) OVER(ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), column2 FROM t; +---- +5 6 +3 5 +4 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), column2 FROM t; +---- +5 6 +3 5 +4 4 +4 3 +4 2 +NULL 1 + +statement ok +DROP TABLE t; + +# Test for ignore nulls with ORDER BY in LAST_VALUE with all NULLs +statement ok +CREATE TABLE t AS VALUES (null::bigint, 4), (null::bigint, 3), (null::bigint, 1), (null::bigint, 2); + +query II +SELECT LAST_VALUE(column1) OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) RESPECT NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +query II +SELECT LAST_VALUE(column1) IGNORE NULLS OVER(ORDER BY column2 DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), column2 FROM t; +---- +NULL 4 +NULL 3 +NULL 2 +NULL 1 + +statement ok +DROP TABLE t;