Skip to content

Commit

Permalink
[MINOR]: Add size check for aggregate (apache#8813)
Browse files Browse the repository at this point in the history
* Add size check for aggregate

* Fix failing tests

* Minor changes
  • Loading branch information
mustafasrepo authored Jan 11, 2024
1 parent 5e0970a commit a154884
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,13 @@ mod tests {
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by,
aggr_expr,
vec![],
vec![None; n_aggr],
input,
schema,
)
Expand All @@ -288,12 +289,13 @@ mod tests {
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
let n_aggr = aggr_expr.len();
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,
group_by,
aggr_expr,
vec![],
vec![None; n_aggr],
input,
schema,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,15 +305,15 @@ mod tests {
AggregateMode::Partial,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
Arc::new(partial_agg), /* input */
schema.clone(), /* input_schema */
)?;
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
Expand Down Expand Up @@ -396,7 +396,7 @@ mod tests {
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
Expand Down Expand Up @@ -437,15 +437,15 @@ mod tests {
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let distinct_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
Arc::new(group_by_agg), /* input */
schema.clone(), /* input_schema */
)?;
Expand Down Expand Up @@ -487,7 +487,7 @@ mod tests {
AggregateMode::Single,
build_group_by(&schema.clone(), vec![]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
Expand Down Expand Up @@ -549,13 +549,14 @@ mod tests {
cast(expressions::lit(1u32), &schema, DataType::Int32)?,
&schema,
)?);
let agg = TestAggregate::new_count_star();
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![agg.count_expr()], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
Expand All @@ -565,7 +566,7 @@ mod tests {
// TODO(msirek): open an issue for `filter_expr` of `AggregateExec` not printing out
let expected = [
"LocalLimitExec: fetch=10",
"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[]",
"AggregateExec: mode=Single, gby=[a@0 as a], aggr=[COUNT(*)]",
"MemoryExec: partitions=1, partition_sizes=[1]",
];
let plan: Arc<dyn ExecutionPlan> = Arc::new(limit_exec);
Expand All @@ -588,7 +589,7 @@ mod tests {
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
vec![], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
Expand Down
10 changes: 8 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use arrow::array::ArrayRef;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
Expand Down Expand Up @@ -321,6 +321,11 @@ impl AggregateExec {
input_schema: SchemaRef,
schema: SchemaRef,
) -> Result<Self> {
// Make sure arguments are consistent in size
if aggr_expr.len() != filter_expr.len() {
return internal_err!("Inconsistent aggregate expr: {:?} and filter expr: {:?} for AggregateExec, their size should match", aggr_expr, filter_expr);
}

let input_eq_properties = input.equivalence_properties();
// Get GROUP BY expressions:
let groupby_exprs = group_by.input_exprs();
Expand Down Expand Up @@ -1795,11 +1800,12 @@ mod tests {
(1, groups_some.clone(), aggregates_v1),
(2, groups_some, aggregates_v2),
] {
let n_aggr = aggregates.len();
let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups,
aggregates,
vec![None; 3],
vec![None; n_aggr],
input.clone(),
input_schema.clone(),
)?);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ mod tests {
AggregateMode::Final,
build_group_by(&csv.schema().clone(), vec!["i".to_string()]),
vec![],
vec![None],
vec![],
csv.clone(),
csv.schema().clone(),
)?;
Expand Down

0 comments on commit a154884

Please sign in to comment.