
Multi channel group by hash first batch of optimizations #12072

Conversation

skrzypo987
Member

Description

Some simple optimizations for multi channel group by hash.

Is this change a fix, improvement, new feature, refactoring, or other?

improvement & refactor

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

core query engine

How would you describe this change to a non-technical end user or system administrator?

[slow] -> improvement -> [less slow]

Related issues, pull requests, and links

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
(x) Release notes entries required with the following suggested text:

# Section
* Improve performance of aggregation operator

@cla-bot cla-bot bot added the cla-signed label Apr 21, 2022
@skrzypo987 skrzypo987 added the WIP label Apr 21, 2022
@skrzypo987
Member Author

Labeled as WIP as some tests are still failing. This is due to the decreased memory footprint.

@skrzypo987 skrzypo987 removed the WIP label Apr 26, 2022
@skrzypo987 skrzypo987 requested review from radek-kondziolka and removed request for radek-kondziolka April 26, 2022 05:24
@@ -16,7 +16,6 @@
import com.google.common.annotations.VisibleForTesting;
Member

Do you have microbenchmarks for this? The same approach could be applied to PagesIndex and join.

Member Author

I updated the commit message. Some cases offer substantial gains.

Member

I updated the commit message. Some cases offer substantial gains.

I don't think the commit message has microbenchmark results.

Nice find BTW!

@@ -140,23 +131,21 @@ public MultiChannelGroupByHash(
PagesHashStrategyFactory pagesHashStrategyFactory = joinCompiler.compilePagesHashStrategyFactory(this.types, outputChannels.build());
hashStrategy = pagesHashStrategyFactory.createPagesHashStrategy(this.channelBuilders, this.precomputedHashChannel);

startNewPage();
pageBuilder = new PageBuilder(types);
Member

@sopel39 sopel39 Apr 26, 2022

Since the hash table can only store 2^30 positions, the structure can be flattened to a single huge page.

The retained size of a single large page will probably be greater than if multiple pages are used (because we use currentPageBuilder.newPageBuilderLike(), which is adaptive).

Is the cost of resizing the page negligible? What if the page consists of long strings?

Member Author

The retained size of a single large page will probably be greater than if multiple pages are used

That is true. However, the size of the hash table and other primitive arrays shrinks from ~37 bytes/row to ~9 bytes/row, which gives us some headroom.
Just to play with the numbers some more:
A block is resized by a factor of 1.5, so on average 25% additional space is retained. The wasted space of the multiple-page approach is negligible for a big enough number of groups.
So there is a breakeven at an average row size of (37 - 9) / 0.25 = 112 B.
For rows smaller than that we use less memory, but for bigger ones we use more. There is a tradeoff here and we need to choose whether we accept it.
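Spelling out that arithmetic (a quick sanity check, taking the ~37 B and ~9 B per-row figures above at face value):

$$\text{breakeven row size} = \frac{37\,\mathrm{B} - 9\,\mathrm{B}}{(1.5 - 1)/2} = \frac{28\,\mathrm{B}}{0.25} = 112\,\mathrm{B}$$

Rows below 112 B come out ahead under the single-page layout; rows above it pay more in resize slack.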

Is the cost of resizing the page negligible?

I'd guess it is no bigger than the cost of handling and allocating multiple pages, especially in the multi-row case.

What if the page consists of long strings?

Aggregating by text is not necessarily the smartest thing. You can make a query fail in a much simpler way.

Member

That is true. However, the size of the hash table and other primitive arrays shrinks from ~37 bytes/row to ~9 bytes/row, which gives us some headroom.

That is only a valid assumption for primitive arrays.

I'd guess it is no bigger than the cost of handling and allocating multiple pages, especially in the multi-row case.

It also depends on not having wide rows.

Aggregating by text is not necessarily the smartest thing. You can make a query fail in a much simpler way.

There are already such aggregations in our favorite benchmark. We also synthetically insert deduplicating aggregations, or aggregations for correlated subqueries, which store entire rows. Customers sometimes have very wide rows (e.g. hundreds of columns).

That's why I'm asking whether a similar effect could be obtained by increasing the page size, which by default is pretty low (9f4dbe8#r858629278). It should reduce memory overhead, but also allow the 1-level addressing to fit into the CPU cache.

Member Author

increasing page size

I don't quite follow.
Having 2-level addressing with just a single page is still 2-level addressing. You still need to make two lookups per operation to get to the data.
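To make the "two lookups" point concrete (field names as in the existing code shown in the diff hunks below):

// lookup 1: translate group id -> encoded address
long address = groupAddressByGroupId.get(groupId);
int blockIndex = decodeSliceIndex(address);
int position = decodePosition(address);
// lookup 2: only now can the row be read from the page at blockIndex.
// A bigger page shrinks the page list, but this indirection remains.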

private final List<ObjectArrayList<Block>> channelBuilders;
private final Optional<Integer> inputHashChannel;
private final HashGenerator hashGenerator;
private final OptionalInt precomputedHashChannel;
private final boolean processDictionary;
private PageBuilder currentPageBuilder;

private long completedPagesMemorySize;
Member

Would increasing the max page size in PageBuilder achieve similar results?

Member Author

I don't think so. The gain is IMO the result of the removal of two-level addressing.

@skrzypo987 skrzypo987 force-pushed the skrzypo/067-multi-channel-group-by-hash-first-optimizations branch from 597f788 to 9f4dbe8 Compare April 27, 2022 06:28
Member Author

@skrzypo987 skrzypo987 left a comment

Remarks addressed.
I dropped the last two commits:
- load factor -> 0.25
- compact rawHashByHashPosition
The microbenchmark results were inconsistent. I will return to them after this is merged.


skrzypo987 added 5 commits April 28, 2022 15:08
For simplicity and a tiny performance gain.
To make them shorter and consistent with BigintGroupByHash.
Temporary output data used to be kept in a list of variable-size pages. That
required using a two-level address to point at an exact row. Since the hash table
can only store 2^30 positions, the number of groups can be stored in a 4-byte
int variable, given that the page size is fixed. This way we can get rid of
some data structures used to map between the group id and the output row position,
skipping some calculations and reducing the memory footprint from 8+8+4+1 bytes
per hash bucket to 4+1 bytes, which improves memory locality.
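As a sketch, with a fixed power-of-two page size the group id doubles as the address (constant names as in the diff hunks below):

private static final int OUTPUT_PAGE_BITS = 14; // 16k positions per page
private static final int OUTPUT_PAGE_SIZE = 1 << OUTPUT_PAGE_BITS;
private static final int OUTPUT_PAGE_MASK = OUTPUT_PAGE_SIZE - 1;

// group id -> (page, position) with no lookup table in between
int blockIndex = groupId >> OUTPUT_PAGE_BITS; // which values page
int position = groupId & OUTPUT_PAGE_MASK;    // row within that page

This leaves an int group id (4 B) plus a byte raw hash (1 B) per hash bucket, instead of two 8-byte addresses, an int, and a byte.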
Previously the hash table capacity was checked for every row to see whether a rehash
was needed. Now the input page is split into batches, it is assumed that every
row in a batch will create a new group (which is rarely the case), and rehashing
is done in advance, before the batch is processed.
This may slightly increase the memory footprint for a small number of groups, but
there is a tiny performance gain, as the capacity is not checked for every row.
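A rough sketch of that batching idea (method and field names here are illustrative, not the exact ones from the patch):

// Reserve worst-case capacity once per batch instead of checking per row,
// assuming every row in the batch creates a new group.
private void processBatch(Page page, int startPosition, int batchSize)
{
    // rehash up front if the worst case would cross the fill threshold
    ensureCapacity(groupCount + batchSize);
    for (int i = startPosition; i < startPosition + batchSize; i++) {
        putIfAbsent(i, page); // no capacity check inside the loop
    }
}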
@skrzypo987 skrzypo987 force-pushed the skrzypo/067-multi-channel-group-by-hash-first-optimizations branch from 9f4dbe8 to b7de749 Compare April 29, 2022 06:13
@skrzypo987
Member Author

Again some memory-management tests are failing; I'll look into that.

Member

@sopel39 sopel39 left a comment

lgtm % comments


@@ -62,6 +59,9 @@
private static final float FILL_RATIO = 0.75f;
// Max (page value count / cumulative dictionary size) to trigger the low cardinality case
private static final double SMALL_DICTIONARIES_MAX_CARDINALITY_RATIO = .25;
private static final int OUTPUT_PAGE_BITS = 14; // 16k positions
Member

So the page now always has 16k positions, even with a single row? Or do we start with an almost-empty single builder?

@@ -62,6 +59,9 @@
private static final float FILL_RATIO = 0.75f;
// Max (page value count / cumulative dictionary size) to trigger the low cardinality case
private static final double SMALL_DICTIONARIES_MAX_CARDINALITY_RATIO = .25;
private static final int OUTPUT_PAGE_BITS = 14; // 16k positions
private static final int OUTPUT_PAGE_SIZE = 1 << OUTPUT_PAGE_BITS;
private static final int OUTPUT_PAGE_MASK = OUTPUT_PAGE_SIZE - 1;
Member

It's not really an output page, but rather a VALUES_PAGE or GROUP_VALUES_PAGE. These pages keep the values of the grouping channels. Note that we already have the io.trino.operator.GroupByHash#appendValuesTo method, which adds rows from these pages.

Member

renamed to VALUES_PAGE

long address = groupAddressByGroupId.get(groupId);
int blockIndex = decodeSliceIndex(address);
int position = decodePosition(address);
int blockIndex = groupId >> OUTPUT_PAGE_BITS;
Member

I think you should either use >>> (unsigned shift) or mask the result. Please add a test for the edge case (large group id) for this method if it's possible to do so.

Member

groupId is never negative, so both shifts work the same way (they extend with 0).
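A tiny illustration of that point (plain Java, not from the patch):

int groupId = (1 << 30) - 1;                 // largest possible group id, still non-negative
assert (groupId >> 14) == (groupId >>> 14);  // identical while the sign bit is clear
// they only diverge for negative values:
// -1 >> 14 == -1, while -1 >>> 14 == 262143 (18 one-bits)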

long address = groupAddressByGroupId.get(groupId);
int blockIndex = decodeSliceIndex(address);
int position = decodePosition(address);
int blockIndex = groupId >> OUTPUT_PAGE_BITS;
Member

ditto

@@ -365,7 +350,7 @@ private void startNewPage()
currentPageBuilder = currentPageBuilder.newPageBuilderLike();
}
else {
currentPageBuilder = new PageBuilder(types);
currentPageBuilder = new PageBuilder(OUTPUT_PAGE_SIZE, Integer.MAX_VALUE, types);
Member

Why? Why not start with a minimal builder for the first page, so that we don't have to allocate 16k positions and waste memory?

Member Author

It doesn't seem to have an impact on perf. I reverted it.

pre size to OUTPUT_PAGE_SIZE
Benchmark                                   (channelCount)  (dataType)  (groupCount)  (hashEnabled)  Mode  Cnt    Score   Error  Units
BenchmarkGroupByHash.groupByHashPreCompute               2      BIGINT          8000           true  avgt   20   33.467 ± 2.076  ns/op
BenchmarkGroupByHash.groupByHashPreCompute               2      BIGINT       3000000           true  avgt   20  234.790 ± 2.115  ns/op

no pre size
Benchmark                                   (channelCount)  (dataType)  (groupCount)  (hashEnabled)  Mode  Cnt    Score   Error  Units
BenchmarkGroupByHash.groupByHashPreCompute               2      BIGINT          8000           true  avgt   20   34.916 ± 1.100  ns/op
BenchmarkGroupByHash.groupByHashPreCompute               2      BIGINT       3000000           true  avgt   20  233.863 ± 6.016  ns/op

byte[] rawHashes = new byte[newCapacity];
Arrays.fill(newKey, -1);
int[] newValue = new int[newCapacity];
Member

better would be newGroupIdByHash

{
int sliceIndex = decodeSliceIndex(sliceAddress);
int position = decodePosition(sliceAddress);
int blockIndex = groupId >> OUTPUT_PAGE_BITS;
Member

ditto about shift

{
if (rawHashByHashPosition[hashPosition] != rawHash) {
return false;
}
return hashStrategy.positionNotDistinctFromRow(decodeSliceIndex(address), decodePosition(address), position, page, hashChannels);
int blockIndex = groupId >> OUTPUT_PAGE_BITS;
Member

ditto

@@ -576,7 +578,6 @@ public void setProcessed(int position, int groupId)
implements Work<Void>
{
private final Page page;

Member

undo

@sopel39
Member

sopel39 commented Apr 29, 2022

@skrzypo987 do you have micro/macro benchmark results?

@@ -365,7 +350,7 @@ private void startNewPage()
currentPageBuilder = currentPageBuilder.newPageBuilderLike();
Member

@sopel39 sopel39 Apr 29, 2022

For a fixed width type you can assume here that there will be OUTPUT_PAGE_SIZE positions (type check on instanceof FixedWidthType).
For a variable length type (instanceof VariableWidthType) you can also assume OUTPUT_PAGE_SIZE positions, but additionally account for some skew via io.trino.spi.block.BlockUtil#calculateBlockResetBytes. You can then use io.trino.spi.type.Type#createBlockBuilder(io.trino.spi.block.BlockBuilderStatus, int, int) to construct a builder of the right size.

This should save memory compared to the existing code. I wouldn't be surprised to see aggregation memory usage drop by 10-20%.
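A rough sketch of that suggestion (the helper name is hypothetical, the 32-byte average for variable-width values is a placeholder, and the calculateBlockResetBytes-based skew handling is left out):

import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.Type;

private static BlockBuilder createSizedBlockBuilder(Type type)
{
    if (type instanceof FixedWidthType fixedWidthType) {
        // fixed width: the bytes per entry are known exactly up front
        return type.createBlockBuilder(null, OUTPUT_PAGE_SIZE, fixedWidthType.getFixedSize());
    }
    // variable width: bytes per entry is a guess (32 B here); real code could
    // refine this from observed data via BlockUtil#calculateBlockResetBytes
    return type.createBlockBuilder(null, OUTPUT_PAGE_SIZE, 32);
}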

Member

Doing this properly is not that straightforward. currentPageBuilder is a PageBuilder, not a single BlockBuilder, so this would have to be implemented in the PageBuilder.
Also, there is no direct access to the number of bytes in the VariableWidthBlockBuilder. We could use getSizeInBytes, but the result also contains the size of the valueIsNull and offsets arrays.
IMO this looks like a separate PR. WDYT?

Member

@lukasz-stec lukasz-stec left a comment

I addressed most comments, but at the moment I don't have access to the source branch repo, so I put the changes here.
Gonna push them here once I get access.


@@ -80,12 +80,9 @@
private int hashCapacity;
private int maxFill;
private int mask;
private long[] groupAddressByHash;
private int[] groupIdsByHash;
Member

Comment added; take a look to see if it makes sense to you.


@lukasz-stec
Member

I addressed most comments, but at the moment I don't have access to the source branch repo, so I put the changes [here](https://github.com/starburstdata/trino/tree/skrzypo-copy/067-multi-channel-group-by-hash-first-optimizations). Gonna push them here once I get access.

@skrzypo987 is out without internet access so I forked this here + applied changes mentioned + fixed tests + improved memory accounting

@sopel39
Member

sopel39 commented May 19, 2022

Closed in favor of #12336

@sopel39 sopel39 closed this May 19, 2022