Skip to content

Commit

Permalink
Inline method
Browse files Browse the repository at this point in the history
This makes further refactorings easier to read.
  • Loading branch information
skrzypo987 authored and sopel39 committed Sep 5, 2022
1 parent 33e4afe commit ebecf59
Showing 1 changed file with 82 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2681,15 +2681,88 @@ private PhysicalOperation createLookupJoin(
JoinBridgeManager<?> lookupSourceFactory =
createLookupSourceFactoryManager(node, buildNode, buildSymbols, buildHashSymbol, probeSource, context, spillEnabled, localDynamicFilters);

OperatorFactory operator = createLookupJoin(
node,
probeSource,
probeSymbols,
probeHashSymbol,
lookupSourceFactory,
context,
spillEnabled,
!localDynamicFilters.isEmpty());
boolean consumedLocalDynamicFilters = !localDynamicFilters.isEmpty();
List<Type> probeTypes = probeSource.getTypes();
List<Integer> probeOutputChannels = ImmutableList.copyOf(getChannelsForSymbols(node.getLeftOutputSymbols(), probeSource.getLayout()));
List<Integer> probeJoinChannels = ImmutableList.copyOf(getChannelsForSymbols(probeSymbols, probeSource.getLayout()));
OptionalInt probeHashChannel = probeHashSymbol.map(channelGetter(probeSource))
.map(OptionalInt::of).orElse(OptionalInt.empty());
OptionalInt totalOperatorsCount = OptionalInt.empty();
if (spillEnabled) {
totalOperatorsCount = context.getDriverInstanceCount();
checkState(totalOperatorsCount.isPresent(), "A fixed distribution is required for JOIN when spilling is enabled");
}

// Implementation of hash join operator may only take advantage of output duplicates insensitive joins when:
// 1. Join is of INNER or LEFT type. For right or full joins all matching build rows must be tagged as visited.
// 2. Right (build) output symbols are subset of equi-clauses right symbols. If additional build symbols
// are produced, then skipping build rows could skip some distinct rows.
boolean outputSingleMatch = node.isMaySkipOutputDuplicates() &&
node.getCriteria().stream()
.map(JoinNode.EquiJoinClause::getRight)
.collect(toImmutableSet())
.containsAll(node.getRightOutputSymbols());
// Wait for build side to be collected before local dynamic filters are
// consumed by table scan. This way table scan can filter data more efficiently.
boolean waitForBuild = consumedLocalDynamicFilters;
OperatorFactory operator = switch (node.getType()) {
case INNER -> operatorFactories.innerJoin(
context.getNextOperatorId(),
node.getId(),
lookupSourceFactory,
outputSingleMatch,
waitForBuild,
node.getFilter().isPresent(),
useSpillingJoinOperator(spillEnabled, session),
probeTypes,
probeJoinChannels,
probeHashChannel,
Optional.of(probeOutputChannels),
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
case LEFT -> operatorFactories.probeOuterJoin(
context.getNextOperatorId(),
node.getId(),
lookupSourceFactory,
outputSingleMatch,
node.getFilter().isPresent(),
useSpillingJoinOperator(spillEnabled, session),
probeTypes,
probeJoinChannels,
probeHashChannel,
Optional.of(probeOutputChannels),
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
case RIGHT -> operatorFactories.lookupOuterJoin(
context.getNextOperatorId(),
node.getId(),
lookupSourceFactory,
waitForBuild,
node.getFilter().isPresent(),
useSpillingJoinOperator(spillEnabled, session),
probeTypes,
probeJoinChannels,
probeHashChannel,
Optional.of(probeOutputChannels),
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
case FULL -> operatorFactories.fullOuterJoin(
context.getNextOperatorId(),
node.getId(),
lookupSourceFactory,
node.getFilter().isPresent(),
useSpillingJoinOperator(spillEnabled, session),
probeTypes,
probeJoinChannels,
probeHashChannel,
Optional.of(probeOutputChannels),
totalOperatorsCount,
partitioningSpillerFactory,
blockTypeOperators);
};

ImmutableMap.Builder<Symbol, Integer> outputMappings = ImmutableMap.builder();
List<Symbol> outputSymbols = node.getOutputSymbols();
Expand Down Expand Up @@ -2958,99 +3031,6 @@ private JoinFilterFunctionFactory compileJoinFilterFunction(
return joinFilterFunctionCompiler.compileJoinFilterFunction(translatedFilter, buildLayout.size());
}

/**
 * Builds the {@link OperatorFactory} for a lookup join, choosing the factory method on
 * {@code operatorFactories} that corresponds to {@code node.getType()}.
 *
 * @param node join plan node supplying the join type, id, filter, equi-join criteria and output symbols
 * @param probeSource probe-side operation whose types and layout are used to resolve channels
 * @param probeSymbols probe-side join symbols, resolved to channels through the probe layout
 * @param probeHashSymbol optional probe-side hash symbol, resolved to a channel when present
 * @param lookupSourceFactoryManager join bridge manager handed to the created factory
 * @param context supplies the next operator id and, when spilling is enabled, the driver instance count
 * @param spillEnabled whether spilling is enabled; if so the driver instance count must be present
 * @param consumedLocalDynamicFilters whether local dynamic filters are consumed; used as the wait-for-build flag
 * @return the operator factory matching the join type of {@code node}
 */
private OperatorFactory createLookupJoin(
        JoinNode node,
        PhysicalOperation probeSource,
        List<Symbol> probeSymbols,
        Optional<Symbol> probeHashSymbol,
        JoinBridgeManager<?> lookupSourceFactoryManager,
        LocalExecutionPlanContext context,
        boolean spillEnabled,
        boolean consumedLocalDynamicFilters)
{
    // Resolve probe-side symbols to channel positions in the probe layout.
    List<Type> types = probeSource.getTypes();
    List<Integer> outputChannels = ImmutableList.copyOf(getChannelsForSymbols(node.getLeftOutputSymbols(), probeSource.getLayout()));
    List<Integer> joinChannels = ImmutableList.copyOf(getChannelsForSymbols(probeSymbols, probeSource.getLayout()));
    OptionalInt hashChannel = probeHashSymbol
            .map(channelGetter(probeSource))
            .map(OptionalInt::of)
            .orElseGet(OptionalInt::empty);

    // The operator count is only looked up (and only required to be known) when spilling.
    OptionalInt operatorsCount;
    if (spillEnabled) {
        operatorsCount = context.getDriverInstanceCount();
        checkState(operatorsCount.isPresent(), "A fixed distribution is required for JOIN when spilling is enabled");
    }
    else {
        operatorsCount = OptionalInt.empty();
    }

    // The hash join operator may exploit "output duplicates insensitive" joins only when:
    // 1. The join is INNER or LEFT. Right and full joins must tag every matching build row as visited.
    // 2. The right (build) output symbols are a subset of the right symbols of the equi-clauses.
    //    If extra build symbols were produced, skipping build rows could drop distinct rows.
    boolean outputSingleMatch = false;
    if (node.isMaySkipOutputDuplicates()) {
        outputSingleMatch = node.getCriteria().stream()
                .map(JoinNode.EquiJoinClause::getRight)
                .collect(toImmutableSet())
                .containsAll(node.getRightOutputSymbols());
    }

    // When local dynamic filters are consumed by the table scan, wait until the build side
    // has been collected so that the scan can filter data more efficiently.
    boolean waitForBuild = consumedLocalDynamicFilters;

    // Arguments shared by every join type below.
    boolean hasFilter = node.getFilter().isPresent();
    boolean spillingOperator = useSpillingJoinOperator(spillEnabled, session);

    return switch (node.getType()) {
        case INNER -> operatorFactories.innerJoin(
                context.getNextOperatorId(),
                node.getId(),
                lookupSourceFactoryManager,
                outputSingleMatch,
                waitForBuild,
                hasFilter,
                spillingOperator,
                types,
                joinChannels,
                hashChannel,
                Optional.of(outputChannels),
                operatorsCount,
                partitioningSpillerFactory,
                blockTypeOperators);
        case LEFT -> operatorFactories.probeOuterJoin(
                context.getNextOperatorId(),
                node.getId(),
                lookupSourceFactoryManager,
                outputSingleMatch,
                hasFilter,
                spillingOperator,
                types,
                joinChannels,
                hashChannel,
                Optional.of(outputChannels),
                operatorsCount,
                partitioningSpillerFactory,
                blockTypeOperators);
        case RIGHT -> operatorFactories.lookupOuterJoin(
                context.getNextOperatorId(),
                node.getId(),
                lookupSourceFactoryManager,
                waitForBuild,
                hasFilter,
                spillingOperator,
                types,
                joinChannels,
                hashChannel,
                Optional.of(outputChannels),
                operatorsCount,
                partitioningSpillerFactory,
                blockTypeOperators);
        case FULL -> operatorFactories.fullOuterJoin(
                context.getNextOperatorId(),
                node.getId(),
                lookupSourceFactoryManager,
                hasFilter,
                spillingOperator,
                types,
                joinChannels,
                hashChannel,
                Optional.of(outputChannels),
                operatorsCount,
                partitioningSpillerFactory,
                blockTypeOperators);
    };
}

private Map<Symbol, Integer> createJoinSourcesLayout(Map<Symbol, Integer> lookupSourceLayout, Map<Symbol, Integer> probeSourceLayout)
{
ImmutableMap.Builder<Symbol, Integer> joinSourcesLayout = ImmutableMap.builder();
Expand Down

0 comments on commit ebecf59

Please sign in to comment.