Skip to content

Commit

Permalink
Consult Block#mayHaveNull() outside of HashSemiJoinOperator loop
Browse files Browse the repository at this point in the history
  • Loading branch information
pettyjamesm committed Jul 27, 2021
1 parent 4bf1467 commit f4cb08a
Showing 1 changed file with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ public WorkProcessor<Page> getOutputPages()
private static class SemiJoinPages
implements WorkProcessor.Transformation<Page, Page>
{
private static final int NO_PRECOMPUTED_HASH_CHANNEL = -1;

private final int probeJoinChannel;
private final int probeHashChannel; // when >= 0, this is the precomputed hash channel
private final ListenableFuture<ChannelSet> channelSetFuture;
private final Optional<Integer> probeHashChannel;
private final LocalMemoryContext localMemoryContext;

@Nullable
Expand All @@ -160,7 +162,7 @@ public SemiJoinPages(SetSupplier channelSetFuture, int probeJoinChannel, Optiona

this.channelSetFuture = requireNonNull(channelSetFuture, "channelSetFuture is null").getChannelSet();
this.probeJoinChannel = probeJoinChannel;
this.probeHashChannel = requireNonNull(probeHashChannel, "probeHashChannel is null");
this.probeHashChannel = requireNonNull(probeHashChannel, "probeHashChannel is null").orElse(NO_PRECOMPUTED_HASH_CHANNEL);
this.localMemoryContext = requireNonNull(aggregatedMemoryContext, "aggregatedMemoryContext is null").newLocalMemoryContext(SemiJoinPages.class.getSimpleName());
}

Expand All @@ -181,17 +183,20 @@ public TransformationState<Page> process(Page inputPage)
channelSet = getFutureValue(channelSetFuture);
localMemoryContext.setBytes(0);
}
// use an effectively-final local variable instead of the non-final instance field inside of the loop
ChannelSet channelSet = requireNonNull(this.channelSet, "channelSet is null");

// create the block builder for the new boolean column
// we know the exact size required for the block
BlockBuilder blockBuilder = BOOLEAN.createFixedSizeBlockBuilder(inputPage.getPositionCount());

Page probeJoinPage = inputPage.getColumns(probeJoinChannel);
Optional<Block> hashBlock = probeHashChannel.map(inputPage::getBlock);
Page probeJoinPage = inputPage.getLoadedPage(probeJoinChannel);
Block probeJoinNulls = probeJoinPage.getBlock(0).mayHaveNull() ? probeJoinPage.getBlock(0) : null;
Block hashBlock = probeHashChannel >= 0 ? inputPage.getBlock(probeHashChannel) : null;

// update hashing strategy to use probe cursor
for (int position = 0; position < inputPage.getPositionCount(); position++) {
if (probeJoinPage.getBlock(0).isNull(position)) {
if (probeJoinNulls != null && probeJoinNulls.isNull(position)) {
if (channelSet.isEmpty()) {
BOOLEAN.writeBoolean(blockBuilder, false);
}
Expand All @@ -201,8 +206,8 @@ public TransformationState<Page> process(Page inputPage)
}
else {
boolean contains;
if (hashBlock.isPresent()) {
long rawHash = BIGINT.getLong(hashBlock.get(), position);
if (hashBlock != null) {
long rawHash = BIGINT.getLong(hashBlock, position);
contains = channelSet.contains(position, probeJoinPage, rawHash);
}
else {
Expand Down

0 comments on commit f4cb08a

Please sign in to comment.