Skip to content

Commit

Permalink
Replace BigArrays with primitive ones in BigInt aggregation
Browse files Browse the repository at this point in the history
Replaces LongBigArray and IntBigArray with long[] and int[].

The original implementation was done because Java prior to version 11
had some problems handling humongous (very big) objects, which sometimes
caused giant GC pauses. Since Java 11, humongous objects are first allocated
in eden, which mitigates the problems with collecting them.

before:
BenchmarkGroupByHash.bigintGroupByHash     1           true  avgt   10   60,944 ±   1,730   ns/op
BenchmarkGroupByHash.bigintGroupByHash     1          false  avgt   10   58,081 ±   1,310   ns/op
after:
BenchmarkGroupByHash.bigintGroupByHash     1           true  avgt   10   49,011 ±   1,100   ns/op
BenchmarkGroupByHash.bigintGroupByHash     1          false  avgt   10   47,924 ±   2,408   ns/op
  • Loading branch information
skrzypo987 authored and sopel39 committed Mar 10, 2022
1 parent 6b6c938 commit 70dd978
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.trino.array.IntBigArray;
import io.trino.array.LongBigArray;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
Expand All @@ -34,6 +33,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.type.TypeUtils.NULL_HASH_CODE;
Expand All @@ -60,8 +60,8 @@ public class BigintGroupByHash
private int mask;

// the hash table from values to groupIds
private LongBigArray values;
private IntBigArray groupIds;
private long[] values;
private int[] groupIds;

// groupId for the null value
private int nullGroupId = -1;
Expand Down Expand Up @@ -91,10 +91,9 @@ public BigintGroupByHash(int hashChannel, boolean outputRawHash, int expectedSiz

maxFill = calculateMaxFill(hashCapacity);
mask = hashCapacity - 1;
values = new LongBigArray();
values.ensureCapacity(hashCapacity);
groupIds = new IntBigArray(-1);
groupIds.ensureCapacity(hashCapacity);
values = new long[hashCapacity];
groupIds = new int[hashCapacity];
Arrays.fill(groupIds, -1);

valuesByGroupId = new LongBigArray();
valuesByGroupId.ensureCapacity(hashCapacity);
Expand All @@ -108,8 +107,8 @@ public BigintGroupByHash(int hashChannel, boolean outputRawHash, int expectedSiz
public long getEstimatedSize()
{
return INSTANCE_SIZE +
groupIds.sizeOf() +
values.sizeOf() +
sizeOf(groupIds) +
sizeOf(values) +
valuesByGroupId.sizeOf() +
preallocatedMemoryInBytes;
}
Expand Down Expand Up @@ -200,15 +199,15 @@ public boolean contains(int position, Page page, int[] hashChannels)
}

long value = BIGINT.getLong(block, position);
long hashPosition = getHashPosition(value, mask);
int hashPosition = getHashPosition(value, mask);

// look for an empty slot or a slot containing this key
while (true) {
int groupId = groupIds.get(hashPosition);
int groupId = groupIds[hashPosition];
if (groupId == -1) {
return false;
}
if (value == values.get(hashPosition)) {
if (value == values[hashPosition]) {
return true;
}

Expand Down Expand Up @@ -242,16 +241,16 @@ private int putIfAbsent(int position, Block block)
}

long value = BIGINT.getLong(block, position);
long hashPosition = getHashPosition(value, mask);
int hashPosition = getHashPosition(value, mask);

// look for an empty slot or a slot containing this key
while (true) {
int groupId = groupIds.get(hashPosition);
int groupId = groupIds[hashPosition];
if (groupId == -1) {
break;
}

if (value == values.get(hashPosition)) {
if (value == values[hashPosition]) {
return groupId;
}

Expand All @@ -263,14 +262,14 @@ private int putIfAbsent(int position, Block block)
return addNewGroup(hashPosition, value);
}

private int addNewGroup(long hashPosition, long value)
private int addNewGroup(int hashPosition, long value)
{
// record group id in hash
int groupId = nextGroupId++;

values.set(hashPosition, value);
values[hashPosition] = value;
valuesByGroupId.set(groupId, value);
groupIds.set(hashPosition, groupId);
groupIds[hashPosition] = groupId;

// increase capacity, if necessary
if (needRehash()) {
Expand Down Expand Up @@ -299,10 +298,9 @@ private boolean tryRehash()
expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity);

int newMask = newCapacity - 1;
LongBigArray newValues = new LongBigArray();
newValues.ensureCapacity(newCapacity);
IntBigArray newGroupIds = new IntBigArray(-1);
newGroupIds.ensureCapacity(newCapacity);
long[] newValues = new long[newCapacity];
int[] newGroupIds = new int[newCapacity];
Arrays.fill(newGroupIds, -1);

for (int groupId = 0; groupId < nextGroupId; groupId++) {
if (groupId == nullGroupId) {
Expand All @@ -311,15 +309,15 @@ private boolean tryRehash()
long value = valuesByGroupId.get(groupId);

// find an empty slot for the address
long hashPosition = getHashPosition(value, newMask);
while (newGroupIds.get(hashPosition) != -1) {
int hashPosition = getHashPosition(value, newMask);
while (newGroupIds[hashPosition] != -1) {
hashPosition = (hashPosition + 1) & newMask;
hashCollisions++;
}

// record the mapping
newValues.set(hashPosition, value);
newGroupIds.set(hashPosition, groupId);
newValues[hashPosition] = value;
newGroupIds[hashPosition] = groupId;
}

mask = newMask;
Expand All @@ -337,9 +335,9 @@ private boolean needRehash()
return nextGroupId >= maxFill;
}

private static long getHashPosition(long rawHash, int mask)
private static int getHashPosition(long rawHash, int mask)
{
return murmurHash3(rawHash) & mask;
return (int) (murmurHash3(rawHash) & mask);
}

private static int calculateMaxFill(int hashSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp
Operator operator = operatorFactory.createOperator(driverContext);
toPages(operator, input.iterator(), revokeMemoryWhenAddingPages);
// TODO (https://github.com/trinodb/trino/issues/10596): it should be 0, since operator is finished
assertEquals(getOnlyElement(operator.getOperatorContext().getNestedOperatorStats()).getUserMemoryReservation().toBytes(), spillEnabled && revokeMemoryWhenAddingPages ? 5_350_968 : 0);
assertEquals(getOnlyElement(operator.getOperatorContext().getNestedOperatorStats()).getUserMemoryReservation().toBytes(), spillEnabled && revokeMemoryWhenAddingPages ? 5_322_192 : 0);
assertEquals(getOnlyElement(operator.getOperatorContext().getNestedOperatorStats()).getRevocableMemoryReservation().toBytes(), 0);
}

Expand Down

0 comments on commit 70dd978

Please sign in to comment.