
How to handle an aggregation with functions of different steps? #4412

Closed
rui-mo opened this issue Mar 24, 2023 · 33 comments
Labels
question Further information is requested

Comments

@rui-mo
Collaborator

rui-mo commented Mar 24, 2023

Description

Hi,
We noticed this issue when running TPC-DS q95 with Spark. For a query like the one below, Spark plans several aggregations to get the final result.

select count(distinct ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit) from web_sales

Spark plans the following aggregations:
(1) group by ws_order_number, partial_sum(ws_ext_ship_cost), partial_sum(ws_net_profit)
(2) group by ws_order_number, merge_sum(ws_ext_ship_cost), merge_sum(ws_net_profit)
(3) merge_sum(ws_ext_ship_cost), merge_sum(ws_net_profit), partial_count(ws_order_number)
(4) final_sum(ws_ext_ship_cost), final_sum(ws_net_profit), final_count(ws_order_number)

The problem occurs in the third aggregation, where merge_sum and partial_count, which belong to different phases, are put together in a single aggregation node; Velox currently does not support this. If we ignore that and simply use the merge phase, an incorrect result is produced because the intermediate input for merge_count is not correct.

This issue can be reproduced with the unit test below:

TEST_F(AverageAggregationTest, mergeAndPartial) {
  auto rows = makeRowVector({
    makeFlatVector<int32_t>(100, [&](auto row) { return row % 10; }),
    makeFlatVector<int32_t>(100, [&](auto row) { return row * 2; }),
    makeFlatVector<int32_t>(100, [&](auto row) { return row; })});
  
  createDuckDbTable("t", {rows});

  std::vector<TypePtr> resultType = {BIGINT(), BIGINT(), BIGINT()};
  auto plan = PlanBuilder()
                  .values({rows})
                  .partialAggregation({"c0"}, {"sum(c1)", "sum(c2)"})
                  .intermediateAggregation({"c0"}, {"sum(a0)", "sum(a1)"}, resultType)
                  .intermediateAggregation({}, {"sum(a0)", "sum(a1)", "count(c0)"}, resultType)
                  .planNode();
  assertQuery(plan, "SELECT sum(c1), sum(c2), count(distinct c0) from t");
}

@mbasmanova @oerling Do you have any advice for us? Thanks for your help.

@rui-mo rui-mo added the enhancement New feature or request label Mar 24, 2023
@rui-mo
Collaborator Author

rui-mo commented Mar 24, 2023

cc @zhouyuan @FelixYBW

@mbasmanova
Contributor

@rui-mo Rui, this query plan is surprising and, as you noticed, is not supported. Since partial aggregation may flush results when memory is low, the query may return incorrect results where some values of 'ws_order_number' are counted more than once.

Presto uses a different query plan with a MarkDistinct plan node. I assume MarkDistinct acts like a "final" aggregation and doesn't support flushing, but may support spilling.

Any chance you can share the full query plan from Spark? Also, I wonder if there is any documentation about how Spark runs such queries.

presto:di> explain (type distributed) select count(distinct partkey), sum(quantity) FROM tpch.tiny.lineitem;
                                       Query Plan
-----------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [count, sum]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [count:bigint, sum:double]
             _col0 := count (1:35)
             _col1 := sum (1:60)
         - Aggregate(FINAL) => [count:bigint, sum:double]
                 count := "presto.default.count"((count_9)) (1:35)
                 sum := "presto.default.sum"((sum_10)) (1:60)
             - LocalExchange[SINGLE] () => [sum_10:double, count_9:bigint]
                 - RemoteSource[1] => [sum_10:double, count_9:bigint]

 Fragment 1 [HASH]
     Output layout: [sum_10, count_9]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [sum_10:double, count_9:bigint]
             sum_10 := "presto.default.sum"((quantity)) (1:60)
             count_9 := "presto.default.count"((partkey)) (mask = partkey$distinct) (1:35)
         - MarkDistinct[distinct=partkey:bigint marker=partkey$distinct] => [partkey:bigint, quantity:double, partkey$distinct:boolean]
             - Project[projectLocality = LOCAL] => [partkey:bigint, quantity:double]
                     Estimates: {rows: 60175 (1.03MB), cpu: 7040475.00, memory: 0.00, network: 1624725.00}
                 - LocalExchange[HASH][$hashvalue] (partkey) => [partkey:bigint, quantity:double, $hashvalue:bigint]
                         Estimates: {rows: 60175 (1.03MB), cpu: 5957325.00, memory: 0.00, network: 1624725.00}
                     - RemoteSource[2] => [partkey:bigint, quantity:double, $hashvalue_11:bigint]

 Fragment 2 [tpch:orders:15000]
     Output layout: [partkey, quantity, $hashvalue_12]
     Output partitioning: HASH [partkey][$hashvalue_12]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - ScanProject[table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false, projectLocal
             Estimates: {rows: 60175 (1.55MB), cpu: 1083150.00, memory: 0.00, network: 0.00}/{rows: 60175 (1.55MB), cpu: 2707875.00, memory: 0.00, network: 0.00}
             $hashvalue_12 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(partkey), BIGINT'0')) (1:79)
             partkey := tpch:partkey (1:79)
             quantity := tpch:quantity (1:79)

@mbasmanova mbasmanova added question Further information is requested and removed enhancement New feature or request labels Mar 24, 2023
@rui-mo
Collaborator Author

rui-mo commented Mar 27, 2023

@mbasmanova Masha, thanks a lot for the answer. Below is the full Spark plan for the query:
select count(distinct ws_order_number), sum(ws_ext_ship_cost) from web_sales.
In this plan, merge_sum and partial_count coexist in step (8). I haven't found any detailed doc yet; I will attach it here if I find any.

(1) Scan parquet 
Output [2]: [ws_order_number#747L, ws_ext_ship_cost#758]
Batched: true
Location: InMemoryFileIndex [file:/mnt/DP_disk2/tpcds_parquet_nopartition_with_date_with_decimal_1/web_sales]
ReadSchema: struct<ws_order_number:bigint,ws_ext_ship_cost:decimal(7,2)>

(2) ColumnarToRow [codegen id : 1]
Input [2]: [ws_order_number#747L, ws_ext_ship_cost#758]

(3) HashAggregate [codegen id : 1]
Input [2]: [ws_order_number#747L, ws_ext_ship_cost#758]
Keys [1]: [ws_order_number#747L]
Functions [1]: [partial_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

(4) Exchange
Input [2]: [ws_order_number#747L, sum#863L]
Arguments: hashpartitioning(ws_order_number#747L, 288), ENSURE_REQUIREMENTS, [plan_id=112]

(5) ShuffleQueryStage
Output [2]: [ws_order_number#747L, sum#863L]
Arguments: 0

(6) AQEShuffleRead
Input [2]: [ws_order_number#747L, sum#863L]
Arguments: coalesced

(7) HashAggregate [codegen id : 2]
Input [2]: [ws_order_number#747L, sum#863L]
Keys [1]: [ws_order_number#747L]
Functions [1]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

(8) HashAggregate [codegen id : 2]
Input [2]: [ws_order_number#747L, sum#863L]
Keys: []
Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [sum#863L, count#866L]

(9) Exchange
Input [2]: [sum#863L, count#866L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=139]

(10) ShuffleQueryStage
Output [2]: [sum#863L, count#866L]
Arguments: 1

(11) HashAggregate [codegen id : 3]
Input [2]: [sum#863L, count#866L]
Keys: []
Functions [2]: [sum(UnscaledValue(ws_ext_ship_cost#758)), count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [cast(count(ws_order_number#747L)#852L as string) AS order count#858, cast(MakeDecimal(sum(UnscaledValue(ws_ext_ship_cost#758))#853L,17,2) as string) AS total shipping cost#859]

(12) HashAggregate
Input [2]: [ws_order_number#747L, ws_ext_ship_cost#758]
Keys [1]: [ws_order_number#747L]
Functions [1]: [partial_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

(13) Exchange
Input [2]: [ws_order_number#747L, sum#863L]
Arguments: hashpartitioning(ws_order_number#747L, 288), ENSURE_REQUIREMENTS, [plan_id=89]

(14) HashAggregate
Input [2]: [ws_order_number#747L, sum#863L]
Keys [1]: [ws_order_number#747L]
Functions [1]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

(15) HashAggregate
Input [2]: [ws_order_number#747L, sum#863L]
Keys: []
Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [sum#863L, count#866L]

(16) Exchange
Input [2]: [sum#863L, count#866L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=93]

(17) HashAggregate
Input [2]: [sum#863L, count#866L]
Keys: []
Functions [2]: [sum(UnscaledValue(ws_ext_ship_cost#758)), count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [cast(count(ws_order_number#747L)#852L as string) AS order count#858, cast(MakeDecimal(sum(UnscaledValue(ws_ext_ship_cost#758))#853L,17,2) as string) AS total shipping cost#859]

(18) AdaptiveSparkPlan
Output [2]: [order count#858, total shipping cost#859]
Arguments: isFinalPlan=true

@mbasmanova
Contributor

@rui-mo I found some description of this query plan in the "Aggregate with One Distinct" section of https://dataninjago.com/2022/01/04/spark-deep-dive-12-aggregation-strategy/

I'm still reading it.

@mbasmanova
Contributor

@rui-mo Wei is currently working on enhancing aggregate function registration to automatically register a few companion functions for each aggregate function. Specifically, for aggregate function xxx, we'll automatically generate and register the following companion functions:

  • xxx_partial aggregate function - takes raw input and produces intermediate result
  • xxx_merge aggregate function - takes intermediate result and produces intermediate result
  • xxx_extract scalar function - takes intermediate result and produces final result

Once we have that, the plan you describe above can be implemented using the sum_partial, sum_merge and sum_extract companion functions like so:

  • Project: sum_extract(s4), c2
  • FinalAgg[]: s4 := sum_merge(s3), c2 := count(c1)
    • PartialAgg[]: s3 := sum_merge(s2), c1 := count()
      • FinalAgg[ws_order_number]: s2 := sum_partial(s1)
        • PartialAgg[ws_order_number]: s1:= sum_partial(ws_ext_ship_cost)
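In PlanBuilder terms, a rough single-process sketch of that plan could look like the following, assuming the companion functions are registered (localPartition() stands in for the shuffles, and rows is a placeholder for the scan output):

auto plan = PlanBuilder()
                .values({rows})
                .partialAggregation({"ws_order_number"}, {"sum_partial(ws_ext_ship_cost)"})
                .localPartition({"ws_order_number"})
                .finalAggregation()
                .partialAggregation({}, {"sum_merge(a0)", "count(ws_order_number)"})
                .localPartition()
                .finalAggregation()
                .project({"sum_extract(a0)", "a1"})
                .planNode();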

CC: @kagamiori

@rui-mo
Collaborator Author

rui-mo commented Mar 28, 2023

@mbasmanova Thanks for your reply! It is very useful to us.

Specifically, for aggregate function xxx, we'll automatically generate and register the following companion functions.

Based on this, do you think it makes sense for us to fix this issue in a similar way? If I understand correctly, that means changing the Spark plan by adding a partial_count before the merge stage. Then, the Velox plan would become something like below:

  auto plan = PlanBuilder()
                  .values({rows})
                  .partialAggregation({"c0"}, {"sum(c1)", "count()"})
                  .intermediateAggregation({"c0"}, {"sum(a0)", "count()"}, resultType)
                  .intermediateAggregation({}, {"sum(a0)", "count(c0)"}, resultType)
                  .planNode();

@mbasmanova
Contributor

@kagamiori Wei, would it be possible to publish a draft PR of the companion aggregate functions?

@rui-mo In the meantime, one option is to manually register the companion functions sum_partial, sum_merge and sum_extract. Then create a Velox plan like so:

  auto plan = PlanBuilder()
                  .values({rows})
                  .partialAggregation({"c0"}, {"sum_partial(c1)"})
                  .localPartition({"c0"})
                  .finalAggregation()
                  .partialAggregation({}, {"sum_merge(a0)", "count(c0)"})
                  .localPartition()
                  .finalAggregation()
                  .planNode();

Note that for the production use case you'll need to create a distributed plan with shuffles in place of "localPartition".

Hope this helps.

@rui-mo
Collaborator Author

rui-mo commented Mar 28, 2023

@mbasmanova Thank you. I see. Basically, we need to change the plan instead of just offloading the Spark plan to Velox as-is. We'll try that. Thanks again!

@mbasmanova
Contributor

@rui-mo Rui,

Just to clarify, my reading of the "Aggregate with One Distinct" section in https://dataninjago.com/2022/01/04/spark-deep-dive-12-aggregation-strategy/ suggests that Spark's plan contains 6 nodes:

- partial agg(k1, k2, partial_sum)
   - shuffle(k1, k2)
      -  final agg(k1, k2, merge_sum)
         - partial agg(k1, merge_sum, count)
            - shuffle(k1)
               - final agg(k1, finalmerge_sum, count)

There are 2 shuffle nodes which break this plan into 3 fragments.

Fragment 1:

- partial agg(k1, k2, partial_sum)

Fragment 2:

-  final agg(k1, k2, merge_sum)
     - partial agg(k1, merge_sum, count)

Fragment 3:

- final agg(k1, finalmerge_sum, count)

Each fragment is translated to a Velox plan and executed separately. The tricky part here is that the partial_sum, merge_sum and finalmerge_sum aggregate functions do not map to a single sum aggregate function in Velox. These can be derived from the implementation of "sum", but they still need to be defined and registered as separate aggregate functions.

Wei's work will make it easier to define and register these so-called companion functions, but we were thinking of defining them slightly differently. In our design partial_sum and merge_sum are the same, except that we planned to put 'partial' and 'merge' in the suffix rather than the prefix. We didn't plan to provide a finalmerge_sum function though. Instead, we offer a sum_extract scalar function that can be combined with sum_merge to implement "finalmerge_sum". Doing so requires translating Fragment 3 into a 2-node plan:

- project(sum_extract(s), c), 
   - final agg(k1, s := sum_merge, c := count)
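For the global aggregation in Fragment 3, a rough PlanBuilder sketch of that 2-node translation might be the following (shuffledRows is a placeholder for the [s, c] intermediate rows arriving from Fragment 2, and the BIGINT types assume sum over a bigint column):

auto fragment3 = PlanBuilder()
                     .values({shuffledRows})
                     .finalAggregation({}, {"sum_merge(s)", "count(c)"}, {BIGINT(), BIGINT()})
                     .project({"sum_extract(a0)", "a1"})
                     .planNode();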

I assume you refer to the addition of "project" node as "we need to change the plan". Let me know if that's not the case.

An alternative option is for Gluten to define Spark-compatible companion functions: partial_xxx, merge_xxx and finalmerge_xxx. This way the translation from Spark to Velox plan might be simpler.

CC: @kagamiori

@mbasmanova
Contributor

@kagamiori Wei, let's discuss whether it makes sense for us to add a "finalmerge" companion aggregate function.

@rui-mo
Collaborator Author

rui-mo commented Mar 28, 2023

@mbasmanova Masha, let me clarify a bit here.

I assume you refer to the addition of "project" node as "we need to change the plan". Let me know if that's not the case.

This is how we translate the Spark plan to a Velox plan in this case. It's a one-to-one mapping from the Spark plan to the Velox plan, which means AggregationNode::Step::kPartial is used for Fragment 1, AggregationNode::Step::kFinal is used for Fragment 3, and AggregationNode::Step::kIntermediate is used for Fragment 2 (7).

The problem occurs in Fragment 2 (8), in which the merge_sum and partial_count functions coexist (they belong to different aggregation steps). If we simply use AggregationNode::Step::kIntermediate in the Velox AggregationNode, an incorrect result is produced for the count function. Through debugging, I found that decodedIntermediate_.valueAt<int64_t>(i) is invalid.

Fragment 1: converted to partialAggregation({"c0"}, {"sum(c1)"}) in Velox.

(3) HashAggregate [codegen id : 1]
Input [2]: [ws_order_number#747L, ws_ext_ship_cost#758]
Keys [1]: [ws_order_number#747L]
Functions [1]: [partial_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

Fragment 2: converted to intermediateAggregation({"c0"}, {"sum(a0)"}, resultType) and intermediateAggregation({}, {"sum(a0)", "count(c0)"}, resultType)

(7) HashAggregate [codegen id : 2]
Input [2]: [ws_order_number#747L, sum#863L]
Keys [1]: [ws_order_number#747L]
Functions [1]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758))]
Aggregate Attributes [1]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L]
Results [2]: [ws_order_number#747L, sum#863L]

(8) HashAggregate [codegen id : 2]
Input [2]: [ws_order_number#747L, sum#863L]
Keys: []
Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [sum#863L, count#866L]

Fragment 3: converted to finalAggregation({}, {"sum(a0)", "count(c0)"}, resultType)

(11) HashAggregate [codegen id : 3]
Input [2]: [sum#863L, count#866L]
Keys: []
Functions [2]: [sum(UnscaledValue(ws_ext_ship_cost#758)), count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [cast(count(ws_order_number#747L)#852L as string) AS order count#858, cast(MakeDecimal(sum(UnscaledValue(ws_ext_ship_cost#758))#853L,17,2) as string) AS total shipping cost#859]

Based on the above discussion, I believe we need to change the plan to make it work in Velox. The purpose is to avoid putting partial_count and merge_sum together in a single AggregationNode, for example by adding a partial count to Fragment 1. Does that make sense?

@rui-mo
Collaborator Author

rui-mo commented Mar 28, 2023

I also have a question here.

In our design partial_sum and merge_sum are the same.

I believe merge_sum relies on the intermediate result as input, but partial_sum starts from zero. Why are they treated the same?

@rui-mo rui-mo changed the title Any plan to support an AggregationNode with different function phases? How to handle an aggregation with functions of different steps? Mar 28, 2023
@mbasmanova
Contributor

@rui-mo It sounds to me like the translation from the Spark plan to the Velox plan is incorrect. How do you derive the aggregation step during translation? I expect the aggregation steps to be: partial, final, partial, final for that query plan.

@mbasmanova
Contributor

I believe merge_sum relies on the intermediate result as input, but partial_sum starts from zero. Why are they treated the same?

I meant that in our design sum_partial matches Spark's partial_sum and sum_merge matches Spark's merge_sum.

@rui-mo
Collaborator Author

rui-mo commented Mar 29, 2023

@rui-mo It sounds to me like the translation from the Spark plan to the Velox plan is incorrect. How do you derive the aggregation step during translation? I expect the aggregation steps to be: partial, final, partial, final for that query plan.

@mbasmanova Masha, here are some details on Spark's aggregation phases:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L34-L63

We use the following mapping:

Spark Partial => Velox AggregationNode::Step::kPartial
Spark PartialMerge => AggregationNode::Step::kIntermediate
Spark Final => AggregationNode::Step::kFinal
Spark Complete => AggregationNode::Step::kSingle
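Roughly, this is a per-expression switch like the sketch below (SparkAggregateMode is a hypothetical stand-in for how the mode reaches our translation layer, not an actual Velox or Gluten type; only the Velox Step enum is real):

// Hypothetical mapping helper illustrating the table above.
enum class SparkAggregateMode { kPartial, kPartialMerge, kFinal, kComplete };

core::AggregationNode::Step toVeloxAggregationStep(SparkAggregateMode mode) {
  switch (mode) {
    case SparkAggregateMode::kPartial:
      return core::AggregationNode::Step::kPartial;
    case SparkAggregateMode::kPartialMerge:
      return core::AggregationNode::Step::kIntermediate;
    case SparkAggregateMode::kFinal:
      return core::AggregationNode::Step::kFinal;
    case SparkAggregateMode::kComplete:
      return core::AggregationNode::Step::kSingle;
  }
  return core::AggregationNode::Step::kSingle; // not reached
}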

Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]

In Spark, merge_sum belongs to PartialMerge and partial_count belongs to Partial, so we are not sure which single step to use here.

@mbasmanova
Contributor

@rui-mo Thank you for the pointer. I see that in Spark, AggregateMode is attached to an aggregate expression, not to Aggregate plan node. Is this the case? Do you have a pointer to the definition of aggregation plan node? Are you seeing a single aggregation plan node with 2 aggregate expressions that use different AggregateMode?

@rui-mo
Collaborator Author

rui-mo commented Mar 30, 2023

@mbasmanova Appreciate your time on this issue.

I see that in Spark, AggregateMode is attached to an aggregate expression, not to Aggregate plan node. Is this the case?

Yes, that's the case. Spark's aggregate modes can differ between aggregate expressions, although that is not a common case.

Are you seeing a single aggregation plan node with 2 aggregate expressions that use different AggregateMode?

Yes, the case described above is exactly that, and so far this is the only case in which we have noticed the issue. In the plan below, merge_sum means PartialMerge mode in Spark, and partial_count means Partial mode. Here is the link showing how Spark translates a mode to a string: interfaces.scala#L161-L166.

(8) HashAggregate [codegen id : 2]
Input [2]: [ws_order_number#747L, sum#863L]
Keys: []
Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]
Aggregate Attributes [2]: [sum(UnscaledValue(ws_ext_ship_cost#758))#853L, count(ws_order_number#747L)#852L]
Results [2]: [sum#863L, count#866L]

Do you have a pointer to the definition of aggregation plan node?

This is the class in Spark for hash aggregation: HashAggregateExec.scala#L47-L56. The mode can be accessed from aggregateExpressions.

@mbasmanova
Contributor

@rui-mo Rui, thank you for the pointers. This is a bit tricky. I see a couple of issues:

1- Velox doesn't support mixed aggregations;
2- Velox doesn't support aggregation over distinct values agg(distinct x).

Functions [2]: [merge_sum(UnscaledValue(ws_ext_ship_cost#758)), partial_count(distinct ws_order_number#747L)]

We should add support for (2), but I'm not sure about (1).

In Velox, partial and intermediate aggregations differ from final aggregation in that they do not spill, but rather flush results if memory is tight. This means that it is possible for the results of partial or intermediate aggregation to include rows with duplicate grouping keys. For example, if a partial aggregation flushes mid-stream, the same ws_order_number can appear in two output rows, and a downstream count over those rows would count it twice. The Spark plan we consider here assumes that the results of the intermediate aggregation do not contain duplicate grouping keys.

It might be possible to introduce a configuration flag to disable flushing in partial and intermediate aggregations and enable spill instead, but this would be quite a complex change and it is not clear to me whether it makes sense. CC: @xiaoxmeng

Basically, we need to change the plan instead of just offloading the Spark plan to Velox as-is.

I now understand what you meant here. Do you think it would be possible to do that in Gluten?

@rui-mo
Collaborator Author

rui-mo commented Mar 31, 2023

@mbasmanova Thanks for the comments. They're very useful to us.

Velox doesn't support aggregation over distinct values agg(distinct x).

We thought the distinct count was not a problem because Spark plans the grouping expressions in the preceding aggregations. It was assumed the input for this aggregation was already distinct, so a normal count could do the work. But you remind me of another thing:

This means that it is possible for the results of partial or intermediate aggregation to include rows with duplicate grouping keys.

This means the output of Velox's partial or intermediate aggregation may not be distinct, so a normal count would not work. Yes, disabling flush is one way to ensure the input for count is already distinct.

Do you think it would be possible to do that in Gluten?

Yes, we can try that if Velox decides not to support mixed aggregations. In that case, maybe a change to the Spark plan like the one below could work.

The original Spark plan:

- group by a, partial_sum(b) --> Velox AggregationNode::Step::kPartial
- Shuffle
- group by a, merge_sum(b)  --> AggregationNode::Step::kIntermediate
- merge_sum(b), partial_count(distinct a) --> AggregationNode::Step::kIntermediate ?
- Shuffle
- final_sum(b), final_count(a) --> AggregationNode::Step::kFinal

The changed Spark plan:

- group by a, partial_sum(b) --> Velox AggregationNode::Step::kPartial
- Shuffle
- group by a, final_sum(b)  --> AggregationNode::Step::kFinal
- partial_sum(b), partial_count(a) --> AggregationNode::Step::kPartial
- Shuffle
- final_sum(b), final_count(a) --> AggregationNode::Step::kFinal
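In PlanBuilder terms, a rough single-process sketch of the changed plan would be something like the following (localPartition() stands in for the shuffles; a, b, and rows are placeholder names):

auto plan = PlanBuilder()
                .values({rows})
                .partialAggregation({"a"}, {"sum(b)"})
                .localPartition({"a"})
                .finalAggregation()
                .partialAggregation({}, {"sum(a0)", "count(a)"})
                .localPartition()
                .finalAggregation()
                .planNode();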

@mbasmanova
Contributor

In that case, maybe a change to the Spark plan like the one below could work.

@rui-mo Something like this should work. Just make sure to use merge_sum, not final_sum, in the 2nd and 3rd aggregations. For 'sum' it doesn't really matter, but if you replace 'sum' with 'avg' then you'll notice the difference.
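To see why, with made-up numbers: take avg(b) where group a = 1 holds b = {2, 4} and group a = 2 holds b = {6}. The partial step produces the intermediate (sum, count) pairs (6, 2) and (6, 1). If the 2nd aggregation applied a final avg, it would emit 3.0 and 6.0 and drop the counts, so the last aggregation could only average the averages, giving 4.5 instead of the correct 12 / 3 = 4. With the merge variant, the pairs are carried forward, merged into (12, 3), and divided only once at the very end.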

@rui-mo
Collaborator Author

rui-mo commented Mar 31, 2023

For 'sum' it doesn't really matter, but if you replace 'sum' with 'avg' then you'll notice the difference.

@mbasmanova You're right, avg is different. If we use merge_avg in the 2nd and 3rd aggregations, this issue comes back. We need to think further here.

@mbasmanova
Contributor

If we use merge_avg in the 2nd and 3rd aggregations, this issue comes back.

Somehow, I think this should work. By 'merge_avg' I refer to a companion function to 'avg', not the original 'avg'.

@rui-mo
Collaborator Author

rui-mo commented Apr 3, 2023

Somehow, I think this should work. By 'merge_avg' I refer to a companion function to 'avg', not the original 'avg'.

@mbasmanova Understood. We will try to support the case described in this issue soon, and keep you updated. Thank you so much!

@mbasmanova
Contributor

@rui-mo Thank you, Rui. Let us know how it goes and feel free to reach out in case you run into any issues.

@kagamiori
Contributor

Hi @rui-mo, we're designing an adapter to automatically make companion functions like merge_avg available when a UDAF avg is registered: #4493. I think this could benefit your use case as well. Please let us know if you have any suggestions or ideas to share. (cc @mbasmanova)

@rui-mo
Collaborator Author

rui-mo commented Apr 4, 2023

@mbasmanova @kagamiori I tried #4489 in a unit test, and found it works for this case by using sum_merge and avg_merge. The next step is to generate such a Velox plan through Gluten, and we assume the case described in this issue can then be offloaded to Velox.

The unit test is:

TEST_F(AverageAggregationTest, companion) {
  auto rows = makeRowVector({
    makeFlatVector<int32_t>(100, [&](auto row) { return row % 10; }),
    makeFlatVector<int32_t>(100, [&](auto row) { return row * 2; }),
    makeFlatVector<int32_t>(100, [&](auto row) { return row; })});
  
  createDuckDbTable("t", {rows});

  std::vector<TypePtr> resultType = {BIGINT(), ROW({DOUBLE(), BIGINT()})};
  auto plan = PlanBuilder()
                  .values({rows})
                  .partialAggregation({"c0"}, {"avg(c1)", "sum(c2)"})
                  .intermediateAggregation({"c0"}, {"avg(a0)", "sum(a1)"},
                      {ROW({DOUBLE(), BIGINT()}), BIGINT()})
                  .aggregation(
                      {},
                      {"avg_merge(a0)", "sum_merge(a1)", "count(c0)"},
                      {},
                      core::AggregationNode::Step::kPartial,
                      false,
                      {ROW({DOUBLE(), BIGINT()}), BIGINT(), BIGINT()})
                  .finalAggregation({}, {"avg(a0)", "sum(a1)", "count(a2)"}, {DOUBLE(), BIGINT(), BIGINT()})
                  .planNode();
  assertQuery(plan, "SELECT avg(c1), sum(c2), count(distinct c0) from t");
}

The Velox plan node is:

-- Aggregation[FINAL a0 := avg(ROW["a0"]), a1 := sum(ROW["a1"]), a2 := count(ROW["a2"])] -> a0:DOUBLE, a1:BIGINT, a2:BIGINT
  -- Aggregation[PARTIAL a0 := avg_merge(ROW["a0"]), a1 := sum_merge(ROW["a1"]), a2 := count(ROW["c0"])] -> a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT, a2:BIGINT
    -- Aggregation[INTERMEDIATE [c0] a0 := avg(ROW["a0"]), a1 := sum(ROW["a1"])] -> c0:INTEGER, a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT
      -- Aggregation[PARTIAL [c0] a0 := avg(ROW["c1"]), a1 := sum(ROW["c2"])] -> c0:INTEGER, a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT
        -- Values[100 rows in 1 vectors] -> c0:INTEGER, c1:INTEGER, c2:INTEGER

Thank you for all your support!

@mbasmanova
Contributor

@rui-mo Thank you for looking into this. I think the Velox plan should look like this:

auto plan = PlanBuilder()
                  .values({rows})
                  .partialAggregation({"c0"}, {"avg_partial(c1)", "sum_partial(c2)"})
                  .finalAggregation()
                  .partialAggregation({}, {"avg_merge(a0)", "sum_merge(a1)", "count(c0)"})
                  .finalAggregation()
                  .project({"avg_extract(a0)", "sum_extract(a1)", "a2"})
                  .planNode();

First, group-by on c0 and compute avg_partial and sum_partial. Then, global agg with avg_merge, sum_merge and count. Finally, a project using avg_extract and sum_extract.

@rui-mo
Collaborator Author

rui-mo commented Apr 6, 2023

auto plan = PlanBuilder()
.values({rows})
.partialAggregation({"c0"}, {"avg_partial(c1)", "sum_partial(c2)"})
.finalAggregation()
.partialAggregation({}, {"avg_merge(a0)", "sum_merge(a1)", "count(c0)"})
.finalAggregation()
.project({"avg_extract(a0)", "sum_extract(a1)", "a2"})
.planNode();

Hi Masha @mbasmanova, why do we need the finalAggregation right after the first partialAggregation? For avg, sum / count is computed in the final aggregation, and an extra division before the very end would make the result incorrect. My assumption is that the accumulation of sum and count should continue until the last finalAggregation, where the division is computed. Please correct me if I'm wrong.

-- Aggregation[FINAL a0 := avg(ROW["a0"]), a1 := sum(ROW["a1"]), a2 := count(ROW["a2"])] -> a0:DOUBLE, a1:BIGINT, a2:BIGINT
  -- Aggregation[PARTIAL a0 := avg_merge(ROW["a0"]), a1 := sum_merge(ROW["a1"]), a2 := count(ROW["c0"])] -> a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT, a2:BIGINT
    -- Aggregation[INTERMEDIATE [c0] a0 := avg(ROW["a0"]), a1 := sum(ROW["a1"])] -> c0:INTEGER, a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT
      -- Aggregation[PARTIAL [c0] a0 := avg(ROW["c1"]), a1 := sum(ROW["c2"])] -> c0:INTEGER, a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT
        -- Values[100 rows in 1 vectors] -> c0:INTEGER, c1:INTEGER, c2:INTEGER

With the above plan, because avg_merge's addRawInput function actually calls addIntermediateResults, the issue of mixed aggregation modes can be resolved. Although flushing during the intermediate aggregation would need to be disabled to ensure the input rows for count are distinct, this way of computing is aligned with Spark's plan.

Appreciate your help!

@mbasmanova
Contributor

@rui-mo In this query, we first group by c0, then perform a global aggregation. Strictly speaking, it is incorrect to compute an intermediate aggregation for group-by-c0 and use the results to compute a different global aggregation. We need to finish group-by-c0 by running final aggregation. To make sure that final aggregation returns "intermediate" results, we use avg_partial and sum_partial companion functions instead of avg and sum. This way, we do not need to disable flushing in partial or intermediate aggregations. (Disabling flushing is tricky because doing so will require enabling spilling. Otherwise, some aggregations may run out of memory.)
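To illustrate the data flow with made-up numbers: suppose group c0 = 1 has c1 values {1.0, 3.0} and group c0 = 2 has {5.0}. The group-by aggregation runs avg_partial for both its partial and final steps, so it finishes each group and emits exactly one intermediate ROW per group: {sum = 4.0, count = 2} and {sum = 5.0, count = 1}. The downstream global aggregation consumes those ROWs with avg_merge, producing {sum = 9.0, count = 3}, and the final project applies avg_extract to get 9.0 / 3 = 3.0. Because the group-by ends with a true final step, any flushing in its partial step cannot leak duplicate groups downstream.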

@mbasmanova
Contributor

  -- Aggregation[PARTIAL [c0] a0 := avg(ROW["c1"]), a1 := sum(ROW["c2"])] -> c0:INTEGER, a0:ROW<"":DOUBLE,"":BIGINT>, a1:BIGINT

@rui-mo In the above plan node, we should replace avg and sum with avg_partial and sum_partial companion functions.

@rui-mo
Collaborator Author

rui-mo commented Apr 12, 2023

To make sure that final aggregation returns "intermediate" results, we use avg_partial and sum_partial companion functions instead of avg and sum. This way, we do not need to disable flushing in partial or intermediate aggregations.

@mbasmanova Thank you for your reply! I finally understand your suggestion. A quick update: we implemented the related code in Gluten to use companion functions to solve this issue. It worked for us, but it was based on the intermediate-aggregation approach. We will continue to work on the improvement you suggested.

@mbasmanova
Contributor

@rui-mo Rui, thank you for the update. Happy to hear you are not blocked and making nice progress. Let us know if you'd like any further help.

@rui-mo
Collaborator Author

rui-mo commented Apr 13, 2023

@mbasmanova Thank you!
