Skip to content

Commit

Permalink
add cume_dist implementation (#1076)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist authored Oct 8, 2021
1 parent d331fa2 commit 1eb5d55
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 2 deletions.
160 changes: 160 additions & 0 deletions datafusion/src/physical_plan/expressions/cume_dist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the physical expression for `cume_dist` that can be evaluated
//! at runtime during query execution
use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
use std::iter;
use std::ops::Range;
use std::sync::Arc;

/// Physical window expression for the `cume_dist` built-in window function.
///
/// The expression takes no argument expressions and produces a non-nullable
/// `Float64` column. Evaluation is delegated to `CumeDistEvaluator`, which
/// works from the rank ranges of each partition rather than from column values.
#[derive(Debug)]
pub struct CumeDist {
/// Name of this expression; also used as the name of the output field.
name: String,
}

/// Creates a [`CumeDist`] window expression.
///
/// `name` is stored as the expression's name and is the name given to the
/// output field it reports.
pub fn cume_dist(name: String) -> CumeDist {
CumeDist { name }
}

impl BuiltInWindowFunctionExpr for CumeDist {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = DataType::Float64;
Ok(Field::new(self.name(), data_type, nullable))
}

fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
vec![]
}

fn name(&self) -> &str {
&self.name
}

fn create_evaluator(
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}
}

/// Stateless partition evaluator for `cume_dist`, handed out by
/// `CumeDist::create_evaluator`.
pub(crate) struct CumeDistEvaluator;

impl PartitionEvaluator for CumeDistEvaluator {
    /// Signals that this evaluator must be driven through
    /// `evaluate_partition_with_rank`.
    fn include_rank(&self) -> bool {
        true
    }

    /// Not supported: `cume_dist` cannot be computed without rank ranges, so
    /// reaching this method is a bug in the caller and panics.
    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
        unreachable!(
            "cume_dist evaluation must be called with evaluate_partition_with_rank"
        )
    }

    /// Computes the cumulative distribution for one partition.
    ///
    /// For every range in `ranks_in_partition`, in order, a running total of
    /// the range lengths seen so far (including the current one) is divided by
    /// the length of `partition`; that quotient is emitted once for each row
    /// of the range. The result is a `Float64Array` whose length is the sum of
    /// the range lengths.
    ///
    /// Only the lengths of the ranges are used, never their offsets.
    /// NOTE(review): each range presumably covers a run of rows that tie under
    /// the window's ORDER BY -- confirm against the caller.
    fn evaluate_partition_with_rank(
        &self,
        partition: Range<usize>,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let partition_len = (partition.end - partition.start) as f64;
        // Number of rows covered by the ranges visited so far.
        let mut rows_covered = 0_u64;
        let distribution = ranks_in_partition.iter().flat_map(|peers| {
            let peer_count = peers.end - peers.start;
            rows_covered += peer_count as u64;
            let fraction: f64 = (rows_covered as f64) / partition_len;
            // Every row of the range shares the same value.
            iter::repeat(fraction).take(peer_count)
        });
        let array = Float64Array::from_iter_values(distribution);
        Ok(Arc::new(array))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{array::*, datatypes::*};

    /// Evaluates `expr` over a single-column `Int32` batch built from `data`,
    /// using one `partition` and the given rank ranges, and asserts that the
    /// single resulting `Float64` array holds exactly `expected`.
    fn test_i32_result(
        expr: &CumeDist,
        data: Vec<i32>,
        partition: Range<usize>,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
        let column: ArrayRef = Arc::new(Int32Array::from(data));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![column])?;

        let evaluator = expr.create_evaluator(&batch)?;
        let outputs = evaluator.evaluate_with_rank(vec![partition], ranks)?;
        // One partition in, one array out.
        assert_eq!(1, outputs.len());

        let actual = outputs[0]
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn test_cume_dist() -> Result<()> {
        let r = cume_dist("arr".into());

        // Empty partition: nothing is produced.
        test_i32_result(&r, vec![], 0..0, vec![], vec![])?;

        // A single row is its own complete distribution.
        test_i32_result(&r, vec![20], 0..1, vec![0..1], vec![1.0])?;

        // Two tied rows share the final value.
        test_i32_result(&r, vec![20, 20], 0..2, vec![0..2], vec![1.0, 1.0])?;

        // Two groups of ties.
        test_i32_result(
            &r,
            vec![1, 1, 2, 2],
            0..4,
            vec![0..2, 2..4],
            vec![0.5, 0.5, 1.0, 1.0],
        )?;

        // All rows distinct.
        test_i32_result(
            &r,
            vec![1, 2, 4, 5],
            0..4,
            vec![0..1, 1..2, 2..3, 3..4],
            vec![0.25, 0.5, 0.75, 1.0],
        )?;

        Ok(())
    }
}
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod cast;
mod coercion;
mod column;
mod count;
mod cume_dist;
mod in_list;
mod is_not_null;
mod is_null;
Expand Down Expand Up @@ -62,6 +63,7 @@ pub use cast::{
};
pub use column::{col, Column};
pub use count::Count;
pub use cume_dist::cume_dist;
pub use in_list::{in_list, InListExpr};
pub use is_not_null::{is_not_null, IsNotNullExpr};
pub use is_null::{is_null, IsNullExpr};
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/expressions/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! Defines physical expressions that can evaluated at runtime during query execution
//! Defines the physical expressions for `rank` and `dense_rank` that can be
//! evaluated at runtime during query execution
use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
aggregates,
expressions::{
dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr, RowNumber,
cume_dist, dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr,
RowNumber,
},
type_coercion::coerce,
window_functions::{
Expand Down Expand Up @@ -95,6 +96,7 @@ fn create_built_in_window_expr(
BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
BuiltInWindowFunction::Rank => Arc::new(rank(name)),
BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
BuiltInWindowFunction::Lag => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
let arg = coerced_args[0].clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

select
c9,
cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
FROM test
Expand Down

0 comments on commit 1eb5d55

Please sign in to comment.