Skip to content

Commit

Permalink
Rename expr::window_function::WindowFunction to `WindowFunctionDefi…
Browse files Browse the repository at this point in the history
…nition`, make structure consistent with ScalarFunction (apache#8382)

* Refactoring WindowFunction into coherent structure with AggregateFunction

* One more cargo fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
edmondop and alamb authored Jan 1, 2024
1 parent e82707e commit d2b3d1c
Show file tree
Hide file tree
Showing 20 changed files with 613 additions and 581 deletions.
6 changes: 4 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ mod tests {
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
WindowFunction,
WindowFunctionDefinition,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::get_plan_string;
Expand Down Expand Up @@ -1525,7 +1525,9 @@ mod tests {
// build plan using Table API
let t = test_table().await?;
let first_row = Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::FirstValue,
),
vec![col("aggregate_test_100.c1")],
vec![col("aggregate_test_100.c2")],
vec![],
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow_schema::{Schema, SchemaRef, SortOptions};
use datafusion_common::{JoinType, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunctionDefinition};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

Expand Down Expand Up @@ -234,7 +234,7 @@ pub fn bounded_window_exec(
Arc::new(
crate::physical_plan::windows::BoundedWindowAggExec::try_new(
vec![create_window_expr(
&WindowFunction::AggregateFunction(AggregateFunction::Count),
&WindowFunctionDefinition::AggregateFunction(AggregateFunction::Count),
"count".to_owned(),
&[col(col_name, &schema).unwrap()],
&[],
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::{
array_agg, avg, col, count, exists, expr, in_subquery, lit, max, out_ref_col,
scalar_subquery, sum, wildcard, AggregateFunction, Expr, ExprSchemable, WindowFrame,
WindowFrameBound, WindowFrameUnits, WindowFunction,
WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion_physical_expr::var_provider::{VarProvider, VarType};

Expand Down Expand Up @@ -170,7 +170,7 @@ async fn test_count_wildcard_on_window() -> Result<()> {
.table("t1")
.await?
.select(vec![Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::AggregateFunction(AggregateFunction::Count),
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Count),
vec![wildcard()],
vec![],
vec![Expr::Sort(Sort::new(Box::new(col("a")), false, true))],
Expand Down
46 changes: 31 additions & 15 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::{
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunction,
WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
Expand Down Expand Up @@ -143,7 +143,7 @@ fn get_random_function(
schema: &SchemaRef,
rng: &mut StdRng,
is_linear: bool,
) -> (WindowFunction, Vec<Arc<dyn PhysicalExpr>>, String) {
) -> (WindowFunctionDefinition, Vec<Arc<dyn PhysicalExpr>>, String) {
let mut args = if is_linear {
// In linear test for the test version with WindowAggExec we use insert SortExecs to the plan to be able to generate
// same result with BoundedWindowAggExec which doesn't use any SortExec. To make result
Expand All @@ -159,28 +159,28 @@ fn get_random_function(
window_fn_map.insert(
"sum",
(
WindowFunction::AggregateFunction(AggregateFunction::Sum),
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Sum),
vec![],
),
);
window_fn_map.insert(
"count",
(
WindowFunction::AggregateFunction(AggregateFunction::Count),
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Count),
vec![],
),
);
window_fn_map.insert(
"min",
(
WindowFunction::AggregateFunction(AggregateFunction::Min),
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Min),
vec![],
),
);
window_fn_map.insert(
"max",
(
WindowFunction::AggregateFunction(AggregateFunction::Max),
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
vec![],
),
);
Expand All @@ -191,28 +191,36 @@ fn get_random_function(
window_fn_map.insert(
"row_number",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::RowNumber),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
vec![],
),
);
window_fn_map.insert(
"rank",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Rank),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Rank,
),
vec![],
),
);
window_fn_map.insert(
"dense_rank",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::DenseRank),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::DenseRank,
),
vec![],
),
);
window_fn_map.insert(
"lead",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lead),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Lead,
),
vec![
lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
lit(ScalarValue::Int64(Some(rng.gen_range(1..1000)))),
Expand All @@ -222,7 +230,9 @@ fn get_random_function(
window_fn_map.insert(
"lag",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::Lag),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::Lag,
),
vec![
lit(ScalarValue::Int64(Some(rng.gen_range(1..10)))),
lit(ScalarValue::Int64(Some(rng.gen_range(1..1000)))),
Expand All @@ -233,29 +243,35 @@ fn get_random_function(
window_fn_map.insert(
"first_value",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::FirstValue),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::FirstValue,
),
vec![],
),
);
window_fn_map.insert(
"last_value",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::LastValue),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::LastValue,
),
vec![],
),
);
window_fn_map.insert(
"nth_value",
(
WindowFunction::BuiltInWindowFunction(BuiltInWindowFunction::NthValue),
WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::NthValue,
),
vec![lit(ScalarValue::Int64(Some(rng.gen_range(1..10))))],
),
);

let rand_fn_idx = rng.gen_range(0..window_fn_map.len());
let fn_name = window_fn_map.keys().collect::<Vec<_>>()[rand_fn_idx];
let (window_fn, new_args) = window_fn_map.values().collect::<Vec<_>>()[rand_fn_idx];
if let WindowFunction::AggregateFunction(f) = window_fn {
if let WindowFunctionDefinition::AggregateFunction(f) = window_fn {
let a = args[0].clone();
let dt = a.data_type(schema.as_ref()).unwrap();
let sig = f.signature();
Expand Down
Loading

0 comments on commit d2b3d1c

Please sign in to comment.