Skip to content

Commit

Permalink
Convert rank / dense_rank and percent_rank builtin functions to…
Browse files Browse the repository at this point in the history
… UDWF (#12718)

* wip: converting rank builtin function to UDWF

* commented out BuiltInWindowFunction in datafusion.proto and fixed an issue related to DataFusion window functions

* implemented rank.rs, percent_rank.rs and dense_rank.rs in datafusion functions-window

* removed a test from the built-in window function tests for percent_rank and updated pbjson fields

* removed unnecessary code

* added window_functions field to the MockSessionState

* updated rank, percent_rank and dense_rank udwf to use macros

* wip: fix rank functionality in sql integration

* fixed rank udwf not found issue in sql_integration.rs

* evaluating rank, percent_rank and dense_rank udwf with evaluate_with_rank function

* fixed rank projection test

* wip: fixing the percent_rank() documentation

* fixed the docs error issue

* fixed data type of the percent_rank udwf

* updated prost.rs file

* updated test and documentation

* Fix logical conflicts

* tweak module documentation

---------

Co-authored-by: jatin <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent 101e455 commit 939ef9e
Show file tree
Hide file tree
Showing 27 changed files with 687 additions and 469 deletions.
23 changes: 8 additions & 15 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use test_utils::add_empty_batches;

use datafusion::functions_window::row_number::row_number_udwf;
use datafusion_functions_window::dense_rank::dense_rank_udwf;
use datafusion_functions_window::rank::rank_udwf;
use hashbrown::HashMap;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
Expand Down Expand Up @@ -224,9 +226,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
WindowFunctionDefinition::WindowUDF(rank_udwf()),
// its name
"RANK",
"rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand All @@ -238,11 +240,9 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
// )
(
// Window function
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
// its name
"DENSE_RANK",
"dense_rank",
// no argument
vec![],
// Expected causality, for None cases causality will be determined from window frame boundaries
Expand Down Expand Up @@ -382,19 +382,12 @@ fn get_random_function(
);
window_fn_map.insert(
"rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Rank,
),
vec![],
),
(WindowFunctionDefinition::WindowUDF(rank_udwf()), vec![]),
);
window_fn_map.insert(
"dense_rank",
(
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
WindowFunctionDefinition::WindowUDF(dense_rank_udwf()),
vec![],
),
);
Expand Down
25 changes: 3 additions & 22 deletions datafusion/expr/src/built_in_window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ impl fmt::Display for BuiltInWindowFunction {
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum BuiltInWindowFunction {
/// rank of the current row with gaps; same as row_number of its first peer
Rank,
/// rank of the current row without gaps; this function counts peer groups
DenseRank,
/// relative rank of the current row: (rank - 1) / (total rows - 1)
PercentRank,
/// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
CumeDist,
/// integer ranging from 1 to the argument value, dividing the partition as equally as possible
Expand All @@ -72,9 +66,6 @@ impl BuiltInWindowFunction {
pub fn name(&self) -> &str {
use BuiltInWindowFunction::*;
match self {
Rank => "RANK",
DenseRank => "DENSE_RANK",
PercentRank => "PERCENT_RANK",
CumeDist => "CUME_DIST",
Ntile => "NTILE",
Lag => "LAG",
Expand All @@ -90,9 +81,6 @@ impl FromStr for BuiltInWindowFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
Ok(match name.to_uppercase().as_str() {
"RANK" => BuiltInWindowFunction::Rank,
"DENSE_RANK" => BuiltInWindowFunction::DenseRank,
"PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
"CUME_DIST" => BuiltInWindowFunction::CumeDist,
"NTILE" => BuiltInWindowFunction::Ntile,
"LAG" => BuiltInWindowFunction::Lag,
Expand Down Expand Up @@ -127,12 +115,8 @@ impl BuiltInWindowFunction {
})?;

match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::PercentRank | BuiltInWindowFunction::CumeDist => {
Ok(DataType::Float64)
}
BuiltInWindowFunction::Ntile => Ok(DataType::UInt64),
BuiltInWindowFunction::CumeDist => Ok(DataType::Float64),
BuiltInWindowFunction::Lag
| BuiltInWindowFunction::Lead
| BuiltInWindowFunction::FirstValue
Expand All @@ -145,10 +129,7 @@ impl BuiltInWindowFunction {
pub fn signature(&self) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
BuiltInWindowFunction::Rank
| BuiltInWindowFunction::DenseRank
| BuiltInWindowFunction::PercentRank
| BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::CumeDist => Signature::any(0, Volatility::Immutable),
BuiltInWindowFunction::Lag | BuiltInWindowFunction::Lead => {
Signature::one_of(
vec![
Expand Down
12 changes: 0 additions & 12 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2598,15 +2598,6 @@ mod test {
Ok(())
}

#[test]
fn test_percent_rank_return_type() -> Result<()> {
let fun = find_df_window_func("percent_rank").unwrap();
let observed = fun.return_type(&[], &[], "")?;
assert_eq!(DataType::Float64, observed);

Ok(())
}

#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = find_df_window_func("cume_dist").unwrap();
Expand All @@ -2628,9 +2619,6 @@ mod test {
#[test]
fn test_window_function_case_insensitive() -> Result<()> {
let names = vec![
"rank",
"dense_rank",
"percent_rank",
"cume_dist",
"ntile",
"lag",
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,6 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// # use datafusion_expr::test::function_stub::count;
/// # use sqlparser::ast::NullTreatment;
/// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
/// # use datafusion_expr::window_function::percent_rank;
/// # // first_value is an aggregate function in another crate
/// # fn first_value(_arg: Expr) -> Expr {
/// unimplemented!() }
Expand All @@ -721,6 +720,9 @@ pub fn interval_month_day_nano_lit(value: &str) -> Expr {
/// // Create a window expression for percent rank partitioned on column a
/// // equivalent to:
/// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE NULLS)`
/// // percent_rank is a UDWF (user-defined window function) in another crate
/// # fn percent_rank() -> Expr {
/// unimplemented!() }
/// let window = percent_rank()
/// .partition_by(vec![col("a")])
/// .order_by(vec![col("b").sort(true, true)])
Expand Down
21 changes: 0 additions & 21 deletions datafusion/expr/src/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ use datafusion_common::ScalarValue;

use crate::{expr::WindowFunction, BuiltInWindowFunction, Expr, Literal};

/// Create an expression to represent the `rank` window function
pub fn rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::Rank, vec![]))
}

/// Create an expression to represent the `dense_rank` window function
pub fn dense_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::DenseRank,
vec![],
))
}

/// Create an expression to represent the `percent_rank` window function
pub fn percent_rank() -> Expr {
Expr::WindowFunction(WindowFunction::new(
BuiltInWindowFunction::PercentRank,
vec![],
))
}

/// Create an expression to represent the `cume_dist` window function
pub fn cume_dist() -> Expr {
Expr::WindowFunction(WindowFunction::new(BuiltInWindowFunction::CumeDist, vec![]))
Expand Down
205 changes: 205 additions & 0 deletions datafusion/functions-window/src/dense_rank.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `dense_rank` window function implementation
use std::any::Any;
use std::fmt::Debug;
use std::iter;
use std::ops::Range;
use std::sync::Arc;

use crate::define_udwf_and_expr;
use crate::rank::RankState;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::array::UInt64Array;
use datafusion_common::arrow::compute::SortOptions;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use field::WindowUDFFieldArgs;

// Invokes the crate's `define_udwf_and_expr!` macro for the `DenseRank` type.
// NOTE(review): the macro body is not visible here; presumably it generates
// the `dense_rank_udwf()` accessor and a `dense_rank()` expression helper,
// using the string below as their documentation — confirm against the macro.
define_udwf_and_expr!(
DenseRank,
dense_rank,
"Returns rank of the current row without gaps. This function counts peer groups"
);

/// Window UDF implementation of the `dense_rank` function.
///
/// Construct it with `DenseRank::new` or `DenseRank::default`; both produce
/// the same value.
#[derive(Debug)]
pub struct DenseRank {
/// Signature handed out by `WindowUDFImpl::signature`.
signature: Signature,
}

impl DenseRank {
    /// Builds the `dense_rank` window function.
    ///
    /// The signature is `Signature::any(0, Volatility::Immutable)`: it is
    /// declared with zero arguments and immutable volatility.
    pub fn new() -> Self {
        let signature = Signature::any(0, Volatility::Immutable);
        Self { signature }
    }
}

impl Default for DenseRank {
fn default() -> Self {
Self::new()
}
}

impl WindowUDFImpl for DenseRank {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"dense_rank"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn partition_evaluator(
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<DenseRankEvaluator>::default())
}

fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
Ok(Field::new(field_args.name(), DataType::UInt64, false))
}

fn sort_options(&self) -> Option<SortOptions> {
Some(SortOptions {
descending: false,
nulls_first: false,
})
}
}

/// Partition evaluator for the `dense_rank` window UDF.
///
/// Wraps the `RankState` imported from `crate::rank`; `evaluate` updates its
/// `last_rank_data`, `last_rank_boundary`, `current_group_count` and `n_rank`
/// fields as rows are processed.
#[derive(Debug, Default)]
struct DenseRankEvaluator {
/// Running rank bookkeeping; starts from `RankState::default()`.
state: RankState,
}

impl PartitionEvaluator for DenseRankEvaluator {
    /// `dense_rank` can emit a row's result without looking at later rows.
    fn is_causal(&self) -> bool {
        true
    }

    /// Computes the dense rank of the row at `range.start`.
    ///
    /// `dense_rank` takes no arguments, so `values` holds the ORDER BY
    /// columns the rank is derived from. The row is compared with the last
    /// row that opened a peer group: a differing (or first) row starts a new
    /// group and bumps the rank, an equal row stays in the current group.
    ///
    /// Returns the current rank as `ScalarValue::UInt64`.
    fn evaluate(
        &mut self,
        values: &[ArrayRef],
        range: &Range<usize>,
    ) -> Result<ScalarValue> {
        let current_row = get_row_at_idx(values, range.start)?;

        // With no previous row recorded, this is the first group seen.
        let starts_new_group = match &self.state.last_rank_data {
            Some(previous_row) => previous_row != &current_row,
            None => true,
        };

        if starts_new_group {
            let state = &mut self.state;
            state.last_rank_data = Some(current_row);
            state.last_rank_boundary += state.current_group_count;
            state.current_group_count = 1;
            state.n_rank += 1;
        } else {
            // Same ORDER BY values as before: still inside the current group.
            self.state.current_group_count += 1;
        }

        Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64)))
    }

    /// Produces the dense rank of every row from precomputed peer groups.
    ///
    /// Each range in `ranks_in_partition` is one peer group. The i-th group
    /// (counting from 1) contributes its rank `i` once per row it covers.
    /// `_num_rows` is not used.
    fn evaluate_all_with_rank(
        &self,
        _num_rows: usize,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let dense_ranks = (1u64..)
            .zip(ranks_in_partition)
            .flat_map(|(rank, peers)| iter::repeat(rank).take(peers.end - peers.start));

        Ok(Arc::new(UInt64Array::from_iter_values(dense_ranks)))
    }

    /// Always `true` for this evaluator.
    fn supports_bounded_execution(&self) -> bool {
        true
    }

    /// Always `true`; this evaluator implements `evaluate_all_with_rank`.
    fn include_rank(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_uint64_array;

    /// Runs `evaluate_all_with_rank` over 8 rows using the given peer-group
    /// ranges and checks the produced ranks against `expected`.
    fn test_i32_result(
        expr: &DenseRank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let array = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&array)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Case where all 8 rows form a single peer group.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
        let single_group = vec![0..8];
        test_i32_result(expr, single_group, expected)
    }

    /// Case where the 8 rows are split into five peer groups.
    fn test_with_rank(expr: &DenseRank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_i32_result(expr, peer_groups, expected)
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let udwf = DenseRank::default();
        test_without_rank(&udwf, vec![1; 8])?;
        test_with_rank(&udwf, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }
}
Loading

0 comments on commit 939ef9e

Please sign in to comment.