Skip to content

Commit

Permalink
Addressed PR comments and added FIRST_MATCH & LAST_MATCH to SimpleUni…
Browse files Browse the repository at this point in the history
…queStaticNaturalJoinStateManager
  • Loading branch information
lbooker42 committed Feb 3, 2025
1 parent dc1ded9 commit ae241ea
Show file tree
Hide file tree
Showing 12 changed files with 1,810 additions and 1,780 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static QueryTable naturalJoinInternal(QueryTable leftTable, QueryTable r
Assert.neqNull(bc.uniqueFunctor, "uniqueFunctor");
final SimpleUniqueStaticNaturalJoinStateManager jsm = new SimpleUniqueStaticNaturalJoinStateManager(
bc.originalLeftSources, bc.uniqueValuesRange(), bc.uniqueFunctor);
jsm.setRightSide(rightTable.getRowSet(), bc.rightSources[0]);
jsm.setRightSide(rightTable.getRowSet(), bc.rightSources[0], joinType);
final LongArraySource leftRedirections = new LongArraySource();
leftRedirections.ensureCapacity(leftTable.getRowSet().size());
jsm.decorateLeftSide(leftTable.getRowSet(), bc.leftSources, leftRedirections);
Expand Down Expand Up @@ -689,22 +689,22 @@ public void accept(int updatedSlot, long originalRightValue, byte flag) {
return;
}

long index = jsm.getRightRowKey(updatedSlot);
if (index == StaticNaturalJoinStateManager.DUPLICATE_RIGHT_VALUE) {
long rowKey = jsm.getRightRowKey(updatedSlot);
if (rowKey == StaticNaturalJoinStateManager.DUPLICATE_RIGHT_VALUE) {
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE
|| joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException(
"Natural Join found duplicate right key for " + jsm.keyString(updatedSlot));
}
// Get the correct row key from the duplicates on the RHS
final RowSet rightRowSet = jsm.getRightRowSet(updatedSlot);
index = joinType == NaturalJoinType.FIRST_MATCH
rowKey = joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
}
final long rightIndex = index;
final long rightRowKey = rowKey;

final boolean unchangedRedirection = rightIndex == originalRightValue;
final boolean unchangedRedirection = rightRowKey == originalRightValue;

// if we have no right columns that have changed, and our redirection is identical we can quit here
if (unchangedRedirection && !rightAddedColumnsChanged
Expand All @@ -727,11 +727,11 @@ public void accept(int updatedSlot, long originalRightValue, byte flag) {

changedRedirection = true;

if (rightIndex == RowSequence.NULL_ROW_KEY) {
jsm.checkExactMatch(joinType, leftIndices.firstRowKey(), rightIndex);
if (rightRowKey == RowSequence.NULL_ROW_KEY) {
jsm.checkExactMatch(joinType, leftIndices.firstRowKey(), rightRowKey);
rowRedirection.removeAll(leftIndices);
} else {
leftIndices.forAllRowKeys((long key) -> rowRedirection.putVoid(key, rightIndex));
leftIndices.forAllRowKeys((long key) -> rowRedirection.putVoid(key, rightRowKey));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class SimpleUniqueStaticNaturalJoinStateManager extends StaticNaturalJoinStateMa
}
}

void setRightSide(RowSet rightRowSet, ColumnSource<?> valueSource) {
void setRightSide(RowSet rightRowSet, ColumnSource<?> valueSource, NaturalJoinType joinType) {
try (final RowSequence.Iterator rsIt = rightRowSet.getRowSequenceIterator();
final ColumnSource.GetContext getContext =
valueSource.makeGetContext((int) Math.min(CHUNK_SIZE, rightRowSet.size()))) {
Expand All @@ -62,11 +62,15 @@ void setRightSide(RowSet rightRowSet, ColumnSource<?> valueSource) {
return true;
}
final long existingRight = rightRowSetSource.getLong(tableLocation);
if (existingRight == RowSequence.NULL_ROW_KEY) {
if (existingRight == RowSequence.NULL_ROW_KEY || joinType == NaturalJoinType.LAST_MATCH) {
rightRowSetSource.set(tableLocation, keyIndex);
} else {
if (joinType == NaturalJoinType.FIRST_MATCH) {
// no-op, already have the first match
} else {
rightRowSetSource.set(tableLocation, DUPLICATE_RIGHT_VALUE);
}
}
return true;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,18 @@ private long getRightRowKeyFromState(
final long leftRowKey,
final long rightRowKeyForState,
final NaturalJoinType joinType) {
if (rightRowKeyForState <= FIRST_DUPLICATE) {
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE || joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowKey));
}
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final WritableRowSet rightRowSet = rightSideDuplicateRowSets.getUnsafe(location);
return joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
if (rightRowKeyForState > FIRST_DUPLICATE) {
return rightRowKeyForState;
}
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE || joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowKey));
}
return rightRowKeyForState;
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final WritableRowSet rightRowSet = rightSideDuplicateRowSets.getUnsafe(location);
return joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
}

public WritableRowRedirection buildIndexedRowRedirection(
Expand Down Expand Up @@ -518,11 +518,11 @@ public WritableRowRedirection buildIndexedRowRedirection(
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
checkExactMatch(joinType, leftRowKey, rightRowKey);
// Set unconditionally, need to populate the entire array with NULL_ROW_KEY or the RHS key
leftRowSetForKey.forAllRowKeys(pos -> innerIndex[(int) pos] = key);
leftRowSetForKey.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKey);
}
}

Expand All @@ -539,12 +539,12 @@ public WritableRowRedirection buildIndexedRowRedirection(
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSetForKey.forAllRowKeys(pos -> sparseRedirections.set(pos, key));
leftRowSetForKey.forAllRowKeys(pos -> sparseRedirections.set(pos, rightRowKey));
}
}
}
Expand All @@ -562,12 +562,12 @@ public WritableRowRedirection buildIndexedRowRedirection(
// Reset mainLeftRowSet to contain the indexed row set.
mainLeftRowSet.set(ii, leftRowSetForKey.copy());
final long leftRowKey = leftRowSetForKey.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSetForKey.forAllRowKeys(pos -> rowRedirection.put(pos, key));
leftRowSetForKey.forAllRowKeys(pos -> rowRedirection.put(pos, rightRowKey));
}
}
}
Expand All @@ -593,11 +593,11 @@ public WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable lef
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
checkExactMatch(joinType, leftRowKey, rightRowKey);
// Set unconditionally, need to populate the entire array with NULL_ROW_KEY or the RHS key
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = key);
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKey);
}
}
return new ContiguousWritableRowRedirection(innerIndex);
Expand All @@ -608,12 +608,12 @@ public WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable lef
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, key));
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, rightRowKey));
}
}
}
Expand All @@ -626,12 +626,12 @@ public WritableRowRedirection buildRowRedirectionFromRedirections(QueryTable lef
final WritableRowSet leftRowSet = this.mainLeftRowSet.getUnsafe(ii);
if (leftRowSet != null && !leftRowSet.isEmpty()) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = mainRightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = mainRightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, key));
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, rightRowKey));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,18 @@ private long getRightRowKeyFromState(
final long leftRowKey,
final long rightRowKeyForState,
final NaturalJoinType joinType) {
if (rightRowKeyForState <= FIRST_DUPLICATE) {
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE || joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowKey));
}
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final WritableRowSet rightRowSet = rightSideDuplicateRowSets.getUnsafe(location);
return joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
if (rightRowKeyForState > FIRST_DUPLICATE) {
return rightRowKeyForState;
}
if (joinType == NaturalJoinType.ERROR_ON_DUPLICATE || joinType == NaturalJoinType.EXACTLY_ONE_MATCH) {
throw new IllegalStateException("Natural Join found duplicate right key for "
+ extractKeyStringFromSourceTable(leftRowKey));
}
return rightRowKeyForState;
final long location = duplicateLocationFromRowKey(rightRowKeyForState);
final WritableRowSet rightRowSet = rightSideDuplicateRowSets.getUnsafe(location);
return joinType == NaturalJoinType.FIRST_MATCH
? rightRowSet.firstRowKey()
: rightRowSet.lastRowKey();
}

@Override
Expand All @@ -328,11 +328,11 @@ public WritableRowRedirection buildRowRedirectionFromHashSlot(QueryTable leftTab
final WritableRowSet leftRowSet = this.leftRowSet.getUnsafe(ii);
if (leftRowSet != null) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = rightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
checkExactMatch(joinType, leftRowKey, key);
final long rightState = rightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
checkExactMatch(joinType, leftRowKey, rightRowKey);
// Set unconditionally, need to populate the entire array with NULL_ROW_KEY or the RHS key
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = key);
leftRowSet.forAllRowKeys(pos -> innerIndex[(int) pos] = rightRowKey);
}
}

Expand All @@ -344,12 +344,12 @@ public WritableRowRedirection buildRowRedirectionFromHashSlot(QueryTable leftTab
final WritableRowSet leftRowSet = this.leftRowSet.getUnsafe(ii);
if (leftRowSet != null) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = rightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = rightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, key));
leftRowSet.forAllRowKeys(pos -> sparseRedirections.set(pos, rightRowKey));
}
}
}
Expand All @@ -362,12 +362,12 @@ public WritableRowRedirection buildRowRedirectionFromHashSlot(QueryTable leftTab
final WritableRowSet leftRowSet = this.leftRowSet.getUnsafe(ii);
if (leftRowSet != null) {
final long leftRowKey = leftRowSet.firstRowKey();
final long rightRowKeyForState = rightRowKey.getUnsafe(ii);
final long key = getRightRowKeyFromState(leftRowKey, rightRowKeyForState, joinType);
if (key == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, key);
final long rightState = rightRowKey.getUnsafe(ii);
final long rightRowKey = getRightRowKeyFromState(leftRowKey, rightState, joinType);
if (rightRowKey == RowSet.NULL_ROW_KEY) {
checkExactMatch(joinType, leftRowKey, rightRowKey);
} else {
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, key));
leftRowSet.forAllRowKeys(pos -> rowRedirection.put(pos, rightRowKey));
}
}
}
Expand Down
Loading

0 comments on commit ae241ea

Please sign in to comment.