
Inefficient and redundant implementation of aggregation states. #11820

Open
radek-kondziolka opened this issue Apr 6, 2022 · 21 comments
Labels: enhancement (New feature or request), performance


The problem

There are some redundant calls in aggregation operators that use more than one state.

Let's take a real example: io.trino.operator.aggregation.RealAverageAggregation

public class RealAverageAggregation
        extends SqlAggregationFunction
{
    ...
    public static void input(LongState count, DoubleState sum, long value)
    {
        count.setValue(count.getValue() + 1);
        sum.setValue(sum.getValue() + intBitsToFloat((int) value));
    }

    public static void combine(LongState count, DoubleState sum, LongState otherCount, DoubleState otherSum)
    {
        count.setValue(count.getValue() + otherCount.getValue());
        sum.setValue(sum.getValue() + otherSum.getValue());
    }

    public static void output(LongState count, DoubleState sum, BlockBuilder out)
    {
        if (count.getValue() == 0) {
            out.appendNull();
        }
        else {
            REAL.writeLong(out, floatToIntBits((float) (sum.getValue() / count.getValue())));
        }
    }
}

This aggregation function is based on two states - LongState and DoubleState. Let's call them atomic states.
Currently, for aggregation functions that use more than one atomic state, AccumulatorCompiler wraps that set of atomic states into a RowType. As a result, to make the grouped accumulator state groupId-aware, the compiler generates a setGroupId call (memory store) and a getGroupId call (memory load) for every atomic state.
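To make the cost concrete, a compiler-generated grouped atomic state is - roughly - an object that keeps the current groupId in a field and the per-group values in a big array. A simplified, approximate sketch (not the actual generated code; it reuses the LongBigArray and GroupedAccumulatorState types that also appear later in this issue):

// Simplified sketch of what a StateCompiler-generated grouped state roughly amounts to.
// Names and details are approximate; the point is the groupId field store/load.
public class GroupedLongState
        implements LongState, GroupedAccumulatorState
{
    private final LongBigArray values = new LongBigArray();
    private long groupId;

    @Override
    public void setGroupId(long groupId)
    {
        this.groupId = groupId; // one memory store per atomic state, per row
    }

    @Override
    public void ensureCapacity(long size)
    {
        values.ensureCapacity(size);
    }

    @Override
    public long getValue()
    {
        return values.get(groupId); // loads the groupId field on every access
    }

    @Override
    public void setValue(long value)
    {
        values.set(groupId, value); // and again here
    }

    @Override
    public long getEstimatedSize()
    {
        return values.sizeOf();
    }
}

Every setGroupId call is therefore a field store, and every getValue / setValue dereferences that field again.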

This might not seem expensive, but it is important to note that:

  • the addInput method is called for every row. Here is what the generated code looks like:
   public void addInput(GroupByIdBlock groupIdsBlock, Page arguments, Optional<Block> mask) {
      this.state_0.ensureCapacity(groupIdsBlock.getGroupCount());
      this.state_1.ensureCapacity(groupIdsBlock.getGroupCount());
      Block masksBlock = (Block)mask.orElse((Object)null);
      Block block0 = arguments.getBlock(0);
      int rows = arguments.getPositionCount();
      if (!AggregationUtils.maskGuaranteedToFilterAllRows(rows, masksBlock)) {
         for(int position = 0; CompilerOperations.lessThan(position, rows); ++position) {
            if (CompilerOperations.testMask(masksBlock, position) && !block0.isNull(position)) {
               // one setGroupId call (memory store) per atomic state, per row
               this.state_0.setGroupId(groupIdsBlock.getGroupId(position));
               this.state_1.setGroupId(groupIdsBlock.getGroupId(position));
               this.state_0.input<invokedynamic>(this.state_0, this.state_1, block0, position);
            }
         }
      }
   }
  • the addIntermediate method is called for every partial group:
      for(int position = 0; CompilerOperations.lessThan(position, rows); ++position) {
         if (!block.isNull(position) && !groupIdsBlock.isNull(position)) {
            this.state_0.setGroupId(groupIdsBlock.getGroupId(position));
            GroupedAccumulatorState var10000 = this.state_0;
            this.state_1.setGroupId(groupIdsBlock.getGroupId(position));
            GroupedAccumulatorState var10001 = this.state_1;
            this.stateSerializer_0.deserialize(columnBlock_0, position, (AccumulatorState)scratchState_0);
            this.stateSerializer_1.deserialize(columnBlock_1, position, (AccumulatorState)scratchState_1);
            var10000.combine<invokedynamic>(var10000, var10001, scratchState_0, scratchState_1);
         }
      }
  • evaluateIntermediate and evaluateFinal are similar to addIntermediate.

This problem becomes especially visible when we try to rewrite the decimal operators (sum, avg) to use atomic states. In that case we need to replace the dedicated state with three atomic states. As a result, it is impossible to do that rewrite without a performance degradation.

Some results

Currently, the decimal average function has a dedicated accumulator state (that is, one not generated by the compiler): LongDecimalWithOverflowAndLongState. I replaced it with atomic states and ran the DecimalAverageAggregation.benchmark microbenchmark. That method actually benchmarks the addInput method, which is very performance sensitive because - let's say it one more time - it is executed for every row.

The results are as follows:

(baseline) benchmarkAddInput   avg          1000       LONG  avgt    8      7.701           ns/op
(atomic)   benchmarkAddInput   avg          1000       LONG  avgt    8      9.960           ns/op

Let's take the "atomic" version of the decimal avg function and apply a change that can be described
as: "Do not call setGroupId and getGroupId for every atomic state. Instead, pass
groupId as an argument to DecimalAverageAggregation.inputLongDecimal (and, respectively,
add specialized methods for getting / setting the state value; by a specialized method I mean an additional method that takes the groupId as an argument to read / write the value from / to an atomic state - sketched after the results below).
This is only a demonstration, meant to show the negative impact of setting / getting groupId for every atomic state."
Let's look at the results now:

(baseline)              benchmarkAddInput           avg              7.701           ns/op
(atomic)                benchmarkAddInput           avg              9.960           ns/op
(atomic+no_set_groupId) benchmarkAddInput           avg              7.287           ns/op
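For concreteness, the "specialized" accessors used in this demonstration amount to overloads that take the group id directly, next to the existing ones. A hypothetical, heavily simplified sketch (the actual change was made on the decimal aggregation states; DemoGroupedLongState is an invented name):

public class DemoGroupedLongState
{
    private final long[] values = new long[1024]; // capacity management omitted
    private long groupId;

    // current convention: store the group id once, dereference it inside every accessor
    public void setGroupId(long groupId)
    {
        this.groupId = groupId;
    }

    public long getValue()
    {
        return values[(int) groupId];
    }

    // specialized accessors used in the demonstration: the group id is passed directly,
    // so the hot path needs no setGroupId call and no groupId field load
    public long getValue(long groupId)
    {
        return values[(int) groupId];
    }

    public void setValue(long groupId, long value)
    {
        values[(int) groupId] = value;
    }
}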

To sum up:
There are genuinely redundant memory stores and loads executed per row / per partial group. That redundancy is not free - there is a performance degradation. This is why this issue was created. The next section presents sketches of some possible approaches.

Proposal

Here is a sketch of the approaches we could take:

  1. Add a @GroupId annotation and pass groupId as an argument to methods like addInput, combine and so on.
    This makes operators groupId-aware (currently, operators are groupId-agnostic).
    However, this change can be introduced without any change to existing operators. We could just rewrite the operators that are performance critical - like the decimal operators (see the sketch after this list).
  2. For every generated state (and only for generated states - unfortunately, we need to remember that we have many dedicated, hand-written states as well) we could inject a special object by reference - something like AccumulatorMetadata. That object could track the groupId for all atomic states.
    Moreover, that AccumulatorMetadata object could keep information about the potential emptiness of a group - currently, the combine, serialize and deserialize methods are called even for empty (null) groups. As a result, the combine method is complicated - bigger, slower, harder to inline; for decimal sum it was shown that it could be much faster (about 1.5x - 2x).
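As a rough illustration of option 1 applied to the RealAverageAggregation example above (the @GroupId annotation and the groupId-taking state accessors are hypothetical here, not existing API):

public static void input(@GroupId long groupId, LongState count, DoubleState sum, long value)
{
    count.setValue(groupId, count.getValue(groupId) + 1);
    sum.setValue(groupId, sum.getValue(groupId) + intBitsToFloat((int) value));
}

The generated addInput loop could then read the group id once per row and pass it straight through, instead of issuing one setGroupId store per atomic state.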

This issue is meant to open the discussion and draw attention to the problem.


sopel39 commented Apr 6, 2022

cc @martint @dain

radek-kondziolka added the performance and enhancement labels Apr 6, 2022

martint commented Apr 27, 2022

Option 1 is reasonable. Option 2 is too complicated.

Also, we're going to need to make the group id explicit in the calls anyway to be able to support enhanced aggregation implementations that can use vectorization APIs, GPU, etc.


dain commented May 17, 2022

Option 1 is not reasonable. This is not how aggregations work and would require redesigning the framework.
Option 2 I don't understand but sounds complicated.

If your goal is to reduce the extra calls in generated aggregation code, then you could generate a single aggregation state that encodes all nested states into one object. This could help slightly but honestly, I don't think this minor performance change is worth any effort.


dain commented May 17, 2022

BTW, my guess is the performance change is more related to explicitly passing the groupId value than it is to merging the two states together. Under the covers these grouped states are just arrays, and passing the array index value via a parameter is likely to be faster than dereferencing the array index from a field in the object.... BTW, just a guess


martint commented May 17, 2022

and passing the array index value via a parameter is likely to be faster than dereferencing the array index from a field in the object...

That's exactly what option 1 is about -- pass the group id explicitly as an argument to the method instead of calling setGroupId first and then calling the method. And yes, the performance issue is from dereferencing the group id from a field in every call.
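To make that concrete, a minimal stand-alone sketch of the two call shapes in the per-row loop (stand-in types, not actual Trino code):

// Stand-in interface, just to show the two conventions side by side
interface GroupedCount
{
    void setGroupId(long groupId);      // current convention: store the group id in a field...
    void add(long delta);               // ...and dereference that field on every access

    void add(long groupId, long delta); // option 1: the group id travels as a plain argument
}

final class CallShapes
{
    // current shape: one setGroupId store per atomic state per row, plus a field load inside add()
    static void currentShape(GroupedCount count, long[] groupIds)
    {
        for (int position = 0; position < groupIds.length; position++) {
            count.setGroupId(groupIds[position]);
            count.add(1);
        }
    }

    // option 1 shape: the group id stays in a local / register and is passed explicitly
    static void option1Shape(GroupedCount count, long[] groupIds)
    {
        for (int position = 0; position < groupIds.length; position++) {
            count.add(groupIds[position], 1);
        }
    }
}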


radek-kondziolka commented May 17, 2022

  1. Please note that we are going to rewrite the dedicated state LongDecimalWithOverflowAndLongState into 3 atomic states. When we do that, we get two extra setGroupId calls for every row, partial group and so on.
  2. The second issue: when we have an explicit groupId argument, we need two methods:
  • addInput(state, .., groupId) (and the same for other aggregation methods) for the grouping case
  • addInput(state, ..) (and the same for other aggregation methods) for the non-grouping case
    Moreover, we need to have an additional interface, like:
interface LongState {
   void setValue(long value);
} 

interface GroupedLongState {
   void setValue(long groupId, long value);
} 

but only for some operators.

It is doable with a kind of extension and changes in the compilers. I am aware that this is a change to the aggregation framework, and this is why I've created this issue.
Anyway, if we want to rewrite LongDecimalWithOverflowAndLongState to atomic states without a performance degradation, we have to avoid the extra setGroupId calls per row.


radek-kondziolka commented May 18, 2022

If your goal is to reduce the extra calls in generated aggregation code, then you could generate a single aggregation state that encodes all nested states into one object. This could help slightly but honestly, I don't think this minor performance change is worth any effort.

If I understand you correctly, this is what LongDecimalWithOverflowAndLongState does - it is hand-written (not generated), but it implements that idea.


sopel39 commented May 18, 2022

The second issue: When we have an explicit groupId argument we need to have two methods:

I'm not sure you need two methods and two interfaces. It could be that for the non-partitioned case, group id is just 0.


radek-kondziolka commented May 18, 2022

I'm not sure you need two methods and two interfaces. It could be that for the non-partitioned case, group id is just 0.

Yes, I was thinking about that. Then you have an interface like:

interface LongState {
    void setValue(long groupId, long value);
    long getValue(long groupId);
}

class SingleLongState implements LongState {
    private long value;

    public void setValue(long groupId, long value) {
        this.value = value; // ignore groupId
    }

    public long getValue(long groupId) {
        return this.value;
    }
}

class GroupedLongState implements LongState {
    private final LongBigArray longs = new LongBigArray();

    public void setValue(long groupId, long value) {
        longs.set(groupId, value); // set the element in the array by groupId
    }

    public long getValue(long groupId) {
        return longs.get(groupId); // get the element from the array by groupId
    }
}

I decided that it is too confusing for (future) users of the API. For internal purposes it is acceptable to me (and a good approach). But it is a debatable point - that approach is simpler and more elegant. What do you think @dain, @martint?


dain commented May 26, 2022

I have been thinking about this for a while now, and I think there is an approach that is backward compatible and not too complex (maybe you all have already thought of this).

I believe the proposal is to convert the function signature from:

void input(LongState count, DoubleState sum, long value);
void combine(LongState count, DoubleState sum, LongState otherCount, DoubleState otherSum);
void output(LongState count, DoubleState sum, BlockBuilder out);

to

void input(@GroupId long groupId, LongState count, DoubleState sum, long value);
void combine(@GroupId long groupId, LongState count, DoubleState sum, @GroupId long otherGroupId, LongState otherCount, DoubleState otherSum);
void output(@GroupId long groupId, LongState count, DoubleState sum, BlockBuilder out);

We could adapt the old interface to the new interface using basic method handle combinators. Basically we use combinators to process the args as follows:

void input(@GroupId long groupId, LongState count, DoubleState sum, long value)
{
    count.setGroupId(groupId);
    sum.setGroupId(groupId);
    oldStyleInput(count, sum, value);
}

This can be done completely without byte code generation.

So, assuming we want to make this change, we would change the AggregationMetadata to expect the new method signature, and obviously change the aggregation compiler to invoke the new signature. Finally, the AggregationFromAnnotationsParser would be changed to detect the old and new patterns. When the old pattern is detected, the method handle would be adapted to the new signature (using combinators). Finally, aggregations would be switched to the new style. This work should be based on the PR I have up that rewrites all (except for reduce) aggregations to annotated forms, which will make this easier. Also, that PR adds a number of new features that would conflict with this change.
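For illustration, a minimal sketch of that combinator-based adaptation for a single-state input function - using a stand-in LongState interface rather than the real SPI types (the two-state case just folds in one more setGroupId call):

import static java.lang.invoke.MethodHandles.dropArguments;
import static java.lang.invoke.MethodHandles.foldArguments;
import static java.lang.invoke.MethodHandles.permuteArguments;
import static java.lang.invoke.MethodType.methodType;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;

public final class GroupIdAdapter
{
    // Stand-in for a grouped state: setGroupId plus the legacy value accessors
    public interface LongState
    {
        void setGroupId(long groupId);

        void setValue(long value);

        long getValue();
    }

    // Adapts oldInput of type (LongState, long)void to (long groupId, LongState, long)void,
    // prepending a call to state.setGroupId(groupId) - no bytecode generation involved.
    public static MethodHandle adaptToGroupId(MethodHandle oldInput)
            throws ReflectiveOperationException
    {
        MethodHandles.Lookup lookup = MethodHandles.lookup();

        // (LongState, long)void
        MethodHandle setGroupId = lookup.findVirtual(LongState.class, "setGroupId", methodType(void.class, long.class));

        // reorder to (long groupId, LongState state)void so it lines up with the new leading arguments
        MethodHandle setGroupIdSwapped = permuteArguments(setGroupId, methodType(void.class, long.class, LongState.class), 1, 0);

        // (long groupId, LongState state, long value)void that simply ignores groupId
        MethodHandle target = dropArguments(oldInput, 0, long.class);

        // run setGroupId on the state with the incoming groupId, then fall through to the old input
        return foldArguments(target, setGroupIdSwapped);
    }
}

Here foldArguments runs the setGroupId handle on the leading (groupId, state) arguments and then invokes the adapted old input function with all of them.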


radek-kondziolka commented May 26, 2022

Please go to: #11820 (comment)


It solves the problem of input/combine/output interfaces.

Now, we have to imagine what our "atomic" state interfaces should look like. We have two methods:

the old-style one:

    void oldStyleInput(LongState count, LongState sum, long value) {
         count.setValue(count.getValue() + value); // the method without `groupId`
    }

the new-style one:

    void newStyleInput(LongState count, LongState sum, long value, @GroupId int groupId) {
         count.setValue(groupId, count.getValue(groupId) + value); // the method with `groupId` given
    }

Potentially, we would have the interface:

interface LongState {
    void setValue(@GroupId int groupId, long value);
    void setValue(long value);
    ...
}

and the generated implementation of SingleLongState could just throw UnsupportedOperationException for the grouped operations. But we have another problem: combine takes two kinds of parameters - a grouped state for the left-hand side and a single state for the right-hand side. I mean:

    void combine(@GroupId int groupId, LongState lhs, LongState rhs) {
          lhs.setValue(groupId, lhs.getValue(groupId) + rhs.getValue(groupId));
    }

This method throws UnsupportedOperationException because rhs is actually a single state. In particular, it would be very misleading and confusing for potential users of our API.

This is my rationale, and this is why I decided that the group-id-aware states should be separate states.
It is not nice or completely seamless, but it is possible that I simply cannot find a better way to implement it.


sopel39 commented May 26, 2022

and the generated implementation of SingleLongState could just throw UnsupportedOperationException for the grouped operations.

Why would it fail? Maybe it could just assert group id is 0


radek-kondziolka commented May 26, 2022

Why would it fail? Maybe it could just assert group id is 0

That is one option. However, combine is still problematic. Anyway, if we do not assert groupId == 0 for SingleState, this approach works and resolves all the problems I see (for now). So, instead of making assumptions about whether groupId is zero or not, let's just ignore it in the non-grouping case.

Summary

It looks like we have converged on a common version. To sum up our discussion, the proposed change could be expressed as:

Make aggregation operators in Trino group-oriented instead of group-agnostic.

Then, basically, we have:

interface LongState {
    void setValue(int groupId, long value);
}

class SingleLongState {
    long v;

    void setValue(int groupId, long value) {
        this.v = value;
    }
}

class GroupedLongState {
    LongBigArray longs;

    void setValue(int groupId, long value) {
        longs.set(groupId, value);
    }
}

and similarly for serializers / deserializers. Then StateCompiler / AccumulatorCompiler could be made smart enough to stay backward compatible, and we could rewrite things step by step.

Am I missing something, is this OK? @dain @martint @sopel39?


dain commented May 26, 2022

Yes. Also, since we are effectively introducing a v2 of the aggregation annotations, we should do a review of the APIs here, since anyone moving to v2 has to rewrite their code. I say this because processing the very flexible super-legacy annotated aggregations makes the code very complex, so the more we can do to say "it must be exactly like this", the easier this stuff is to maintain.

Specifically, we should require full annotation of the aggregation parameters, as I did in the rewrite PR I mentioned above. In that PR, I added support for having multiple accumulator states, and for the aggregations that switched to that mode, I added tighter controls over annotation requirements and argument ordering. For example, I would require the arguments for the input method to be:

  1. Injected dependencies (operators, types, etc)
  2. @GroupId
  3. @AggregationState
  4. @SqlType input parameters

I would have similar rules for combine and output. Similarly, for states and serializers. If we find the rules too burdensome, we can relax them later, but this is our one chance to tighten things up.


radek-kondziolka commented May 27, 2022

Sketch of the APIv2:

AggregationFunction

  1. input

    @InputFunction
    void input(
      `Injected dependencies (operators, types, etc)`, 
      @GroupId long groupId,
      @AggregationState AccumulatorState state ..., 
      @SqlType input parameters);
    
    • Injected dependencies are optional.

    • parameters represent the input value that can be written as:

      @SqlType("TYPE") Type value

      or

      @BlockPosition @SqlType(value = SQL_TYPE, nativeContainerType = type.class) Block block, @BlockIndex int position

  2. removeInput

    @RemoveInputFunction
    void removeInput(
      `Injected dependencies (operators, types, etc)`, 
      @GroupId long groupId,
      @AggregationState AccumulatorState state ..., 
      @SqlType input parameters);
    
    • Injected dependencies are optional.

    • parameters represent the input value that can be written as:

      @SqlType("TYPE") Type value

      or

      @BlockPosition @SqlType(value = SQL_TYPE, nativeContainerType = type.class) Block block, @BlockIndex int position

  3. combine

    @CombineFunction
    void combine(
      `Injected dependencies (operators, types, etc)`, 
      @GroupId long groupId, 
      @AggregationState AccumulatorState state...,
      @AggregationState AccumulatorState otherState...);
    
    • Injected dependencies are optional.
  4. output

    @OutputFunction("TYPE")
    void output(
      `Injected dependencies (operators, types, etc)`, 
      @GroupId long groupId, 
      @AggregationState AccumulatorState state..., 
      BlockBuilder out);
    
    • Injected dependencies are optional.

Serializer / Deserializer

public interface AccumulatorStateSerializer<T extends AccumulatorState>
{
    Type getSerializedType();

    void serialize(long groupId, T state, BlockBuilder out);
    void deserialize(long groupId, Block block, int index, T state);
}

AccumulatorState

public interface AccumulatorState
{
    long getEstimatedSize();

    default AccumulatorState copy()
    {
        throw new UnsupportedOperationException("copy not implemented for " + getClass());
    }
}

interface LongState extends AccumulatorState {
      long getValue(@GroupId long groupId);
      void setValue(@GroupId long groupId, long value);
}
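For illustration, a hypothetical serializer for such a LongState, written against the proposed groupId-taking AccumulatorStateSerializer interface above (BIGINT here is io.trino.spi.type.BigintType.BIGINT; null handling omitted):

public class LongStateSerializer
        implements AccumulatorStateSerializer<LongState>
{
    @Override
    public Type getSerializedType()
    {
        return BIGINT;
    }

    @Override
    public void serialize(long groupId, LongState state, BlockBuilder out)
    {
        // read the value for the given group and write it to the output block
        BIGINT.writeLong(out, state.getValue(groupId));
    }

    @Override
    public void deserialize(long groupId, Block block, int index, LongState state)
    {
        // read the serialized value and store it for the given group
        state.setValue(groupId, BIGINT.getLong(block, index));
    }
}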

@dain, is it OK for you?


sopel39 commented May 30, 2022

Finally, aggregations would be switched to the new style. This work should be based on the PR I have up that rewrites all (except for reduce) aggregations to annotated forms, which will make this easier. Also, that PR adds a number of new features that would conflict with this change.

@dain do you mean #11477? Is #12588 also something that should land first?


dain commented Jun 1, 2022

@radek-starburst some notes:

  • All functions support injected dependencies (operators, types, etc.); most don't use them. For example, in the output function you show @TypeParameter("TYPE") Type type, which is a type dependency.
  • The Block and position shown in the first input are just another argument calling convention, so the same thing as parameters.
  • You forgot about the remove function, but it has the same shape as input.
  • AccumulatorStateSerializer doesn't need the @GroupId annotation on the argument, since this is an interface we provide that the function provider implements.
  • For the AccumulatorState, I don't think I would require the @GroupId annotation either, but I'm not sure here. Unlike the main aggregation functions, it is very clear which argument is the group id, but it might be nice for readability... I don't have a strong preference either way.
  • The new InOut state (see MaxAggregationFunction) will need to be modified as part of this. This will require changes to the IN_OUT calling convention and to the ScalarFunctionAdapter code. This is a very complex change. I think we may be able to run these functions in "legacy" mode until someone who really knows Java method handle combinators can update that code. I expect that this will be me, but I'm happy to just be a reviewer :)

@sopel39 Yes I meant #11477. My new PR #12588 moves the function interface to the SPI. Before we make the new interfaces non-experimental, we need to finish this work because there is no way to make this a backwards compatible change for the low level APIs.... so to be super clear, if we decide to land #12588 before this work is done, we must make clear that the function APIs are experimental and will change. Once this work is finalized and landed, then we could make function APIs final (non-experimental).


radek-kondziolka commented Jun 1, 2022

All functions support Injected dependencies (operators, types, etc), most don't use them. For example, in the output function you show @TypeParameter("TYPE") Type type which is a type dependency.

Block and position shown in the first input is just another argument calling convention, so same thing as parameters.

You forgot about the remove function, but it has the same shape as input.

AccumulatorStateSerializer doesn't need the @GroupId annotation on the argument since this is an interface we provide that the function provider implements

For the AccumulatorState, I don't think I would require the @GroupId annotation either, but I'm not sure here. Unlike the main aggregation functions, it is very clear which argument is the group id, but it might be nice for readability... I don't have a strong preference either way.

In my view, we should keep the annotation here. At the beginning we are going to support both groupId-aware operators and non-groupId-aware ones (legacy mode). So, for the StateCompiler it would be much easier to validate (and generate) the implementations of states for groupId-aware operators. Obviously, it is not necessary, but it makes things easier.

The new InOut state (see MaxAggregationFunction) will need to be modified as part of this. This will require changes to the IN_OUT calling convention, and the code ScalarFunctionAdapter. This is a very complex change. I think we may be able to run these functions in "legacy" mode until, someone that really knows Java method handle combinators can update that code. I expect that this will be me, but I'm happy to just be a reviewer :)

Yeah, I see. We would need to make the comparator groupId-aware as well. But, as you said, we could run in legacy mode and then extend the change further. It should be much easier to make this change step by step :).


So, as you can see, I edited the sketch above to have the best possible version. It looks like we:

  1. Have agreed on the APIv2.
  2. Would rather finish this work first, to avoid the round trip:
    • Make the function SPI experimental.
    • Change the API as we agreed here.
    • Make changes in the function SPI (if needed).
    • Make the function SPI final.
  3. Need to prepare a groupId-aware proposal, and finally an implementation, for the IN_OUT states.

Right?


radek-kondziolka commented Jul 12, 2022

Today I ran benchmarks with an explicit groupId and with the dedicated state (LongDecimalWithOverflowAndLongState) rewritten to atomic states.
The results are:
[benchmark results image]
(parquet, part)


findepi commented Mar 26, 2024

Do @dain's recent changes to aggregate states make this issue obsolete?

radek-kondziolka commented

I do not think so - I do not see how they improve the partial representation of aggregation states.
