Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[window function] add percent_rank window function #1077

Merged
merged 1 commit into from
Oct 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nth_value::NthValue;
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use rank::{dense_rank, rank};
pub use rank::{dense_rank, percent_rank, rank};
pub use row_number::RowNumber;
pub use sum::{sum_return_type, Sum};
pub use try_cast::{try_cast, TryCastExpr};
Expand Down
135 changes: 115 additions & 20 deletions datafusion/src/physical_plan/expressions/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
// specific language governing permissions and limitations
// under the License.

//! Defines physical expression for `rank` and `dense_rank` that can evaluated
//! Defines physical expressions for `rank`, `dense_rank`, and `percent_rank` that can be evaluated
//! at runtime during query execution

use crate::error::Result;
use crate::physical_plan::window_functions::PartitionEvaluator;
use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
use arrow::array::ArrayRef;
use arrow::array::UInt64Array;
use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use std::any::Any;
Expand All @@ -34,17 +34,38 @@ use std::sync::Arc;
#[derive(Debug)]
pub struct Rank {
name: String,
dense: bool,
rank_type: RankType,
}

/// Selects which ranking window function a `Rank` expression evaluates.
///
/// The variant determines both the output data type reported by
/// `Rank::field` and the per-partition computation done by the evaluator.
#[derive(Debug, Copy, Clone)]
pub(crate) enum RankType {
/// `rank()`: reported as a `UInt64` column.
Rank,
/// `dense_rank()`: reported as a `UInt64` column; every row of the n-th
/// rank group receives the value n (counting from 1).
DenseRank,
/// `percent_rank()`: reported as a `Float64` column holding the relative
/// rank `(rank - 1) / (total partition rows - 1)`.
PercentRank,
}

/// Create a rank window function
pub fn rank(name: String) -> Rank {
Rank { name, dense: false }
Rank {
name,
rank_type: RankType::Rank,
}
}

/// Create a dense rank window function
pub fn dense_rank(name: String) -> Rank {
Rank { name, dense: true }
Rank {
name,
rank_type: RankType::DenseRank,
}
}

/// Create a percent rank window function.
///
/// Returns a [`Rank`] expression named `name` whose rank type is
/// `RankType::PercentRank`, so its output field is `Float64`.
pub fn percent_rank(name: String) -> Rank {
    let rank_type = RankType::PercentRank;
    Rank { name, rank_type }
}

impl BuiltInWindowFunctionExpr for Rank {
Expand All @@ -55,7 +76,10 @@ impl BuiltInWindowFunctionExpr for Rank {

fn field(&self) -> Result<Field> {
let nullable = false;
let data_type = DataType::UInt64;
let data_type = match self.rank_type {
RankType::Rank | RankType::DenseRank => DataType::UInt64,
RankType::PercentRank => DataType::Float64,
};
Ok(Field::new(self.name(), data_type, nullable))
}

Expand All @@ -71,12 +95,14 @@ impl BuiltInWindowFunctionExpr for Rank {
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(RankEvaluator { dense: self.dense }))
Ok(Box::new(RankEvaluator {
rank_type: self.rank_type,
}))
}
}

pub(crate) struct RankEvaluator {
dense: bool,
rank_type: RankType,
}

impl PartitionEvaluator for RankEvaluator {
Expand All @@ -90,18 +116,37 @@ impl PartitionEvaluator for RankEvaluator {

fn evaluate_partition_with_rank(
&self,
_partition: Range<usize>,
partition: Range<usize>,
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
let result = if self.dense {
UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map(
|(range, rank)| {
let len = range.end - range.start;
iter::repeat(rank).take(len)
},
))
} else {
UInt64Array::from_iter_values(
// see https://www.postgresql.org/docs/current/functions-window.html
let result: ArrayRef = match self.rank_type {
RankType::DenseRank => Arc::new(UInt64Array::from_iter_values(
ranks_in_partition
.iter()
.zip(1u64..)
.flat_map(|(range, rank)| {
let len = range.end - range.start;
iter::repeat(rank).take(len)
}),
)),
RankType::PercentRank => {
// Returns the relative rank of the current row, that is (rank - 1) / (total partition rows - 1). The value thus ranges from 0 to 1 inclusive.
let denominator = (partition.end - partition.start) as f64;
Arc::new(Float64Array::from_iter_values(
ranks_in_partition
.iter()
.scan(0_u64, |acc, range| {
let len = range.end - range.start;
let value = (*acc as f64) / (denominator - 1.0).max(1.0);
let result = iter::repeat(value).take(len);
*acc += len as u64;
Some(result)
})
.flatten(),
))
}
RankType::Rank => Arc::new(UInt64Array::from_iter_values(
ranks_in_partition
.iter()
.scan(1_u64, |acc, range| {
Expand All @@ -111,9 +156,9 @@ impl PartitionEvaluator for RankEvaluator {
Some(result)
})
.flatten(),
)
)),
};
Ok(Arc::new(result))
Ok(result)
}
}

Expand All @@ -135,6 +180,27 @@ mod tests {
test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8], expected)
}

/// Evaluates `expr` over a single partition and checks the `Float64` output.
///
/// * `data` - values of the single `Int32` input column named `arr`
/// * `range` - the row range of the one partition to evaluate
/// * `ranks` - the rank groups (runs of peer rows) inside that partition
/// * `expected` - the values the evaluator must produce, row by row
///
/// Panics if the evaluator does not return exactly one array or if that
/// array is not a `Float64Array`.
fn test_f64_result(
    expr: &Rank,
    data: Vec<i32>,
    range: Range<usize>,
    ranks: Vec<Range<usize>>,
    expected: Vec<f64>,
) -> Result<()> {
    let arr: ArrayRef = Arc::new(Int32Array::from(data));
    let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
    // The column vector is only needed here, so move it in instead of cloning.
    let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?;
    let result = expr
        .create_evaluator(&batch)?
        .evaluate_with_rank(vec![range], ranks)?;
    // One partition in, one output array out.
    assert_eq!(1, result.len());
    let result = result[0].as_any().downcast_ref::<Float64Array>().unwrap();
    assert_eq!(expected, result.values());
    Ok(())
}

fn test_i32_result(
expr: &Rank,
data: Vec<i32>,
Expand Down Expand Up @@ -170,4 +236,33 @@ mod tests {
test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
Ok(())
}

#[test]
fn test_percent_rank() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add a test for dense_rank() here as well

let r = percent_rank("arr".into());

// empty case
let expected = vec![0.0; 0];
test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?;

// singleton case
let expected = vec![0.0];
test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?;

// uniform case
let expected = vec![0.0; 7];
test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?;

// non-trivial case
let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
test_f64_result(
&r,
vec![1, 1, 1, 2, 2, 2, 2],
0..7,
vec![0..3, 3..7],
expected,
)?;

Ok(())
}
}
9 changes: 9 additions & 0 deletions datafusion/src/physical_plan/window_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ mod tests {
Ok(())
}

#[test]
fn test_percent_rank_return_type() -> Result<()> {
    // Looked up by name and called with an empty argument list,
    // `percent_rank` must report a Float64 return type.
    let percent_rank_fn = WindowFunction::from_str("percent_rank")?;
    assert_eq!(DataType::Float64, return_type(&percent_rank_fn, &[])?);
    Ok(())
}

#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = WindowFunction::from_str("cume_dist")?;
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::logical_plan::window_frames::WindowFrame;
use crate::physical_plan::{
aggregates,
expressions::{
cume_dist, dense_rank, lag, lead, rank, Literal, NthValue, PhysicalSortExpr,
RowNumber,
cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, NthValue,
PhysicalSortExpr, RowNumber,
},
type_coercion::coerce,
window_functions::{
Expand Down Expand Up @@ -96,6 +96,7 @@ fn create_built_in_window_expr(
BuiltInWindowFunction::RowNumber => Arc::new(RowNumber::new(name)),
BuiltInWindowFunction::Rank => Arc::new(rank(name)),
BuiltInWindowFunction::DenseRank => Arc::new(dense_rank(name)),
BuiltInWindowFunction::PercentRank => Arc::new(percent_rank(name)),
BuiltInWindowFunction::CumeDist => Arc::new(cume_dist(name)),
BuiltInWindowFunction::Lag => {
let coerced_args = coerce(args, input_schema, &signature_for_built_in(fun))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ select
c9,
cume_dist() OVER (PARTITION BY c2 ORDER BY c3) cume_dist_by_c3,
rank() OVER (PARTITION BY c2 ORDER BY c3) rank_by_c3,
dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3
dense_rank() OVER (PARTITION BY c2 ORDER BY c3) dense_rank_by_c3,
percent_rank() OVER (PARTITION BY c2 ORDER BY c3) percent_rank_by_c3
FROM test
ORDER BY c9;