diff --git a/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java b/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java new file mode 100644 index 000000000000..f43771e69976 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator; + +import io.trino.operator.join.LookupJoinOperatorFactory; +import io.trino.sql.planner.plan.JoinNode; + +import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER; +import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER; +import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER; +import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER; +import static java.util.Objects.requireNonNull; + +public class JoinOperatorType +{ + private final LookupJoinOperatorFactory.JoinType type; + private final boolean outputSingleMatch; + private final boolean waitForBuild; + + public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild) + { + return switch (joinNodeType) { + case INNER -> innerJoin(outputSingleMatch, waitForBuild); + case LEFT -> probeOuterJoin(outputSingleMatch); + case RIGHT -> lookupOuterJoin(waitForBuild); + case FULL -> fullOuterJoin(); + }; + } + + public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild) + { + return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild); + } + + public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch) + { + return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false); + } + + public static JoinOperatorType lookupOuterJoin(boolean waitForBuild) + { + return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild); + } + + public static JoinOperatorType fullOuterJoin() + { + return new JoinOperatorType(FULL_OUTER, false, false); + } + + private JoinOperatorType(LookupJoinOperatorFactory.JoinType type, boolean outputSingleMatch, boolean waitForBuild) + { + this.type = requireNonNull(type, "type is null"); + this.outputSingleMatch = outputSingleMatch; + this.waitForBuild = waitForBuild; + } + + public boolean isOutputSingleMatch() + { + return outputSingleMatch; + } + + public boolean isWaitForBuild() + { + return waitForBuild; + } + + public LookupJoinOperatorFactory.JoinType getType() + { + return type; + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java b/core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java index 3285660ebaa7..b72d85f19895 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorFactories.java @@ -14,28 +14,28 @@ package io.trino.operator; import io.trino.operator.join.JoinBridgeManager; -import io.trino.operator.join.LookupJoinOperatorFactory.JoinType; +import io.trino.operator.join.JoinProbe.JoinProbeFactory; +import io.trino.operator.join.LookupJoinOperatorFactory; import io.trino.operator.join.LookupSourceFactory; +import io.trino.operator.join.unspilled.JoinProbe; import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory; import io.trino.spi.type.Type; import io.trino.spiller.PartitioningSpillerFactory; -import io.trino.sql.planner.plan.JoinNode; import io.trino.sql.planner.plan.PlanNodeId; import io.trino.type.BlockTypeOperators; import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.stream.IntStream; -import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.FULL_OUTER; -import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.INNER; -import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.LOOKUP_OUTER; -import static io.trino.operator.join.LookupJoinOperatorFactory.JoinType.PROBE_OUTER; -import static java.util.Objects.requireNonNull; +import static com.google.common.collect.ImmutableList.toImmutableList; -public interface OperatorFactories +public class OperatorFactories { - OperatorFactory join( + private OperatorFactories() {} + + public static OperatorFactory join( JoinOperatorType joinType, int operatorId, PlanNodeId planNodeId, @@ -44,10 +44,29 @@ OperatorFactory join( List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, - Optional> probeOutputChannels, - BlockTypeOperators blockTypeOperators); + Optional> probeOutputChannelsOptional, + BlockTypeOperators blockTypeOperators) + { + List probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size())); + List probeOutputChannelTypes = probeOutputChannels.stream() + .map(probeTypes::get) + .collect(toImmutableList()); + + return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory( + operatorId, + planNodeId, + lookupSourceFactory, + probeTypes, + probeOutputChannelTypes, + lookupSourceFactory.getBuildOutputTypes(), + joinType, + new JoinProbe.JoinProbeFactory(probeOutputChannels, probeJoinChannel, probeHashChannel, hasFilter), + blockTypeOperators, + probeJoinChannel, + probeHashChannel); + } - OperatorFactory spillingJoin( + public static OperatorFactory spillingJoin( JoinOperatorType joinType, int operatorId, PlanNodeId planNodeId, @@ -56,67 +75,36 @@ OperatorFactory spillingJoin( List probeTypes, List probeJoinChannel, OptionalInt probeHashChannel, - Optional> probeOutputChannels, + Optional> probeOutputChannelsOptional, OptionalInt totalOperatorsCount, PartitioningSpillerFactory partitioningSpillerFactory, - BlockTypeOperators blockTypeOperators); - - class JoinOperatorType + BlockTypeOperators blockTypeOperators) { - private final JoinType type; - private final boolean outputSingleMatch; - private final boolean waitForBuild; - - public static JoinOperatorType ofJoinNodeType(JoinNode.Type joinNodeType, boolean outputSingleMatch, boolean waitForBuild) - { - return switch (joinNodeType) { - case INNER -> innerJoin(outputSingleMatch, waitForBuild); - case LEFT -> probeOuterJoin(outputSingleMatch); - case RIGHT -> lookupOuterJoin(waitForBuild); - case FULL -> fullOuterJoin(); - }; - } - - public static JoinOperatorType innerJoin(boolean outputSingleMatch, boolean waitForBuild) - { - return new JoinOperatorType(INNER, outputSingleMatch, waitForBuild); - } + List probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size())); + List probeOutputChannelTypes = probeOutputChannels.stream() + .map(probeTypes::get) + .collect(toImmutableList()); - public static JoinOperatorType probeOuterJoin(boolean outputSingleMatch) - { - return new JoinOperatorType(PROBE_OUTER, outputSingleMatch, false); - } - - public static JoinOperatorType lookupOuterJoin(boolean waitForBuild) - { - return new JoinOperatorType(LOOKUP_OUTER, false, waitForBuild); - } - - public static JoinOperatorType fullOuterJoin() - { - return new JoinOperatorType(FULL_OUTER, false, false); - } - - private JoinOperatorType(JoinType type, boolean outputSingleMatch, boolean waitForBuild) - { - this.type = requireNonNull(type, "type is null"); - this.outputSingleMatch = outputSingleMatch; - this.waitForBuild = waitForBuild; - } - - public boolean isOutputSingleMatch() - { - return outputSingleMatch; - } - - public boolean isWaitForBuild() - { - return waitForBuild; - } + return new LookupJoinOperatorFactory( + operatorId, + planNodeId, + lookupSourceFactory, + probeTypes, + probeOutputChannelTypes, + lookupSourceFactory.getBuildOutputTypes(), + joinType, + new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel), + blockTypeOperators, + totalOperatorsCount, + probeJoinChannel, + probeHashChannel, + partitioningSpillerFactory); + } - public JoinType getType() - { - return type; - } + private static List rangeList(int endExclusive) + { + return IntStream.range(0, endExclusive) + .boxed() + .collect(toImmutableList()); } } diff --git a/core/trino-main/src/main/java/io/trino/operator/TrinoOperatorFactories.java b/core/trino-main/src/main/java/io/trino/operator/TrinoOperatorFactories.java deleted file mode 100644 index 08f637998414..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/TrinoOperatorFactories.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator; - -import io.trino.operator.join.JoinBridgeManager; -import io.trino.operator.join.JoinProbe.JoinProbeFactory; -import io.trino.operator.join.LookupJoinOperatorFactory; -import io.trino.operator.join.LookupSourceFactory; -import io.trino.operator.join.unspilled.JoinProbe; -import io.trino.operator.join.unspilled.PartitionedLookupSourceFactory; -import io.trino.spi.type.Type; -import io.trino.spiller.PartitioningSpillerFactory; -import io.trino.sql.planner.plan.PlanNodeId; -import io.trino.type.BlockTypeOperators; - -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.stream.IntStream; - -import static com.google.common.collect.ImmutableList.toImmutableList; - -public class TrinoOperatorFactories - implements OperatorFactories -{ - @Override - public OperatorFactory join( - JoinOperatorType joinType, - int operatorId, - PlanNodeId planNodeId, - JoinBridgeManager lookupSourceFactory, - boolean hasFilter, - List probeTypes, - List probeJoinChannel, - OptionalInt probeHashChannel, - Optional> probeOutputChannelsOptional, - BlockTypeOperators blockTypeOperators) - { - List probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size())); - List probeOutputChannelTypes = probeOutputChannels.stream() - .map(probeTypes::get) - .collect(toImmutableList()); - - return new io.trino.operator.join.unspilled.LookupJoinOperatorFactory( - operatorId, - planNodeId, - lookupSourceFactory, - probeTypes, - probeOutputChannelTypes, - lookupSourceFactory.getBuildOutputTypes(), - joinType, - new JoinProbe.JoinProbeFactory(probeOutputChannels, probeJoinChannel, probeHashChannel, hasFilter), - blockTypeOperators, - probeJoinChannel, - probeHashChannel); - } - - @Override - public OperatorFactory spillingJoin( - JoinOperatorType joinType, - int operatorId, - PlanNodeId planNodeId, - JoinBridgeManager lookupSourceFactory, - boolean hasFilter, - List probeTypes, - List probeJoinChannel, - OptionalInt probeHashChannel, - Optional> probeOutputChannelsOptional, - OptionalInt totalOperatorsCount, - PartitioningSpillerFactory partitioningSpillerFactory, - BlockTypeOperators blockTypeOperators) - { - List probeOutputChannels = probeOutputChannelsOptional.orElseGet(() -> rangeList(probeTypes.size())); - List probeOutputChannelTypes = probeOutputChannels.stream() - .map(probeTypes::get) - .collect(toImmutableList()); - - return new LookupJoinOperatorFactory( - operatorId, - planNodeId, - lookupSourceFactory, - probeTypes, - probeOutputChannelTypes, - lookupSourceFactory.getBuildOutputTypes(), - joinType, - new JoinProbeFactory(probeOutputChannels.stream().mapToInt(i -> i).toArray(), probeJoinChannel, probeHashChannel), - blockTypeOperators, - totalOperatorsCount, - probeJoinChannel, - probeHashChannel, - partitioningSpillerFactory); - } - - private static List rangeList(int endExclusive) - { - return IntStream.range(0, endExclusive) - .boxed() - .collect(toImmutableList()); - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java index db0babd09068..31ec0e2a0a1b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java @@ -17,9 +17,9 @@ import io.trino.operator.DriverContext; import io.trino.operator.HashGenerator; import io.trino.operator.InterpretedHashGenerator; +import io.trino.operator.JoinOperatorType; import io.trino.operator.Operator; import io.trino.operator.OperatorContext; -import io.trino.operator.OperatorFactories.JoinOperatorType; import io.trino.operator.OperatorFactory; import io.trino.operator.PrecomputedHashGenerator; import io.trino.operator.ProcessorContext; diff --git a/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java index 6d6f39ae7f51..77d15fbec80e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java @@ -17,9 +17,9 @@ import io.trino.operator.DriverContext; import io.trino.operator.HashGenerator; import io.trino.operator.InterpretedHashGenerator; +import io.trino.operator.JoinOperatorType; import io.trino.operator.Operator; import io.trino.operator.OperatorContext; -import io.trino.operator.OperatorFactories.JoinOperatorType; import io.trino.operator.OperatorFactory; import io.trino.operator.PrecomputedHashGenerator; import io.trino.operator.ProcessorContext; diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index 08a0be49f677..8f4502fbe51a 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -87,11 +87,9 @@ import io.trino.operator.DirectExchangeClientSupplier; import io.trino.operator.ForExchange; import io.trino.operator.GroupByHashPageIndexerFactory; -import io.trino.operator.OperatorFactories; import io.trino.operator.PagesIndex; import io.trino.operator.PagesIndexPageSorter; import io.trino.operator.RetryPolicy; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.index.IndexJoinLookupStats; import io.trino.operator.scalar.json.JsonExistsFunction; import io.trino.operator.scalar.json.JsonQueryFunction; @@ -332,7 +330,6 @@ protected void setup(Binder binder) binder.bind(OrderingCompiler.class).in(Scopes.SINGLETON); newExporter(binder).export(OrderingCompiler.class).withGeneratedName(); binder.bind(PagesIndex.Factory.class).to(PagesIndex.DefaultFactory.class); - newOptionalBinder(binder, OperatorFactories.class).setDefault().to(TrinoOperatorFactories.class).in(Scopes.SINGLETON); jaxrsBinder(binder).bind(PagesResponseWriter.class); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 4766f6ca2b68..44595589c953 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -63,6 +63,7 @@ import io.trino.operator.GroupIdOperator; import io.trino.operator.HashAggregationOperator.HashAggregationOperatorFactory; import io.trino.operator.HashSemiJoinOperator; +import io.trino.operator.JoinOperatorType; import io.trino.operator.LeafTableFunctionOperator.LeafTableFunctionOperatorFactory; import io.trino.operator.LimitOperator.LimitOperatorFactory; import io.trino.operator.LocalPlannerAware; @@ -70,8 +71,6 @@ import io.trino.operator.MergeOperator.MergeOperatorFactory; import io.trino.operator.MergeProcessorOperator; import io.trino.operator.MergeWriterOperator.MergeWriterOperatorFactory; -import io.trino.operator.OperatorFactories; -import io.trino.operator.OperatorFactories.JoinOperatorType; import io.trino.operator.OperatorFactory; import io.trino.operator.OrderByOperator.OrderByOperatorFactory; import io.trino.operator.OutputFactory; @@ -317,6 +316,8 @@ import static io.trino.cache.SafeCaches.buildNonEvictableCache; import static io.trino.operator.DistinctLimitOperator.DistinctLimitOperatorFactory; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; +import static io.trino.operator.OperatorFactories.join; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.operator.TableFinishOperator.TableFinishOperatorFactory; import static io.trino.operator.TableFinishOperator.TableFinisher; import static io.trino.operator.TableWriterOperator.FRAGMENT_CHANNEL; @@ -411,7 +412,6 @@ public class LocalExecutionPlanner private final PartitioningSpillerFactory partitioningSpillerFactory; private final PagesIndex.Factory pagesIndexFactory; private final JoinCompiler joinCompiler; - private final OperatorFactories operatorFactories; private final OrderingCompiler orderingCompiler; private final int largeMaxDistinctValuesPerDriver; private final int largePartitionedMaxDistinctValuesPerDriver; @@ -462,7 +462,6 @@ public LocalExecutionPlanner( PartitioningSpillerFactory partitioningSpillerFactory, PagesIndex.Factory pagesIndexFactory, JoinCompiler joinCompiler, - OperatorFactories operatorFactories, OrderingCompiler orderingCompiler, DynamicFilterConfig dynamicFilterConfig, BlockTypeOperators blockTypeOperators, @@ -492,7 +491,6 @@ public LocalExecutionPlanner( this.maxLocalExchangeBufferSize = taskManagerConfig.getMaxLocalExchangeBufferSize(); this.pagesIndexFactory = requireNonNull(pagesIndexFactory, "pagesIndexFactory is null"); this.joinCompiler = requireNonNull(joinCompiler, "joinCompiler is null"); - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); this.orderingCompiler = requireNonNull(orderingCompiler, "orderingCompiler is null"); this.largeMaxDistinctValuesPerDriver = dynamicFilterConfig.getLargeMaxDistinctValuesPerDriver(); this.smallMaxDistinctValuesPerDriver = dynamicFilterConfig.getSmallMaxDistinctValuesPerDriver(); @@ -2455,7 +2453,7 @@ public PhysicalOperation visitIndexJoin(IndexJoinNode node, LocalExecutionPlanCo OptionalInt totalOperatorsCount = context.getDriverInstanceCount(); // We use spilling operator since Non-spilling one does not support index lookup sources lookupJoinOperatorFactory = switch (node.getType()) { - case INNER -> operatorFactories.spillingJoin( + case INNER -> spillingJoin( JoinOperatorType.innerJoin(false, false), context.getNextOperatorId(), node.getId(), @@ -2468,7 +2466,7 @@ public PhysicalOperation visitIndexJoin(IndexJoinNode node, LocalExecutionPlanCo totalOperatorsCount, unsupportedPartitioningSpillerFactory(), blockTypeOperators); - case SOURCE_OUTER -> operatorFactories.spillingJoin( + case SOURCE_OUTER -> spillingJoin( JoinOperatorType.probeOuterJoin(false), context.getNextOperatorId(), node.getId(), @@ -2946,7 +2944,7 @@ private PhysicalOperation createLookupJoin( buildContext); JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild); - operator = operatorFactories.spillingJoin( + operator = spillingJoin( joinType, context.getNextOperatorId(), node.getId(), @@ -2998,7 +2996,7 @@ private PhysicalOperation createLookupJoin( buildContext); JoinOperatorType joinType = JoinOperatorType.ofJoinNodeType(node.getType(), outputSingleMatch, waitForBuild); - operator = operatorFactories.join( + operator = join( joinType, context.getNextOperatorId(), node.getId(), diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index 2ed2ebaf2f99..bfe2a05e4bf5 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -116,12 +116,10 @@ import io.trino.operator.DriverFactory; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.operator.OperatorContext; -import io.trino.operator.OperatorFactories; import io.trino.operator.OutputFactory; import io.trino.operator.PagesIndex; import io.trino.operator.PagesIndexPageSorter; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.index.IndexJoinLookupStats; import io.trino.operator.scalar.json.JsonExistsFunction; import io.trino.operator.scalar.json.JsonQueryFunction; @@ -316,7 +314,6 @@ public class LocalQueryRunner private final DataSize maxSpillPerNode; private final DataSize queryMaxSpillPerNode; private final OptimizerConfig optimizerConfig; - private final OperatorFactories operatorFactories; private final StatementAnalyzerFactory statementAnalyzerFactory; private boolean printPlan; @@ -342,7 +339,6 @@ private LocalQueryRunner( int nodeCountForStats, Map>> defaultSessionProperties, MetadataProvider metadataProvider, - OperatorFactories operatorFactories, Set extraSessionProperties) { requireNonNull(defaultSession, "defaultSession is null"); @@ -354,7 +350,6 @@ private LocalQueryRunner( requireNonNull(nodeSpillConfig, "nodeSpillConfig is null"); this.maxSpillPerNode = nodeSpillConfig.getMaxSpillPerNode(); this.queryMaxSpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode(); - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); this.alwaysRevokeMemory = alwaysRevokeMemory; this.notificationExecutor = newCachedThreadPool(daemonThreadsNamed("local-query-runner-executor-%s")); this.yieldExecutor = newScheduledThreadPool(2, daemonThreadsNamed("local-query-runner-scheduler-%s")); @@ -1000,7 +995,6 @@ private List createDrivers(Session session, Plan plan, OutputFactory out partitioningSpillerFactory, new PagesIndex.TestingFactory(false), joinCompiler, - operatorFactories, new OrderingCompiler(plannerContext.getTypeOperators()), new DynamicFilterConfig(), blockTypeOperators, @@ -1215,7 +1209,6 @@ public static class Builder private Set extraSessionProperties = ImmutableSet.of(); private int nodeCountForStats; private MetadataProvider metadataProvider = MetadataManager::new; - private OperatorFactories operatorFactories = new TrinoOperatorFactories(); private Builder(Session defaultSession) { @@ -1264,12 +1257,6 @@ public Builder withMetadataProvider(MetadataProvider metadataProvider) return this; } - public Builder withOperatorFactories(OperatorFactories operatorFactories) - { - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); - return this; - } - /** * This method is required to pass in system session properties and their * metadata for Trino extension modules (separate from the default system @@ -1292,7 +1279,6 @@ public LocalQueryRunner build() nodeCountForStats, defaultSessionProperties, metadataProvider, - operatorFactories, extraSessionProperties); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java index b2a6e35566c9..c330906ccfcd 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java @@ -33,7 +33,6 @@ import io.trino.metadata.InMemoryNodeManager; import io.trino.metadata.Split; import io.trino.operator.PagesIndex; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.index.IndexJoinLookupStats; import io.trino.spi.connector.CatalogHandle; import io.trino.spiller.GenericSpillerFactory; @@ -172,7 +171,6 @@ public static LocalExecutionPlanner createTestingPlanner() }, new PagesIndex.TestingFactory(false), new JoinCompiler(PLANNER_CONTEXT.getTypeOperators()), - new TrinoOperatorFactories(), new OrderingCompiler(PLANNER_CONTEXT.getTypeOperators()), new DynamicFilterConfig(), blockTypeOperators, diff --git a/core/trino-main/src/test/java/io/trino/operator/join/BenchmarkHashBuildAndJoinOperators.java b/core/trino-main/src/test/java/io/trino/operator/join/BenchmarkHashBuildAndJoinOperators.java index a589d378fe9d..d4cd8b593233 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/BenchmarkHashBuildAndJoinOperators.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/BenchmarkHashBuildAndJoinOperators.java @@ -22,12 +22,10 @@ import io.trino.operator.DriverContext; import io.trino.operator.InterpretedHashGenerator; import io.trino.operator.Operator; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.PartitionFunction; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.exchange.LocalPartitionGenerator; import io.trino.operator.join.HashBuilderOperator.HashBuilderOperatorFactory; import io.trino.spi.Page; @@ -71,7 +69,8 @@ import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.jmh.Benchmarks.benchmark; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.operator.join.JoinBridgeManager.lookupAllAtOnce; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -217,11 +216,6 @@ public static class JoinContext @Override @Setup public void setup() - { - setup(new TrinoOperatorFactories()); - } - - public void setup(OperatorFactories operatorFactories) { super.setup(); @@ -240,7 +234,7 @@ public void setup(OperatorFactories operatorFactories) } JoinBridgeManager lookupSourceFactory = getLookupSourceFactoryManager(this, outputChannels, partitionCount); - joinOperatorFactory = operatorFactories.spillingJoin( + joinOperatorFactory = spillingJoin( innerJoin(false, false), HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, diff --git a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java index 97f2c7c5f125..8b2cc84daf71 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/JoinTestUtils.java @@ -21,7 +21,6 @@ import io.trino.memory.context.LocalMemoryContext; import io.trino.operator.Driver; import io.trino.operator.DriverContext; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.PipelineContext; @@ -63,7 +62,8 @@ import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; import static java.util.Objects.requireNonNull; @@ -76,24 +76,22 @@ public final class JoinTestUtils private JoinTestUtils() {} public static OperatorFactory innerJoinOperatorFactory( - OperatorFactories operatorFactories, JoinBridgeManager lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory, boolean hasFilter) { - return innerJoinOperatorFactory(operatorFactories, lookupSourceFactoryManager, probePages, partitioningSpillerFactory, false, hasFilter); + return innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, partitioningSpillerFactory, false, hasFilter); } public static OperatorFactory innerJoinOperatorFactory( - OperatorFactories operatorFactories, JoinBridgeManager lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory, boolean outputSingleMatch, boolean hasFilter) { - return operatorFactories.spillingJoin( + return spillingJoin( innerJoin(outputSingleMatch, false), 0, new PlanNodeId("test"), diff --git a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java index a959b2011ac0..d772f7b45004 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/TestHashJoinOperator.java @@ -37,11 +37,9 @@ import io.trino.operator.Operator; import io.trino.operator.OperatorAssertion; import io.trino.operator.OperatorContext; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.ProcessorContext; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.ValuesOperator.ValuesOperatorFactory; import io.trino.operator.WorkProcessor; import io.trino.operator.WorkProcessorOperator; @@ -90,13 +88,14 @@ import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.operator.JoinOperatorType.fullOuterJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.lookupOuterJoin; +import static io.trino.operator.JoinOperatorType.probeOuterJoin; import static io.trino.operator.OperatorAssertion.assertOperatorEquals; import static io.trino.operator.OperatorAssertion.dropChannel; import static io.trino.operator.OperatorAssertion.without; -import static io.trino.operator.OperatorFactories.JoinOperatorType.fullOuterJoin; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; -import static io.trino.operator.OperatorFactories.JoinOperatorType.lookupOuterJoin; -import static io.trino.operator.OperatorFactories.JoinOperatorType.probeOuterJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.operator.WorkProcessor.ProcessState.finished; import static io.trino.operator.WorkProcessor.ProcessState.ofResult; import static io.trino.operator.join.JoinTestUtils.buildLookupSource; @@ -132,22 +131,10 @@ public class TestHashJoinOperator private static final PartitioningSpillerFactory PARTITIONING_SPILLER_FACTORY = new GenericPartitioningSpillerFactory(SINGLE_STREAM_SPILLER_FACTORY); private static final BlockTypeOperators TYPE_OPERATOR_FACTORY = new BlockTypeOperators(new TypeOperators()); - private final OperatorFactories operatorFactories; - private ExecutorService executor; private ScheduledExecutorService scheduledExecutor; private NodePartitioningManager nodePartitioningManager; - public TestHashJoinOperator() - { - this(new TrinoOperatorFactories()); - } - - protected TestHashJoinOperator(OperatorFactories operatorFactories) - { - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); - } - @BeforeMethod public void setUp() { @@ -214,7 +201,7 @@ public void testInnerJoin(boolean parallelBuild, boolean probeHashEnabled, boole List probeInput = probePages .addSequencePage(1000, 0, 1000, 2000) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -255,7 +242,7 @@ public void testInnerJoinWithRunLengthEncodedProbe() new Page(RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("20"), 2)), new Page(RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("-1"), 2)), new Page(RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("21"), 2))); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -297,7 +284,7 @@ public void testUnwrapsLazyBlocks() .map(page -> new Page(page.getBlock(0), new LazyBlock(1, () -> page.getBlock(1)))) .collect(toImmutableList()); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -350,7 +337,7 @@ public void testYield() // probe matching the above 40 entries RowPagesBuilder probePages = rowPagesBuilder(false, Ints.asList(0), ImmutableList.of(BIGINT)); List probeInput = probePages.addSequencePage(100, 0).build(); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -518,7 +505,7 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List whenSp .pageBreak() .addSequencePage(20, 0, 123_000) .addSequencePage(10, 30, 123_000); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactoryManager, probePages, joinSpillerFactory, true); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, joinSpillerFactory, true); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -717,7 +704,7 @@ public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashE .row("a") .row("b") .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -754,7 +741,7 @@ public void testInnerJoinWithOutputSingleMatch(boolean parallelBuild, boolean pr .row("b") .row("c") .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, true, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, true, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -793,7 +780,7 @@ public void testInnerJoinWithNullBuild(boolean parallelBuild, boolean probeHashE .row("b") .row("c") .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -834,7 +821,7 @@ public void testInnerJoinWithNullOnBothSides(boolean parallelBuild, boolean prob .row((String) null) .row("c") .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -1235,7 +1222,7 @@ public void testInnerJoinWithEmptyLookupSource(boolean parallelBuild, boolean pr // probe factory List probeTypes = ImmutableList.of(VARCHAR); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -1274,7 +1261,7 @@ public void testLookupOuterJoinWithEmptyLookupSource(boolean parallelBuild, bool // probe factory List probeTypes = ImmutableList.of(VARCHAR); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( lookupOuterJoin(false), 0, new PlanNodeId("test"), @@ -1319,7 +1306,7 @@ public void testProbeOuterJoinWithEmptyLookupSource(boolean parallelBuild, boole .row((String) null) .row("c") .build(); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( probeOuterJoin(false), 0, new PlanNodeId("test"), @@ -1367,7 +1354,7 @@ public void testFullOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolea .row((String) null) .row("c") .build(); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( fullOuterJoin(), 0, new PlanNodeId("test"), @@ -1414,7 +1401,7 @@ public void testInnerJoinWithNonEmptyLookupSourceAndEmptyProbe(boolean parallelB List probeTypes = ImmutableList.of(VARCHAR); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); List probeInput = probePages.build(); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -1533,7 +1520,7 @@ public void testInnerJoinLoadsPagesInOrder() List probeTypes = ImmutableList.of(VARCHAR, INTEGER, INTEGER); RowPagesBuilder probePages = rowPagesBuilder(false, Ints.asList(0), probeTypes); probePages.row("a", 1L, 2L); - WorkProcessorOperatorFactory joinOperatorFactory = (WorkProcessorOperatorFactory) innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); + WorkProcessorOperatorFactory joinOperatorFactory = (WorkProcessorOperatorFactory) innerJoinOperatorFactory(lookupSourceFactory, probePages, PARTITIONING_SPILLER_FACTORY, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -1606,7 +1593,7 @@ private OperatorFactory createJoinOperatorFactoryWithBlockingLookupSource(TaskCo // probe factory List probeTypes = ImmutableList.of(VARCHAR); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.spillingJoin( + OperatorFactory joinOperatorFactory = spillingJoin( innerJoin(false, waitForBuild), 0, new PlanNodeId("test"), @@ -1658,7 +1645,7 @@ private OperatorFactory probeOuterJoinOperatorFactory( RowPagesBuilder probePages, boolean hasFilter) { - return operatorFactories.spillingJoin( + return spillingJoin( probeOuterJoin(false), 0, new PlanNodeId("test"), diff --git a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/BenchmarkHashBuildAndJoinOperators.java b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/BenchmarkHashBuildAndJoinOperators.java index a388b8cc9997..2fa36a0d86d2 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/BenchmarkHashBuildAndJoinOperators.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/BenchmarkHashBuildAndJoinOperators.java @@ -22,12 +22,10 @@ import io.trino.operator.DriverContext; import io.trino.operator.InterpretedHashGenerator; import io.trino.operator.Operator; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.PartitionFunction; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.exchange.LocalPartitionGenerator; import io.trino.operator.join.JoinBridgeManager; import io.trino.operator.join.LookupSource; @@ -72,7 +70,8 @@ import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.jmh.Benchmarks.benchmark; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.join; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; @@ -216,11 +215,6 @@ public static class JoinContext @Override @Setup public void setup() - { - setup(new TrinoOperatorFactories()); - } - - public void setup(OperatorFactories operatorFactories) { super.setup(); @@ -239,7 +233,7 @@ public void setup(OperatorFactories operatorFactories) } JoinBridgeManager lookupSourceFactory = getLookupSourceFactoryManager(this, outputChannels, partitionCount); - joinOperatorFactory = operatorFactories.join( + joinOperatorFactory = join( innerJoin(false, false), HASH_JOIN_OPERATOR_ID, TEST_PLAN_NODE_ID, diff --git a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/JoinTestUtils.java b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/JoinTestUtils.java index a491399917f0..2afca2d4da86 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/JoinTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/JoinTestUtils.java @@ -18,7 +18,6 @@ import io.trino.RowPagesBuilder; import io.trino.operator.Driver; import io.trino.operator.DriverContext; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.PipelineContext; @@ -55,7 +54,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.join; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; import static java.util.Objects.requireNonNull; @@ -67,22 +67,20 @@ public final class JoinTestUtils private JoinTestUtils() {} public static OperatorFactory innerJoinOperatorFactory( - OperatorFactories operatorFactories, JoinBridgeManager lookupSourceFactoryManager, RowPagesBuilder probePages, boolean hasFilter) { - return innerJoinOperatorFactory(operatorFactories, lookupSourceFactoryManager, probePages, false, hasFilter); + return innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, false, hasFilter); } public static OperatorFactory innerJoinOperatorFactory( - OperatorFactories operatorFactories, JoinBridgeManager lookupSourceFactoryManager, RowPagesBuilder probePages, boolean outputSingleMatch, boolean hasFilter) { - return operatorFactories.join( + return join( innerJoin(outputSingleMatch, false), 0, new PlanNodeId("test"), diff --git a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/TestHashJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/TestHashJoinOperator.java index fe88cf1fe8fa..88aadf3c8477 100644 --- a/core/trino-main/src/test/java/io/trino/operator/join/unspilled/TestHashJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/join/unspilled/TestHashJoinOperator.java @@ -28,13 +28,12 @@ import io.trino.execution.scheduler.UniformNodeSelectorFactory; import io.trino.metadata.InMemoryNodeManager; import io.trino.operator.DriverContext; +import io.trino.operator.JoinOperatorType; import io.trino.operator.Operator; import io.trino.operator.OperatorContext; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.ProcessorContext; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.WorkProcessor; import io.trino.operator.WorkProcessorOperator; import io.trino.operator.WorkProcessorOperatorFactory; @@ -76,13 +75,14 @@ import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.block.BlockAssertions.createLongsBlock; +import static io.trino.operator.JoinOperatorType.fullOuterJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.probeOuterJoin; import static io.trino.operator.OperatorAssertion.assertOperatorEquals; import static io.trino.operator.OperatorAssertion.dropChannel; import static io.trino.operator.OperatorAssertion.toMaterializedResult; import static io.trino.operator.OperatorAssertion.toPages; -import static io.trino.operator.OperatorFactories.JoinOperatorType.fullOuterJoin; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; -import static io.trino.operator.OperatorFactories.JoinOperatorType.probeOuterJoin; +import static io.trino.operator.OperatorFactories.join; import static io.trino.operator.WorkProcessor.ProcessState.finished; import static io.trino.operator.WorkProcessor.ProcessState.ofResult; import static io.trino.operator.join.unspilled.JoinTestUtils.buildLookupSource; @@ -95,7 +95,6 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.testing.DataProviders.cartesianProduct; import static io.trino.testing.DataProviders.trueFalse; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -111,22 +110,10 @@ public class TestHashJoinOperator { private static final BlockTypeOperators TYPE_OPERATOR_FACTORY = new BlockTypeOperators(new TypeOperators()); - private final OperatorFactories operatorFactories; - private ExecutorService executor; private ScheduledExecutorService scheduledExecutor; private NodePartitioningManager nodePartitioningManager; - public TestHashJoinOperator() - { - this(new TrinoOperatorFactories()); - } - - protected TestHashJoinOperator(OperatorFactories operatorFactories) - { - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); - } - @BeforeMethod public void setUp() { @@ -179,7 +166,7 @@ public void testInnerJoin(boolean parallelBuild, boolean probeHashEnabled, boole List probeInput = probePages .addSequencePage(1000, 0, 1000, 2000) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -226,7 +213,7 @@ public void testInnerJoinWithRunLengthEncodedProbe(boolean withFilter, boolean p .addBlocksPage( RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("21"), 2), createLongsBlock(62, 63)); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePagesBuilder, withFilter); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePagesBuilder, withFilter); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -300,7 +287,7 @@ public void testUnwrapsLazyBlocks(boolean singleBigintLookupSource) .map(page -> new Page(page.getBlock(0), new LazyBlock(1, () -> page.getBlock(1)))) .collect(toImmutableList()); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -351,7 +338,7 @@ public void testYield(boolean singleBigintLookupSource) // probe matching the above 40 entries RowPagesBuilder probePages = rowPagesBuilder(false, Ints.asList(0), ImmutableList.of(BIGINT)); List probeInput = probePages.addSequencePage(100, 0).build(); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -416,7 +403,7 @@ public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashE .row(1L) .row(2L) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -453,7 +440,7 @@ public void testInnerJoinWithOutputSingleMatch(boolean parallelBuild, boolean pr .row(2L) .row(3L) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, true, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, true, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -492,7 +479,7 @@ public void testInnerJoinWithNullBuild(boolean parallelBuild, boolean probeHashE .row(2L) .row(3L) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -533,7 +520,7 @@ public void testInnerJoinWithNullOnBothSides(boolean parallelBuild, boolean prob .row((String) null) .row(3L) .build(); - OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactory, probePages, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -934,7 +921,7 @@ public void testInnerJoinWithEmptyLookupSource(boolean parallelBuild, boolean pr // probe factory List probeTypes = ImmutableList.of(BIGINT); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -971,8 +958,8 @@ public void testLookupOuterJoinWithEmptyLookupSource(boolean parallelBuild, bool // probe factory List probeTypes = ImmutableList.of(BIGINT); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.join( - OperatorFactories.JoinOperatorType.lookupOuterJoin(false), + OperatorFactory joinOperatorFactory = join( + JoinOperatorType.lookupOuterJoin(false), 0, new PlanNodeId("test"), lookupSourceFactoryManager, @@ -1014,7 +1001,7 @@ public void testProbeOuterJoinWithEmptyLookupSource(boolean parallelBuild, boole .row((String) null) .row(3L) .build(); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( probeOuterJoin(false), 0, new PlanNodeId("test"), @@ -1060,7 +1047,7 @@ public void testFullOuterJoinWithEmptyLookupSource(boolean parallelBuild, boolea .row((String) null) .row(3L) .build(); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( fullOuterJoin(), 0, new PlanNodeId("test"), @@ -1105,7 +1092,7 @@ public void testInnerJoinWithNonEmptyLookupSourceAndEmptyProbe(boolean parallelB List probeTypes = ImmutableList.of(BIGINT); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); List probeInput = probePages.build(); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( innerJoin(false, false), 0, new PlanNodeId("test"), @@ -1222,7 +1209,7 @@ public void testInnerJoinLoadsPagesInOrder() List probeTypes = ImmutableList.of(VARCHAR, INTEGER, INTEGER); RowPagesBuilder probePages = rowPagesBuilder(false, Ints.asList(0), probeTypes); probePages.row("a", 1L, 2L); - WorkProcessorOperatorFactory joinOperatorFactory = (WorkProcessorOperatorFactory) innerJoinOperatorFactory(operatorFactories, lookupSourceFactory, probePages, false); + WorkProcessorOperatorFactory joinOperatorFactory = (WorkProcessorOperatorFactory) innerJoinOperatorFactory(lookupSourceFactory, probePages, false); // build drivers and operators instantiateBuildDrivers(buildSideSetup, taskContext); @@ -1295,7 +1282,7 @@ private OperatorFactory createJoinOperatorFactoryWithBlockingLookupSource(TaskCo // probe factory List probeTypes = ImmutableList.of(VARCHAR); RowPagesBuilder probePages = rowPagesBuilder(probeHashEnabled, Ints.asList(0), probeTypes); - OperatorFactory joinOperatorFactory = operatorFactories.join( + OperatorFactory joinOperatorFactory = join( innerJoin(false, waitForBuild), 0, new PlanNodeId("test"), @@ -1368,7 +1355,7 @@ private OperatorFactory probeOuterJoinOperatorFactory( RowPagesBuilder probePages, boolean hasFilter) { - return operatorFactories.join( + return join( probeOuterJoin(false), 0, new PlanNodeId("test"), diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildAndJoinBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildAndJoinBenchmark.java index 7653b8444dc0..2a47beb6f683 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildAndJoinBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildAndJoinBenchmark.java @@ -19,11 +19,9 @@ import io.trino.SystemSessionProperties; import io.trino.operator.Driver; import io.trino.operator.DriverFactory; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.join.HashBuilderOperator.HashBuilderOperatorFactory; import io.trino.operator.join.JoinBridgeManager; import io.trino.operator.join.PartitionedLookupSourceFactory; @@ -43,11 +41,11 @@ import static io.trino.benchmark.BenchmarkQueryRunner.createLocalQueryRunner; import static io.trino.benchmark.BenchmarkQueryRunner.createLocalQueryRunnerHashEnabled; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spiller.PartitioningSpillerFactory.unsupportedPartitioningSpillerFactory; import static io.trino.testing.TestingSession.testSessionBuilder; -import static java.util.Objects.requireNonNull; public class HashBuildAndJoinBenchmark extends AbstractOperatorBenchmark @@ -57,18 +55,11 @@ public class HashBuildAndJoinBenchmark private final OperatorFactory ordersTableScan = createTableScanOperator(0, new PlanNodeId("test"), "orders", "orderkey", "totalprice"); private final List lineItemTableTypes = getColumnTypes("lineitem", "orderkey", "quantity"); private final OperatorFactory lineItemTableScan = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "orderkey", "quantity"); - private final OperatorFactories operatorFactories; public HashBuildAndJoinBenchmark(Session session, LocalQueryRunner localQueryRunner) - { - this(session, localQueryRunner, new TrinoOperatorFactories()); - } - - public HashBuildAndJoinBenchmark(Session session, LocalQueryRunner localQueryRunner, OperatorFactories operatorFactories) { super(session, localQueryRunner, "hash_build_and_join_hash_enabled_" + isHashEnabled(session), 4, 5); this.hashEnabled = isHashEnabled(session); - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); } private static boolean isHashEnabled(Session session) @@ -141,7 +132,7 @@ protected List createDrivers(TaskContext taskContext) hashChannel = OptionalInt.of(sourceTypes.size() - 1); } - OperatorFactory joinOperator = operatorFactories.spillingJoin( + OperatorFactory joinOperator = spillingJoin( innerJoin(false, false), 2, new PlanNodeId("test"), diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildBenchmark.java index 9235295fe0aa..2e3eb8e582a8 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashBuildBenchmark.java @@ -17,11 +17,9 @@ import com.google.common.primitives.Ints; import io.trino.operator.Driver; import io.trino.operator.DriverFactory; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.ValuesOperator.ValuesOperatorFactory; import io.trino.operator.join.HashBuilderOperator.HashBuilderOperatorFactory; import io.trino.operator.join.JoinBridgeManager; @@ -41,25 +39,17 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.benchmark.BenchmarkQueryRunner.createLocalQueryRunner; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spiller.PartitioningSpillerFactory.unsupportedPartitioningSpillerFactory; -import static java.util.Objects.requireNonNull; public class HashBuildBenchmark extends AbstractOperatorBenchmark { - private final OperatorFactories operatorFactories; - public HashBuildBenchmark(LocalQueryRunner localQueryRunner) - { - this(localQueryRunner, new TrinoOperatorFactories()); - } - - public HashBuildBenchmark(LocalQueryRunner localQueryRunner, OperatorFactories operatorFactories) { super(localQueryRunner, "hash_build", 4, 5); - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); } @Override @@ -100,7 +90,7 @@ protected List createDrivers(TaskContext taskContext) // empty join so build finishes ImmutableList.Builder joinDriversBuilder = ImmutableList.builder(); joinDriversBuilder.add(new ValuesOperatorFactory(0, new PlanNodeId("values"), ImmutableList.of())); - OperatorFactory joinOperator = operatorFactories.spillingJoin( + OperatorFactory joinOperator = spillingJoin( innerJoin(false, false), 2, new PlanNodeId("test"), diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java index c43edde2055c..87216d0f284d 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashJoinBenchmark.java @@ -18,11 +18,9 @@ import io.trino.operator.Driver; import io.trino.operator.DriverContext; import io.trino.operator.DriverFactory; -import io.trino.operator.OperatorFactories; import io.trino.operator.OperatorFactory; import io.trino.operator.PagesIndex; import io.trino.operator.TaskContext; -import io.trino.operator.TrinoOperatorFactories; import io.trino.operator.join.HashBuilderOperator.HashBuilderOperatorFactory; import io.trino.operator.join.JoinBridgeManager; import io.trino.operator.join.LookupSourceProvider; @@ -45,25 +43,18 @@ import static io.trino.benchmark.BenchmarkQueryRunner.createLocalQueryRunner; import static io.trino.execution.executor.PrioritizedSplitRunner.SPLIT_RUN_QUANTA; import static io.trino.operator.HashArraySizeSupplier.incrementalLoadFactorHashArraySizeSupplier; -import static io.trino.operator.OperatorFactories.JoinOperatorType.innerJoin; +import static io.trino.operator.JoinOperatorType.innerJoin; +import static io.trino.operator.OperatorFactories.spillingJoin; import static io.trino.spiller.PartitioningSpillerFactory.unsupportedPartitioningSpillerFactory; -import static java.util.Objects.requireNonNull; public class HashJoinBenchmark extends AbstractOperatorBenchmark { - private final OperatorFactories operatorFactories; private DriverFactory probeDriverFactory; public HashJoinBenchmark(LocalQueryRunner localQueryRunner) - { - this(localQueryRunner, new TrinoOperatorFactories()); - } - - public HashJoinBenchmark(LocalQueryRunner localQueryRunner, OperatorFactories operatorFactories) { super(localQueryRunner, "hash_join", 4, 50); - this.operatorFactories = requireNonNull(operatorFactories, "operatorFactories is null"); } /* @@ -110,7 +101,7 @@ protected List createDrivers(TaskContext taskContext) List lineItemTypes = getColumnTypes("lineitem", "orderkey", "quantity"); OperatorFactory lineItemTableScan = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "orderkey", "quantity"); - OperatorFactory joinOperator = operatorFactories.spillingJoin( + OperatorFactory joinOperator = spillingJoin( innerJoin(false, false), 1, new PlanNodeId("test"),