Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi channel group by hash first batch of optimizations #54

Merged
merged 6 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 272 additions & 6 deletions presto-main/src/main/java/io/prestosql/operator/BigintGroupByHash.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.DictionaryBlock;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.type.AbstractLongType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -67,6 +70,7 @@ public class BigintGroupByHash
private final LongBigArray valuesByGroupId;

private int nextGroupId;
private DictionaryLookBack dictionaryLookBack;
private long hashCollisions;
private double expectedHashCollisions;

Expand Down Expand Up @@ -161,13 +165,29 @@ public void appendValuesTo(int groupId, PageBuilder pageBuilder, int outputChann
// Builds the unit of work that adds the values of the page's hash channel to this group-by hash.
// The kind of work returned depends on the encoding of that channel's block:
// run-length encoded, dictionary encoded, or anything else.
public Work<?> addPage(Page page)
{
// Remember the size of the page being processed; tryRehash() adds it to its memory pre-allocation estimate.
currentPageSizeInBytes = page.getRetainedSizeInBytes();
// NOTE(review): in this diff rendering the next line appears to be the removed (pre-change) return.
// Read literally it would make the dispatch below unreachable -- confirm against the merged file.
return new AddPageWork(page.getBlock(hashChannel));
Block block = page.getBlock(hashChannel);
// A run-length encoded block gets a dedicated work type.
if (block instanceof RunLengthEncodedBlock) {
return new AddRunLengthEncodedPageWork((RunLengthEncodedBlock) block);
}
// A dictionary block gets a work type that goes through the shared dictionary look-back.
if (block instanceof DictionaryBlock) {
return new AddDictionaryPageWork((DictionaryBlock) block);
}

// Any other block encoding is handled by the general-purpose work.
return new AddPageWork(block);
}

/**
 * Builds the unit of work that resolves a group id for every position of the page's hash channel.
 * The kind of work returned depends on the encoding of that channel's block:
 * run-length encoded, dictionary encoded, or anything else.
 *
 * @param page the page whose hash channel is grouped
 * @return work whose result is the {@link GroupByIdBlock} for the page
 */
@Override
public Work<GroupByIdBlock> getGroupIds(Page page)
{
    // Remember the size of the page being processed; tryRehash() adds it to its memory pre-allocation estimate.
    currentPageSizeInBytes = page.getRetainedSizeInBytes();
    Block block = page.getBlock(hashChannel);
    if (block instanceof RunLengthEncodedBlock) {
        return new GetRunLengthEncodedGroupIdsWork((RunLengthEncodedBlock) block);
    }
    if (block instanceof DictionaryBlock) {
        return new GetDictionaryGroupIdsWork((DictionaryBlock) block);
    }

    // Reuse the block fetched above (as addPage does) rather than looking the channel up a second time.
    return new GetGroupIdsWork(block);
}

Expand Down Expand Up @@ -269,7 +289,7 @@ private boolean tryRehash()

// An estimate of how much extra memory is needed before we can go ahead and expand the hash table.
// This includes the new capacity for values, groupIds, and valuesByGroupId as well as the size of the current page
preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Long.BYTES + Integer.BYTES) + (calculateMaxFill(newCapacity) - maxFill) * Long.BYTES + currentPageSizeInBytes;
preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Long.BYTES + Integer.BYTES) + (long) (calculateMaxFill(newCapacity) - maxFill) * Long.BYTES + currentPageSizeInBytes;
if (!updateMemory.update()) {
// reserved memory but has exceeded the limit
return false;
Expand Down Expand Up @@ -333,7 +353,26 @@ private static int calculateMaxFill(int hashSize)
return maxFill;
}

private class AddPageWork
/**
 * Makes sure the shared look-back cache belongs to {@code dictionary}.
 * The existing cache is kept only when it was built for this very dictionary instance
 * (reference comparison); otherwise a fresh, empty cache replaces it.
 */
private void updateDictionaryLookBack(Block dictionary)
{
    boolean cacheMatches = dictionaryLookBack != null && dictionaryLookBack.getDictionary() == dictionary;
    if (cacheMatches) {
        return;
    }
    dictionaryLookBack = new DictionaryLookBack(dictionary);
}

/**
 * Returns the group id for one dictionary position, consulting the look-back cache first.
 * On a cache miss the value is passed to putIfAbsent and the id it returns is recorded,
 * so later occurrences of the same dictionary position skip putIfAbsent.
 *
 * @param dictionary the dictionary the position refers to
 * @param positionInDictionary index into {@code dictionary}
 * @return the group id associated with that dictionary entry
 */
private int getGroupId(Block dictionary, int positionInDictionary)
{
    if (!dictionaryLookBack.isProcessed(positionInDictionary)) {
        // First time this dictionary entry is seen: resolve it and remember the answer.
        int resolvedGroupId = putIfAbsent(positionInDictionary, dictionary);
        dictionaryLookBack.setProcessed(positionInDictionary, resolvedGroupId);
        return resolvedGroupId;
    }

    return dictionaryLookBack.getGroupId(positionInDictionary);
}

@VisibleForTesting
class AddPageWork
implements Work<Void>
{
private final Block block;
Expand All @@ -349,7 +388,7 @@ public AddPageWork(Block block)
public boolean process()
{
int positionCount = block.getPositionCount();
checkState(lastPosition < positionCount, "position count out of bound");
checkState(lastPosition <= positionCount, "position count out of bound");

// needRehash() == true indicates we have reached capacity boundary and a rehash is needed.
// We can only proceed if tryRehash() successfully did a rehash.
Expand All @@ -374,7 +413,95 @@ public Void getResult()
}
}

private class GetGroupIdsWork
/**
 * Work that adds every value referenced by a dictionary block to the hash.
 * Positions are resolved through the shared dictionary look-back, so each dictionary entry
 * goes through putIfAbsent only once. The work can be resumed: {@link #process()} returns
 * {@code false} when it has to stop early and continues from the same position on the next call.
 */
@VisibleForTesting
class AddDictionaryPageWork
        implements Work<Void>
{
    private final Block dictionary;
    private final DictionaryBlock block;

    // index of the next position of the dictionary block to handle
    private int lastPosition;

    public AddDictionaryPageWork(DictionaryBlock block)
    {
        this.block = requireNonNull(block, "block is null");
        this.dictionary = block.getDictionary();
        // Point the shared look-back cache at this block's dictionary.
        updateDictionaryLookBack(dictionary);
    }

    /**
     * Processes as many remaining positions as possible.
     *
     * @return {@code true} once every position has been handled, {@code false} if a rehash is
     *         required but could not be performed yet
     */
    @Override
    public boolean process()
    {
        int positionCount = block.getPositionCount();
        checkState(lastPosition <= positionCount, "position count out of bound");

        // needRehash() == true means the capacity boundary has been reached;
        // we may only continue if tryRehash() manages to grow the table.
        if (needRehash() && !tryRehash()) {
            return false;
        }

        // The original note here says putIfAbsent rehashes on its own unless memory is short,
        // so needRehash() is generally not expected to turn true inside this loop.
        for (; lastPosition < positionCount; lastPosition++) {
            if (needRehash()) {
                break;
            }
            getGroupId(dictionary, block.getId(lastPosition));
        }
        return lastPosition == positionCount;
    }

    /**
     * This work produces no result; calling this always fails.
     */
    @Override
    public Void getResult()
    {
        throw new UnsupportedOperationException();
    }
}

/**
 * Work that adds the value of a run-length encoded block to the hash.
 * Only position 0 of the underlying value block is inserted, since the block is run-length encoded.
 */
@VisibleForTesting
class AddRunLengthEncodedPageWork
        implements Work<Void>
{
    private final RunLengthEncodedBlock block;

    // set once process() has completed successfully; process() must not be called again afterwards
    private boolean finished;

    public AddRunLengthEncodedPageWork(RunLengthEncodedBlock block)
    {
        this.block = requireNonNull(block, "block is null");
    }

    /**
     * Inserts the block's value, if the block has any positions.
     *
     * @return {@code true} when done, {@code false} if a rehash is required but could not be
     *         performed yet (the call may then be repeated)
     */
    @Override
    public boolean process()
    {
        checkState(!finished);

        if (block.getPositionCount() != 0) {
            // needRehash() == true means the capacity boundary has been reached;
            // we may only continue if tryRehash() manages to grow the table.
            if (needRehash() && !tryRehash()) {
                return false;
            }

            // Run-length encoded: inserting the first row is enough.
            putIfAbsent(0, block.getValue());
        }

        finished = true;
        return true;
    }

    /**
     * This work produces no result; calling this always fails.
     */
    @Override
    public Void getResult()
    {
        throw new UnsupportedOperationException();
    }
}

@VisibleForTesting
class GetGroupIdsWork
implements Work<GroupByIdBlock>
{
private final BlockBuilder blockBuilder;
Expand All @@ -394,7 +521,7 @@ public GetGroupIdsWork(Block block)
public boolean process()
{
int positionCount = block.getPositionCount();
checkState(lastPosition < positionCount, "position count out of bound");
checkState(lastPosition <= positionCount, "position count out of bound");
checkState(!finished);

// needRehash() == true indicates we have reached capacity boundary and a rehash is needed.
Expand Down Expand Up @@ -422,4 +549,143 @@ public GroupByIdBlock getResult()
return new GroupByIdBlock(nextGroupId, blockBuilder.build());
}
}

/**
 * Work that computes the group id of every position of a dictionary block.
 * Positions are resolved through the shared dictionary look-back, so each dictionary entry
 * goes through putIfAbsent only once. The ids are written, in position order, into a
 * block builder sized for exactly the block's position count.
 */
@VisibleForTesting
class GetDictionaryGroupIdsWork
        implements Work<GroupByIdBlock>
{
    private final BlockBuilder blockBuilder;
    private final Block dictionary;
    private final DictionaryBlock block;

    // set once getResult() has handed out the result
    private boolean finished;
    // index of the next position of the dictionary block to handle
    private int lastPosition;

    public GetDictionaryGroupIdsWork(DictionaryBlock block)
    {
        this.block = requireNonNull(block, "block is null");
        this.dictionary = block.getDictionary();
        // Point the shared look-back cache at this block's dictionary.
        updateDictionaryLookBack(dictionary);

        // one output entry per input position, so the required size is known up front
        this.blockBuilder = BIGINT.createFixedSizeBlockBuilder(block.getPositionCount());
    }

    /**
     * Processes as many remaining positions as possible.
     *
     * @return {@code true} once every position has a group id, {@code false} if a rehash is
     *         required but could not be performed yet
     */
    @Override
    public boolean process()
    {
        int positionCount = block.getPositionCount();
        checkState(lastPosition <= positionCount, "position count out of bound");
        checkState(!finished);

        // needRehash() == true means the capacity boundary has been reached;
        // we may only continue if tryRehash() manages to grow the table.
        if (needRehash() && !tryRehash()) {
            return false;
        }

        // The original note here says putIfAbsent rehashes on its own unless memory is short,
        // so needRehash() is generally not expected to turn true inside this loop.
        for (; lastPosition < positionCount; lastPosition++) {
            if (needRehash()) {
                break;
            }
            int resolvedGroupId = getGroupId(dictionary, block.getId(lastPosition));
            BIGINT.writeLong(blockBuilder, resolvedGroupId);
        }
        return lastPosition == positionCount;
    }

    /**
     * Returns the group ids collected by {@link #process()}. May be called only once, and only
     * after every position has been processed.
     */
    @Override
    public GroupByIdBlock getResult()
    {
        checkState(lastPosition == block.getPositionCount(), "process has not yet finished");
        checkState(!finished, "result has produced");
        finished = true;

        Block groupIds = blockBuilder.build();
        return new GroupByIdBlock(nextGroupId, groupIds);
    }
}

/**
 * Work that computes the group ids for a run-length encoded block.
 * Only position 0 of the underlying value block is looked up; the result is a
 * run-length encoded block repeating that one group id for the block's position count.
 */
@VisibleForTesting
class GetRunLengthEncodedGroupIdsWork
        implements Work<GroupByIdBlock>
{
    private final RunLengthEncodedBlock block;

    // Stays -1 until process() resolves it (and for an empty block, where nothing is looked up).
    // NOTE(review): package-private while the sibling fields are private -- confirm whether that is intended.
    int groupId = -1;
    private boolean processFinished;
    private boolean resultProduced;

    public GetRunLengthEncodedGroupIdsWork(RunLengthEncodedBlock block)
    {
        this.block = requireNonNull(block, "block is null");
    }

    /**
     * Resolves the group id of the block's value, if the block has any positions.
     *
     * @return {@code true} when done, {@code false} if a rehash is required but could not be
     *         performed yet (the call may then be repeated)
     */
    @Override
    public boolean process()
    {
        checkState(!processFinished);

        if (block.getPositionCount() != 0) {
            // needRehash() == true means the capacity boundary has been reached;
            // we may only continue if tryRehash() manages to grow the table.
            if (needRehash() && !tryRehash()) {
                return false;
            }

            // Run-length encoded: resolving the first row is enough.
            groupId = putIfAbsent(0, block.getValue());
        }

        processFinished = true;
        return true;
    }

    /**
     * Returns the group ids as a run-length encoded block. May be called only once, and only
     * after {@link #process()} has completed.
     */
    @Override
    public GroupByIdBlock getResult()
    {
        checkState(processFinished);
        checkState(!resultProduced);
        resultProduced = true;

        // A single-entry block holding the group id, repeated for every position of the input.
        Block singleGroupId = BIGINT.createFixedSizeBlockBuilder(1).writeLong(groupId).build();
        Block repeatedGroupId = new RunLengthEncodedBlock(singleGroupId, block.getPositionCount());
        return new GroupByIdBlock(nextGroupId, repeatedGroupId);
    }
}

/**
 * Per-dictionary cache that maps a dictionary position to the group id already assigned to it.
 * Entries start out as -1, which marks a position that has not been processed yet.
 */
private static final class DictionaryLookBack
{
    // marker stored for dictionary positions that have no group id yet
    private static final int NOT_PROCESSED = -1;

    private final Block dictionary;
    // group id per dictionary position, or NOT_PROCESSED
    private final int[] processed;

    public DictionaryLookBack(Block dictionary)
    {
        this.dictionary = dictionary;
        int[] groupIds = new int[dictionary.getPositionCount()];
        Arrays.fill(groupIds, NOT_PROCESSED);
        this.processed = groupIds;
    }

    /** Returns the dictionary this cache was built for. */
    public Block getDictionary()
    {
        return dictionary;
    }

    /** Returns the stored group id for {@code position}, or -1 if none has been recorded. */
    public int getGroupId(int position)
    {
        return processed[position];
    }

    /** Tells whether a group id has been recorded for {@code position}. */
    public boolean isProcessed(int position)
    {
        int stored = processed[position];
        return stored != NOT_PROCESSED;
    }

    /** Records {@code groupId} as the group id of {@code position}. */
    public void setProcessed(int position, int groupId)
    {
        processed[position] = groupId;
    }
}
}
Loading