diff --git a/datafusion/src/physical_plan/expressions/cume_dist.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs new file mode 100644 index 0000000000000..e153615f0bf98 --- /dev/null +++ b/datafusion/src/physical_plan/expressions/cume_dist.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expression for `cume_dist` that can be evaluated +//! 
at runtime during query execution + +use crate::error::Result; +use crate::physical_plan::window_functions::PartitionEvaluator; +use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; +use arrow::array::ArrayRef; +use arrow::array::Float64Array; +use arrow::datatypes::{DataType, Field}; +use arrow::record_batch::RecordBatch; +use std::any::Any; +use std::iter; +use std::ops::Range; +use std::sync::Arc; + +/// CumeDist implements the `cume_dist` window function: the cumulative length of the rank ranges up to and including the current one, divided by the partition length +#[derive(Debug)] +pub struct CumeDist { + name: String, +} + +/// Create a [`CumeDist`] window function expression whose output field is called `name` +pub fn cume_dist(name: String) -> CumeDist { + CumeDist { name } +} + +impl BuiltInWindowFunctionExpr for CumeDist { + /// Returns `self` as `&dyn Any` so that callers can downcast to the concrete type + fn as_any(&self) -> &dyn Any { + self + } + + fn field(&self) -> Result { + let nullable = false; + let data_type = DataType::Float64; + Ok(Field::new(self.name(), data_type, nullable)) + } + + fn expressions(&self) -> Vec> { + vec![] + } + + fn name(&self) -> &str { + &self.name + } + + fn create_evaluator( + &self, + _batch: &RecordBatch, + ) -> Result> { + Ok(Box::new(CumeDistEvaluator {})) + } +} + +pub(crate) struct CumeDistEvaluator; + +impl PartitionEvaluator for CumeDistEvaluator { + fn include_rank(&self) -> bool { + true + } + + fn evaluate_partition(&self, _partition: Range) -> Result { + unreachable!( + "cume_dist evaluation must be called with evaluate_partition_with_rank" + ) + } + + fn evaluate_partition_with_rank( + &self, + partition: Range, + ranks_in_partition: &[Range], + ) -> Result { + let scaler = (partition.end - partition.start) as f64; // length of the partition range, used as the denominator + let result = Float64Array::from_iter_values( + ranks_in_partition + .iter() + .scan(0_u64, |acc, range| { + let len = range.end - range.start; + *acc += len as u64; // running total of the rank range lengths seen so far, including this one + let value: f64 = (*acc as f64) / scaler; + let result = iter::repeat(value).take(len); + Some(result) + }) + .flatten(), + ); + Ok(Arc::new(result)) + } 
+} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::{array::*, datatypes::*}; + + fn test_i32_result( + expr: &CumeDist, + data: Vec, + partition: Range, + ranks: Vec>, + expected: Vec, + ) -> Result<()> { + let arr: ArrayRef = Arc::new(Int32Array::from(data)); + let values = vec![arr]; + let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); + let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; + let result = expr + .create_evaluator(&batch)? + .evaluate_with_rank(vec![partition], ranks)?; + assert_eq!(1, result.len()); + let result = result[0].as_any().downcast_ref::().unwrap(); + let result = result.values(); + assert_eq!(expected, result); + Ok(()) + } + + #[test] + fn test_cume_dist() -> Result<()> { + let r = cume_dist("arr".into()); + + let expected = vec![0.0; 0]; + test_i32_result(&r, vec![], 0..0, vec![], expected)?; + + let expected = vec![1.0; 1]; + test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?; + + let expected = vec![1.0; 2]; + test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?; + + let expected = vec![0.5, 0.5, 1.0, 1.0]; + test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], expected)?; + + let expected = vec![0.25, 0.5, 0.75, 1.0]; + test_i32_result( + &r, + vec![1, 2, 4, 5], + 0..4, + vec![0..1, 1..2, 2..3, 3..4], + expected, + )?; + + Ok(()) + } +} diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 5a5a1189af053..9c05112d5ad78 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -33,6 +33,7 @@ mod cast; mod coercion; mod column; mod count; +mod cume_dist; mod in_list; mod is_not_null; mod is_null; @@ -62,6 +63,7 @@ pub use cast::{ }; pub use column::{col, Column}; pub use count::Count; +pub use cume_dist::cume_dist; pub use in_list::{in_list, InListExpr}; pub use is_not_null::{is_not_null, IsNotNullExpr}; pub use is_null::{is_null, 
IsNullExpr}; diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs index b88dec378c06e..b82cfa43ec64e 100644 --- a/datafusion/src/physical_plan/expressions/rank.rs +++ b/datafusion/src/physical_plan/expressions/rank.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Defines physical expressions that can evaluated at runtime during query execution +//! Defines physical expression for `rank` and `dense_rank` that can be evaluated +//! at runtime during query execution use crate::error::Result; use crate::physical_plan::window_functions::PartitionEvaluator; diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 7bd8312d0525e..f34649f454af6 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -22,7 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame; use crate::physical_plan::{ aggregates, expressions::{ - dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, RowNumber, + cume_dist, dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, + RowNumber, }, type_coercion::coerce, window_functions::{ @@ -95,6 +96,7 @@ fn create_built_in_window_expr( BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)), BuiltInWindowFunction::Rank => Arc::new(rank(name)), BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)), + BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)), BuiltInWindowFunction::Lag => { let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?; let arg = coerced_args[0].clone(); diff --git a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql index 0ea6b042555cc..adffcbf982120 100644 --- a/integration-tests/sqls/simple_window_ranked_built_in_functions.sql +++ 
b/integration-tests/sqls/simple_window_ranked_built_in_functions.sql @@ -16,6 +16,7 @@ select c9, + cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3, rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3, dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3 FROM test