Skip to content

Commit

Permalink
Fix bug and add new test (#7099)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored Jul 26, 2023
1 parent 581778d commit f9a1396
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl ProjectionExec {

// construct a map from the input columns to the output columns of the Projection
let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
- for (expression, name) in expr.iter() {
+ for (expr_idx, (expression, name)) in expr.iter().enumerate() {
if let Some(column) = expression.as_any().downcast_ref::<Column>() {
// For some executors, logical and physical plan schema fields
// are not the same. The information in a `Column` comes from
Expand All @@ -107,11 +107,10 @@ impl ProjectionExec {
let idx = column.index();
let matching_input_field = input_schema.field(idx);
let matching_input_column = Column::new(matching_input_field.name(), idx);
- let new_col_idx = schema.index_of(name)?;
let entry = columns_map
.entry(matching_input_column)
.or_insert_with(Vec::new);
- entry.push(Column::new(name, new_col_idx));
+ entry.push(Column::new(name, expr_idx));
};
}

Expand Down
62 changes: 62 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/groupby.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2568,6 +2568,52 @@ TUR 100 75 175
GRC 80 30 110
FRA 200 50 250

query TT
EXPLAIN SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
FROM sales_global AS s
JOIN sales_global AS e
ON s.currency = e.currency AND
s.ts >= e.ts
GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
ORDER BY s.sn
----
logical_plan
Sort: s.sn ASC NULLS LAST
--Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST] AS last_rate
----Aggregate: groupBy=[[s.sn, s.zip_code, s.country, s.ts, s.currency]], aggr=[[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]]
------Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, e.sn, e.amount
--------Inner Join: s.currency = e.currency Filter: s.ts >= e.ts
----------SubqueryAlias: s
------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency]
----------SubqueryAlias: e
------------TableScan: sales_global projection=[sn, ts, currency, amount]
physical_plan
SortExec: expr=[sn@2 ASC NULLS LAST]
--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)]
------SortExec: expr=[sn@5 ASC NULLS LAST]
--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
----------CoalesceBatchesExec: target_batch_size=8192
------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------------MemoryExec: partitions=1, partition_sizes=[1]

query ITIPTR
SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
FROM sales_global AS s
JOIN sales_global AS e
ON s.currency = e.currency AND
s.ts >= e.ts
GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
ORDER BY s.sn
----
0 GRC 0 2022-01-01T06:00:00 EUR 30
1 FRA 1 2022-01-01T08:00:00 EUR 50
1 TUR 2 2022-01-01T11:30:00 TRY 75
1 FRA 3 2022-01-02T12:00:00 EUR 200
0 GRC 4 2022-01-03T10:00:00 EUR 80
1 TUR 4 2022-01-03T10:00:00 TRY 100

# Run order-sensitive aggregators in multiple partitions
statement ok
set datafusion.execution.target_partitions = 8;
Expand Down Expand Up @@ -2847,3 +2893,19 @@ SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FRA [200.0, 50.0] 50 50
GRC [80.0, 30.0] 30 30
TUR [100.0, 75.0] 75 75

query ITIPTR
SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
FROM sales_global AS s
JOIN sales_global AS e
ON s.currency = e.currency AND
s.ts >= e.ts
GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
ORDER BY s.sn
----
0 GRC 0 2022-01-01T06:00:00 EUR 30
1 FRA 1 2022-01-01T08:00:00 EUR 50
1 TUR 2 2022-01-01T11:30:00 TRY 75
1 FRA 3 2022-01-02T12:00:00 EUR 200
0 GRC 4 2022-01-03T10:00:00 EUR 80
1 TUR 4 2022-01-03T10:00:00 TRY 100

0 comments on commit f9a1396

Please sign in to comment.