Skip to content

Commit

Permalink
Remove unnecessary OperatorFactories interface
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Jul 27, 2023
1 parent ec8e763 commit e146c8b
Show file tree
Hide file tree
Showing 18 changed files with 215 additions and 347 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator;

import io.trino.operator.join.LookupJoinOperatorFactory;
import io.trino.sql.planner.plan.JoinNode;

import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;
import static java.util.Objects.requireNonNull;

public class JoinOperatorType
{
private final LookupJoinOperatorFactory.JoinType type;
private final boolean outputSingleMatch;
private final boolean waitForBuild;

public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild)
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
}

public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild)
{
return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild);
}

public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch)
{
return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false);
}

public static JoinOperatorType lookupOuterJoin(boolean waitForBuild)
{
return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild);
}

public static JoinOperatorType fullOuterJoin()
{
return new JoinOperatorType(FULL_OUTER, false, false);
}

private JoinOperatorType(LookupJoinOperatorFactory.JoinType type, boolean outputSingleMatch, boolean waitForBuild)
{
this.type = requireNonNull(type, "type is null");
this.outputSingleMatch = outputSingleMatch;
this.waitForBuild = waitForBuild;
}

public boolean isOutputSingleMatch()
{
return outputSingleMatch;
}

public boolean isWaitForBuild()
{
return waitForBuild;
}

public LookupJoinOperatorFactory.JoinType getType()
{
return type;
}
}
126 changes: 57 additions & 69 deletions core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,28 @@
package io.trino.operator;

import io.trino.operator.join.JoinBridgeManager;
import io.trino.operator.join.LookupJoinOperatorFactory.JoinType;
import io.trino.operator.join.JoinProbe.JoinProbeFactory;
import io.trino.operator.join.LookupJoinOperatorFactory;
import io.trino.operator.join.LookupSourceFactory;
import io.trino.operator.join.unspilled.JoinProbe;
import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory;
import io.trino.spi.type.Type;
import io.trino.spiller.PartitioningSpillerFactory;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.type.BlockTypeOperators;

import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.IntStream;

import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER;
import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER;
import static java.util.Objects.requireNonNull;
import static com.google.common.collect.ImmutableList.toImmutableList;

public interface OperatorFactories
public class OperatorFactories
{
OperatorFactory join(
private OperatorFactories() {}

public static OperatorFactory join(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
Expand All @@ -44,10 +44,29 @@ OperatorFactory join(
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
BlockTypeOperators blockTypeOperators);
Optional<List<Integer>> probeOutputChannelsOptional,
BlockTypeOperators blockTypeOperators)
{
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbe.JoinProbeFactory(probeOutputChannels, probeJoinChannel, probeHashChannel, hasFilter),
blockTypeOperators,
probeJoinChannel,
probeHashChannel);
}

OperatorFactory spillingJoin(
public static OperatorFactory spillingJoin(
JoinOperatorType joinType,
int operatorId,
PlanNodeId planNodeId,
Expand All @@ -56,67 +75,36 @@ OperatorFactory spillingJoin(
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
Optional<List<Integer>> probeOutputChannelsOptional,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
BlockTypeOperators blockTypeOperators);

class JoinOperatorType
BlockTypeOperators blockTypeOperators)
{
private final JoinType type;
private final boolean outputSingleMatch;
private final boolean waitForBuild;

public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild)
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
}

public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild)
{
return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild);
}
List<Integer> probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size()));
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
.collect(toImmutableList());

public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch)
{
return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false);
}

public static JoinOperatorType lookupOuterJoin(boolean waitForBuild)
{
return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild);
}

public static JoinOperatorType fullOuterJoin()
{
return new JoinOperatorType(FULL_OUTER, false, false);
}

private JoinOperatorType(JoinType type, boolean outputSingleMatch, boolean waitForBuild)
{
this.type = requireNonNull(type, "type is null");
this.outputSingleMatch = outputSingleMatch;
this.waitForBuild = waitForBuild;
}

public boolean isOutputSingleMatch()
{
return outputSingleMatch;
}

public boolean isWaitForBuild()
{
return waitForBuild;
}
return new LookupJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeOutputChannelTypes,
lookupSourceFactory.getBuildOutputTypes(),
joinType,
new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel),
blockTypeOperators,
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
}

public JoinType getType()
{
return type;
}
private static List<Integer> rangeList(int endExclusive)
{
return IntStream.range(0, endExclusive)
.boxed()
.collect(toImmutableList());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import io.trino.operator.DriverContext;
import io.trino.operator.HashGenerator;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.JoinOperatorType;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactories.JoinOperatorType;
import io.trino.operator.OperatorFactory;
import io.trino.operator.PrecomputedHashGenerator;
import io.trino.operator.ProcessorContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import io.trino.operator.DriverContext;
import io.trino.operator.HashGenerator;
import io.trino.operator.InterpretedHashGenerator;
import io.trino.operator.JoinOperatorType;
import io.trino.operator.Operator;
import io.trino.operator.OperatorContext;
import io.trino.operator.OperatorFactories.JoinOperatorType;
import io.trino.operator.OperatorFactory;
import io.trino.operator.PrecomputedHashGenerator;
import io.trino.operator.ProcessorContext;
Expand Down
Loading

0 comments on commit e146c8b

Please sign in to comment.