Skip to content

Commit

Permalink
add multi ordering test case (apache#8439)
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored and appletreeisyellow committed Dec 15, 2023
1 parent 2737f43 commit 5c76c91
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 30 deletions.
11 changes: 11 additions & 0 deletions datafusion/core/tests/data/aggregate_agg_multi_order.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
c1,c2,c3
1,20,0
2,20,1
3,10,2
4,10,3
5,30,4
6,30,5
7,30,6
8,30,7
9,30,8
10,10,9
49 changes: 19 additions & 30 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};

use arrow::array::ArrayRef;
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_schema::{Fields, SortOptions};
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -214,7 +214,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
// values received from its ordering requirement expression. (This information is necessary for during merging).
let agg_orderings = &states[1];

if as_list_array(agg_orderings).is_ok() {
if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
// Stores ARRAY_AGG results coming from each partition
let mut partition_values = vec![];
// Stores ordering requirement expression results coming from each partition
Expand All @@ -232,10 +232,21 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
// Ordering requirement expression values for each entry in the ARRAY_AGG list
let other_ordering_values = self.convert_array_agg_to_orderings(orderings)?;
for v in other_ordering_values.into_iter() {
partition_ordering_values.push(v);

for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;

partition_ordering_values.push(ordering_value);
}

let sort_options = self
Expand Down Expand Up @@ -293,33 +304,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
}

impl OrderSensitiveArrayAggAccumulator {
/// Inner Vec\<ScalarValue> in the ordering_values can be thought as ordering information for the each ScalarValue in the values array.
/// See [`merge_ordered_arrays`] for more information.
fn convert_array_agg_to_orderings(
&self,
array_agg: Vec<Vec<ScalarValue>>,
) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
let mut orderings = vec![];
// in_data is Vec<ScalarValue> where ScalarValue does not include ScalarValue::List
for in_data in array_agg.into_iter() {
let ordering = in_data.into_iter().map(|struct_vals| {
if let ScalarValue::Struct(Some(orderings), _) = struct_vals {
Ok(orderings)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
struct_vals.data_type()
)
}
}).collect::<Result<Vec<_>>>()?;
orderings.push(ordering);
}
Ok(orderings)
}

fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields.clone());

let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
Expand All @@ -329,6 +317,7 @@ impl OrderSensitiveArrayAggAccumulator {
.collect();
let struct_type = DataType::Struct(Fields::from(fields));

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))
}
Expand Down
30 changes: 30 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,36 @@ FROM
----
[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8]

statement ok
CREATE EXTERNAL TABLE agg_order (
c1 INT NOT NULL,
c2 INT NOT NULL,
c3 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
LOCATION '../core/tests/data/aggregate_agg_multi_order.csv';

# test array_agg with order by multiple columns
query ?
select array_agg(c1 order by c2 desc, c3) from agg_order;
----
[5, 6, 7, 8, 9, 1, 2, 3, 4, 10]

query TT
explain select array_agg(c1 order by c2 desc, c3) from agg_order;
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
--TableScan: agg_order projection=[c1, c2, c3]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true

statement error This feature is not implemented: LIMIT not supported in ARRAY_AGG: 1
SELECT array_agg(c13 LIMIT 1) FROM aggregate_test_100

Expand Down

0 comments on commit 5c76c91

Please sign in to comment.