Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract meaningful methods in join operator #16468

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,6 @@ public final class BigintPagesHash
private final long[] values;
private final long size;

/**
 * Estimates the retained size, in bytes, for the given number of positions.
 * <p>
 * The estimate is the sum of: the backing array of {@code addresses}, the backing array
 * of the first channel multiplied by the number of channels (zero when there are no
 * channels), {@code blocksSizeInBytes}, an int array of the length reported by
 * {@code hashArraySizeSupplier} for {@code positionCount}, and a long array of
 * {@code positionCount} elements.
 */
public static long getEstimatedRetainedSizeInBytes(
        int positionCount,
        HashArraySizeSupplier hashArraySizeSupplier,
        LongArrayList addresses,
        List<ObjectArrayList<Block>> channels,
        long blocksSizeInBytes)
{
    long addressesBytes = sizeOf(addresses.elements());

    // every channel is assumed to be as large as the first one
    long channelsBytes = 0;
    if (!channels.isEmpty()) {
        channelsBytes = sizeOf(channels.get(0).elements()) * channels.size();
    }

    long hashArrayBytes = sizeOfIntArray(hashArraySizeSupplier.getHashArraySize(positionCount));
    long valueArrayBytes = sizeOfLongArray(positionCount);

    return addressesBytes + channelsBytes + blocksSizeInBytes + hashArrayBytes + valueArrayBytes;
}

public BigintPagesHash(
LongArrayList addresses,
PagesHashStrategy pagesHashStrategy,
Expand Down Expand Up @@ -105,42 +91,52 @@ public BigintPagesHash(
int stepEndPosition = Math.min((step + 1) * positionsInStep, addresses.size());
int stepSize = stepEndPosition - stepBeginPosition;

// index pages
for (int batchIndex = 0; batchIndex < stepSize; batchIndex++) {
int addressIndex = batchIndex + stepBeginPosition;
if (isPositionNull(addressIndex)) {
continue;
}
indexPages(addresses, positionLinks, stepBeginPosition, stepSize);
}

long address = addresses.getLong(addressIndex);
int blockIndex = decodeSliceIndex(address);
int blockPosition = decodePosition(address);
long value = joinChannelBlocks.get(blockIndex).getLong(blockPosition, 0);

int pos = getHashPosition(value, mask);

// look for an empty slot or a slot containing this key
while (keys[pos] != -1) {
int currentKey = keys[pos];
if (value == values[currentKey]) {
// found a slot for this key
// link the new key position to the current key position
addressIndex = positionLinks.link(addressIndex, currentKey);

// key[pos] updated outside of this loop
break;
}
// increment position and mask to handle wrap around
pos = (pos + 1) & mask;
}
size = sizeOf(addresses.elements()) + pagesHashStrategy.getSizeInBytes() +
sizeOf(keys) + sizeOf(values);
}

keys[pos] = addressIndex;
values[addressIndex] = value;
/**
 * Indexes one step of positions: for every offset in {@code [0, stepSize)} the position
 * {@code stepBeginPosition + offset} is inserted into the hash arrays, unless
 * {@link #isPositionNull} reports it as null.
 */
private void indexPages(LongArrayList addresses, PositionLinks.FactoryBuilder positionLinks, int stepBeginPosition, int stepSize)
{
    for (int offset = 0; offset < stepSize; offset++) {
        int addressIndex = stepBeginPosition + offset;

        // null join keys are never inserted
        if (!isPositionNull(addressIndex)) {
            // the encoded address identifies both the block and the position within it
            long encodedAddress = addresses.getLong(addressIndex);
            int sliceIndex = decodeSliceIndex(encodedAddress);
            int positionInBlock = decodePosition(encodedAddress);
            long value = joinChannelBlocks.get(sliceIndex).getLong(positionInBlock, 0);

            insertValue(positionLinks, addressIndex, value, getHashPosition(value, mask));
        }
    }
}

size = sizeOf(addresses.elements()) + pagesHashStrategy.getSizeInBytes() +
sizeOf(keys) + sizeOf(values);
/**
 * Stores {@code value} for {@code addressIndex} in the hash arrays, probing linearly
 * from slot {@code pos}. A slot holding {@code -1} is treated as empty. When a slot
 * already holds a key with the same value, the two positions are linked through
 * {@code positionLinks} and the index returned by {@code link} is stored instead.
 */
private void insertValue(PositionLinks.FactoryBuilder positionLinks, int addressIndex, long value, int pos)
{
    int slot = pos;
    int indexToStore = addressIndex;

    // walk the probe sequence until an empty slot or a slot with the same value is found
    for (int existingKey = keys[slot]; existingKey != -1; existingKey = keys[slot]) {
        if (values[existingKey] == value) {
            // same value already present: link the new position to the existing one;
            // the slot itself is overwritten below, after the loop
            indexToStore = positionLinks.link(addressIndex, existingKey);
            break;
        }
        // advance to the next slot; masking handles wrap around
        slot = (slot + 1) & mask;
    }

    keys[slot] = indexToStore;
    values[indexToStore] = value;
}

@Override
Expand Down Expand Up @@ -192,10 +188,7 @@ public int[] getAddressIndex(int[] positions, Page hashChannelsPage)
long[] incomingValues = new long[positionCount];
int[] hashPositions = new int[positionCount];

for (int i = 0; i < positionCount; i++) {
incomingValues[i] = hashChannelsPage.getBlock(0).getLong(positions[i], 0);
hashPositions[i] = getHashPosition(incomingValues[i], mask);
}
extractAndHashValues(positions, hashChannelsPage, positionCount, incomingValues, hashPositions);

int[] found = new int[positionCount];
int foundCount = 0;
Expand All @@ -205,9 +198,7 @@ public int[] getAddressIndex(int[] positions, Page hashChannelsPage)

// Search for positions in the hash array. This is the most CPU-consuming part as
// it relies on random memory accesses
for (int i = 0; i < positionCount; i++) {
foundKeys[i] = keys[hashPositions[i]];
}
findPositions(positionCount, hashPositions, foundKeys);
// Found positions are put into `found` array
for (int i = 0; i < positionCount; i++) {
if (foundKeys[i] != -1) {
Expand All @@ -217,21 +208,18 @@ public int[] getAddressIndex(int[] positions, Page hashChannelsPage)

// At this step we determine if the found keys were indeed the proper ones or it is a hash collision.
// The result array is updated for the found ones, while the collisions land into `remaining` array.
int remainingCount = checkFoundPositions(incomingValues, found, foundCount, result, foundKeys);
int[] remaining = found; // Rename for readability
int remainingCount = 0;

for (int i = 0; i < foundCount; i++) {
int index = found[i];
if (values[foundKeys[index]] == incomingValues[index]) {
result[index] = foundKeys[index];
}
else {
remaining[remainingCount++] = index;
}
}

// At this point for any reasonable load factor of a hash array (< .75), there is no more than
// 10 - 15% of positions left. We search for them in a sequential order and update the result array.
findRemainingPositions(incomingValues, hashPositions, result, remaining, remainingCount);

return result;
}

private void findRemainingPositions(long[] incomingValues, int[] hashPositions, int[] result, int[] remaining, int remainingCount)
{
for (int i = 0; i < remainingCount; i++) {
int index = remaining[i];
int position = (hashPositions[index] + 1) & mask; // hashPositions[index] position has already been checked
Expand All @@ -245,8 +233,37 @@ public int[] getAddressIndex(int[] positions, Page hashChannelsPage)
position = (position + 1) & mask;
}
}
}

return result;
/**
 * Verifies the first {@code foundCount} indexes listed in {@code found}. When the value
 * stored for the found key equals the incoming value, the key is written to
 * {@code result}; otherwise the index is kept for further probing.
 * <p>
 * The unresolved indexes are compacted in place at the front of {@code found}.
 *
 * @return the number of unresolved indexes now stored at the front of {@code found}
 */
private int checkFoundPositions(long[] incomingValues, int[] found, int foundCount, int[] result, int[] foundKeys)
{
    int unresolvedCount = 0;
    for (int i = 0; i < foundCount; i++) {
        int index = found[i];
        int candidateKey = foundKeys[index];
        if (values[candidateKey] != incomingValues[index]) {
            // hash collision: reuse the already-consumed prefix of `found` as the remaining list
            found[unresolvedCount++] = index;
            continue;
        }
        result[index] = candidateKey;
    }
    return unresolvedCount;
}

/**
 * Fills the first {@code positionCount} entries of {@code foundKeys} with the keys
 * stored at the corresponding {@code hashPositions} slots.
 */
private void findPositions(int positionCount, int[] hashPositions, int[] foundKeys)
{
    int index = 0;
    while (index < positionCount) {
        int slot = hashPositions[index];
        foundKeys[index] = keys[slot];
        index++;
    }
}

/**
 * For each of the first {@code positionCount} entries of {@code positions}, reads the long
 * value from block 0 of {@code hashChannelsPage} into {@code incomingValues} and stores its
 * hash position in {@code hashPositions}.
 */
private void extractAndHashValues(int[] positions, Page hashChannelsPage, int positionCount, long[] incomingValues, int[] hashPositions)
{
    for (int index = 0; index < positionCount; index++) {
        long value = hashChannelsPage.getBlock(0).getLong(positions[index], 0);
        incomingValues[index] = value;
        hashPositions[index] = getHashPosition(value, mask);
    }
}

@Override
Expand All @@ -267,4 +284,18 @@ private boolean isPositionNull(int position)

return joinChannelBlocks.get(blockIndex).isNull(blockPosition);
}

/**
 * Estimates the retained size, in bytes, for the given number of positions.
 * <p>
 * The estimate is the sum of: the backing array of {@code addresses}, the backing array
 * of the first channel multiplied by the number of channels (zero when there are no
 * channels), {@code blocksSizeInBytes}, an int array of the length reported by
 * {@code hashArraySizeSupplier} for {@code positionCount}, and a long array of
 * {@code positionCount} elements.
 */
public static long getEstimatedRetainedSizeInBytes(
        int positionCount,
        HashArraySizeSupplier hashArraySizeSupplier,
        LongArrayList addresses,
        List<ObjectArrayList<Block>> channels,
        long blocksSizeInBytes)
{
    long addressesBytes = sizeOf(addresses.elements());

    // every channel is assumed to be as large as the first one
    long channelsBytes = 0;
    if (!channels.isEmpty()) {
        channelsBytes = sizeOf(channels.get(0).elements()) * channels.size();
    }

    long hashArrayBytes = sizeOfIntArray(hashArraySizeSupplier.getHashArraySize(positionCount));
    long valueArrayBytes = sizeOfLongArray(positionCount);

    return addressesBytes + channelsBytes + blocksSizeInBytes + hashArrayBytes + valueArrayBytes;
}
}
Loading