From 58c9f93c200de6f1cb9d3788bd28d4993714c550 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Mar 2024 07:59:21 -0800 Subject: [PATCH 1/4] Reduce casts for LEAD/LAG --- .../physical-expr/src/window/lead_lag.rs | 29 ++++++++++--------- datafusion/sqllogictest/test_files/limit.slt | 24 +++++++-------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index 1d6cfc6b0418..e496c7343f7c 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -236,20 +236,21 @@ impl PartitionEvaluator for WindowShiftEvaluator { values: &[ArrayRef], range: &Range, ) -> Result { - // TODO: try to get rid of i64 usize conversion // TODO: do not recalculate default value every call - // TODO: support LEAD mode for IGNORE NULLS let array = &values[0]; let dtype = array.data_type(); - let len = array.len() as i64; + let len = array.len(); + // LAG mode - let mut idx = if self.is_lag() { - range.end as i64 - self.shift_offset - 1 + let i = if self.is_lag() { + (range.end as i64 - self.shift_offset - 1) as usize } else { // LEAD mode - range.start as i64 - self.shift_offset + (range.start as i64 - self.shift_offset) as usize }; + let mut idx: Option = if i < len { Some(i) } else { None }; + // LAG with IGNORE NULLS calculated as the current row index - offset, but only for non-NULL rows // If current row index points to NULL value the row is NOT counted if self.ignore_nulls && self.is_lag() { @@ -257,9 +258,9 @@ impl PartitionEvaluator for WindowShiftEvaluator { // Find the nonNULL row index that shifted by offset comparing to current row index idx = if self.non_null_offsets.len() == self.shift_offset as usize { let total_offset: usize = self.non_null_offsets.iter().sum(); - (range.end - 1 - total_offset) as i64 + Some(range.end - 1 - total_offset) } else { - -1 + None }; // Keep track of offset values between non-null entries @@ -296,7 +297,7 @@ impl PartitionEvaluator for WindowShiftEvaluator { break; } } - } else if range.end < len as usize && array.is_valid(range.end) { + } else if range.end < len && array.is_valid(range.end) { // Update `non_null_offsets` with the new end data. if array.is_valid(range.end) { // When non-null, append a new offset. @@ -312,9 +313,9 @@ impl PartitionEvaluator for WindowShiftEvaluator { idx = if self.non_null_offsets.len() >= non_null_row_count { let total_offset: usize = self.non_null_offsets.iter().take(non_null_row_count).sum(); - (range.start + total_offset) as i64 + Some(range.start + total_offset) } else { - -1 + None }; // Prune `self.non_null_offsets` from the start. so that at next iteration // start of the `self.non_null_offsets` matches with current row. @@ -331,10 +332,12 @@ impl PartitionEvaluator for WindowShiftEvaluator { // - index is out of window bounds // OR // - ignore nulls mode and current value is null and is within window bounds - if idx < 0 || idx >= len || (self.ignore_nulls && array.is_null(idx as usize)) { + // .unwrap() is safe here as there is a none check in front + #[allow(clippy::unnecessary_unwrap)] + if idx.is_none() || (self.ignore_nulls && array.is_null(idx.unwrap())) { get_default_value(self.default_value.as_ref(), dtype) } else { - ScalarValue::try_from_array(array, idx as usize) + ScalarValue::try_from_array(array, idx.unwrap()) } } diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 37a0360f8c26..351d7dff3164 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -389,18 +389,18 @@ SELECT ROW_NUMBER() OVER (PARTITION BY t1.column1) FROM t t1, t t2, t t3; # verify that there are multiple partitions in the input (i.e. MemoryExec says # there are 4 partitions) so that this tests multi-partition limit. -query TT -EXPLAIN SELECT DISTINCT i FROM t1000; ----- -logical_plan -Aggregate: groupBy=[[t1000.i]], aggr=[[]] ---TableScan: t1000 projection=[i] -physical_plan -AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] ---CoalesceBatchesExec: target_batch_size=8192 -----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 -------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] ---------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] +#query TT +#EXPLAIN SELECT DISTINCT i FROM t1000; +#---- +#logical_plan +#Aggregate: groupBy=[[t1000.i]], aggr=[[]] +#--TableScan: t1000 projection=[i] +#physical_plan +#AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] +#--CoalesceBatchesExec: target_batch_size=8192 +#----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 +#------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] +#--------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] query I SELECT i FROM t1000 ORDER BY i DESC LIMIT 3; From a35be1b736da03c6cba9a2a4181b43da80eeba44 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Mar 2024 08:13:39 -0800 Subject: [PATCH 2/4] uncomment test --- datafusion/sqllogictest/test_files/limit.slt | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 351d7dff3164..37a0360f8c26 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -389,18 +389,18 @@ SELECT ROW_NUMBER() OVER (PARTITION BY t1.column1) FROM t t1, t t2, t t3; # verify that there are multiple partitions in the input (i.e. MemoryExec says # there are 4 partitions) so that this tests multi-partition limit. -#query TT -#EXPLAIN SELECT DISTINCT i FROM t1000; -#---- -#logical_plan -#Aggregate: groupBy=[[t1000.i]], aggr=[[]] -#--TableScan: t1000 projection=[i] -#physical_plan -#AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] -#--CoalesceBatchesExec: target_batch_size=8192 -#----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 -#------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] -#--------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] +query TT +EXPLAIN SELECT DISTINCT i FROM t1000; +---- +logical_plan +Aggregate: groupBy=[[t1000.i]], aggr=[[]] +--TableScan: t1000 projection=[i] +physical_plan +AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] +--CoalesceBatchesExec: target_batch_size=8192 +----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 +------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] +--------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] query I SELECT i FROM t1000 ORDER BY i DESC LIMIT 3; From e7bf92d58cee88810148e40dc77d373ecd449467 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Mar 2024 08:38:12 -0800 Subject: [PATCH 3/4] fix test --- datafusion/sqllogictest/test_files/limit.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 37a0360f8c26..e063d6e8960a 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -400,7 +400,7 @@ AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] --CoalesceBatchesExec: target_batch_size=8192 ----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 ------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] ---------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] +--------MemoryExec: partitions=4, partition_sizes=[1, 1, 2, 1] query I SELECT i FROM t1000 ORDER BY i DESC LIMIT 3; From eba61461cba4262045ddc16a4ddf6764d8a28352 Mon Sep 17 00:00:00 2001 From: comphead Date: Tue, 5 Mar 2024 08:55:52 -0800 Subject: [PATCH 4/4] test --- datafusion/sqllogictest/test_files/limit.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index e063d6e8960a..37a0360f8c26 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -400,7 +400,7 @@ AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] --CoalesceBatchesExec: target_batch_size=8192 ----RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 ------AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] ---------MemoryExec: partitions=4, partition_sizes=[1, 1, 2, 1] +--------MemoryExec: partitions=4, partition_sizes=[1, 2, 1, 1] query I SELECT i FROM t1000 ORDER BY i DESC LIMIT 3;