diff --git a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java index f1edb1315ac1..ae1b3f004d72 100644 --- a/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java +++ b/presto-main/src/main/java/com/facebook/presto/connector/ConnectorManager.java @@ -53,7 +53,7 @@ import com.facebook.presto.split.RecordPageSourceProvider; import com.facebook.presto.split.SplitManager; import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; -import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter; import com.facebook.presto.sql.relational.ConnectorRowExpressionService; import com.facebook.presto.sql.relational.FunctionResolution; @@ -95,7 +95,7 @@ public class ConnectorManager private final SplitManager splitManager; private final PageSourceManager pageSourceManager; private final IndexManager indexManager; - private final NodePartitioningManager nodePartitioningManager; + private final PartitioningProviderManager partitioningProviderManager; private final ConnectorPlanOptimizerManager connectorPlanOptimizerManager; private final PageSinkManager pageSinkManager; @@ -126,7 +126,7 @@ public ConnectorManager( SplitManager splitManager, PageSourceManager pageSourceManager, IndexManager indexManager, - NodePartitioningManager nodePartitioningManager, + PartitioningProviderManager partitioningProviderManager, ConnectorPlanOptimizerManager connectorPlanOptimizerManager, PageSinkManager pageSinkManager, HandleResolver handleResolver, @@ -146,7 +146,7 @@ public ConnectorManager( this.splitManager = splitManager; this.pageSourceManager = pageSourceManager; this.indexManager = indexManager; - this.nodePartitioningManager = nodePartitioningManager; + this.partitioningProviderManager = partitioningProviderManager; this.connectorPlanOptimizerManager = connectorPlanOptimizerManager; this.pageSinkManager = pageSinkManager; this.handleResolver = handleResolver; @@ -280,7 +280,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector) .ifPresent(indexProvider -> indexManager.addIndexProvider(connectorId, indexProvider)); connector.getPartitioningProvider() - .ifPresent(partitioningProvider -> nodePartitioningManager.addPartitioningProvider(connectorId, partitioningProvider)); + .ifPresent(partitioningProvider -> partitioningProviderManager.addPartitioningProvider(connectorId, partitioningProvider)); if (nodeManager.getCurrentNode().isCoordinator()) { connector.getPlanOptimizerProvider() @@ -317,7 +317,7 @@ private synchronized void removeConnectorInternal(ConnectorId connectorId) pageSourceManager.removeConnectorPageSourceProvider(connectorId); pageSinkManager.removeConnectorPageSinkProvider(connectorId); indexManager.removeIndexProvider(connectorId); - nodePartitioningManager.removePartitioningProvider(connectorId); + partitioningProviderManager.removePartitioningProvider(connectorId); metadataManager.getProcedureRegistry().removeProcedures(connectorId); accessControlManager.removeCatalogAccessControl(connectorId); metadataManager.getTablePropertyManager().removeProperties(connectorId); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index b64625c490af..124289753bc1 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -126,6 +126,7 @@ import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager; import com.facebook.presto.sql.planner.LocalExecutionPlanner; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator; import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.tree.Expression; @@ -394,6 +395,9 @@ protected void setup(Binder binder) // split manager binder.bind(SplitManager.class).in(Scopes.SINGLETON); + // partitioning provider manager + binder.bind(PartitioningProviderManager.class).in(Scopes.SINGLETON); + // node partitioning manager binder.bind(NodePartitioningManager.class).in(Scopes.SINGLETON); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java index 1bb708a772e2..9148e38920d3 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java @@ -40,8 +40,6 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.ToIntFunction; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -54,25 +52,13 @@ public class NodePartitioningManager { private final NodeScheduler nodeScheduler; - private final ConcurrentMap partitioningProviders = new ConcurrentHashMap<>(); + private final PartitioningProviderManager partitioningProviderManager; @Inject - public NodePartitioningManager(NodeScheduler nodeScheduler) + public NodePartitioningManager(NodeScheduler nodeScheduler, PartitioningProviderManager partitioningProviderManager) { this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); - } - - public void addPartitioningProvider(ConnectorId connectorId, ConnectorNodePartitioningProvider nodePartitioningProvider) - { - requireNonNull(connectorId, "connectorId is null"); - requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); - checkArgument(partitioningProviders.putIfAbsent(connectorId, nodePartitioningProvider) == null, - "NodePartitioningProvider for connector '%s' is already registered", connectorId); - } - - public void removePartitioningProvider(ConnectorId connectorId) - { - partitioningProviders.remove(connectorId); + this.partitioningProviderManager = requireNonNull(partitioningProviderManager, "partitioningProviderManager is null"); } public PartitionFunction getPartitionFunction( @@ -94,8 +80,7 @@ public PartitionFunction getPartitionFunction( partitioningScheme.getBucketToPartition().get()); } else { - ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get()); - checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get()); + ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get()); bucketFunction = partitioningProvider.getBucketFunction( partitioningHandle.getTransactionHandle().orElse(null), @@ -113,7 +98,7 @@ public List listPartitionHandles( Session session, PartitioningHandle partitioningHandle) { - ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get()); + ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get()); return partitioningProvider.listPartitionHandles( partitioningHandle.getTransactionHandle().orElse(null), session.toConnectorSession(), @@ -129,11 +114,6 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler); } - ConnectorId connectorId = partitioningHandle.getConnectorId() - .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle)); - ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(connectorId); - checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", connectorId); - ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle); // safety check for crazy partitioning checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount()); @@ -143,6 +123,8 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand bucketToNode = getFixedMapping(connectorBucketNodeMap); } else { + ConnectorId connectorId = partitioningHandle.getConnectorId() + .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle)); bucketToNode = createArbitraryBucketToNode( nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(getMaxTasksPerStage(session)), connectorBucketNodeMap.getBucketCount()); @@ -198,8 +180,7 @@ private ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, Partit { checkArgument(!(partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle)); - ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get()); - checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get()); + ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get()); ConnectorBucketNodeMap connectorBucketNodeMap = partitioningProvider.getBucketNodeMap( partitioningHandle.getTransactionHandle().orElse(null), @@ -212,8 +193,7 @@ private ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, Partit private ToIntFunction getSplitToBucket(Session session, PartitioningHandle partitioningHandle) { - ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get()); - checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get()); + ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get()); ToIntFunction splitBucketFunction = partitioningProvider.getSplitBucketFunction( partitioningHandle.getTransactionHandle().orElse(null), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PartitioningProviderManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PartitioningProviderManager.java new file mode 100644 index 000000000000..17d0ec89890e --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PartitioningProviderManager.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner; + +import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class PartitioningProviderManager +{ + private final ConcurrentMap partitioningProviders = new ConcurrentHashMap<>(); + + public ConnectorNodePartitioningProvider getPartitioningProvider(ConnectorId connectorId) + { + ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(connectorId); + checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", connectorId); + return partitioningProvider; + } + + public void addPartitioningProvider(ConnectorId connectorId, ConnectorNodePartitioningProvider nodePartitioningProvider) + { + requireNonNull(connectorId, "connectorId is null"); + requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); + checkArgument(partitioningProviders.putIfAbsent(connectorId, nodePartitioningProvider) == null, + "NodePartitioningProvider for connector '%s' is already registered", connectorId); + } + + public void removePartitioningProvider(ConnectorId connectorId) + { + partitioningProviders.remove(connectorId); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index d34d58fca65f..64b922688d5b 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -135,6 +135,7 @@ import com.facebook.presto.sql.planner.LocalExecutionPlanner.LocalExecutionPlan; import com.facebook.presto.sql.planner.LogicalPlanner; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.PlanFragmenter; import com.facebook.presto.sql.planner.PlanOptimizers; @@ -238,6 +239,7 @@ public class LocalQueryRunner private final BlockEncodingManager blockEncodingManager; private final PageSourceManager pageSourceManager; private final IndexManager indexManager; + private final PartitioningProviderManager partitioningProviderManager; private final NodePartitioningManager nodePartitioningManager; private final ConnectorPlanOptimizerManager planOptimizerManager; private final PageSinkManager pageSinkManager; @@ -306,7 +308,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, yieldExecutor, catalogManager, notificationExecutor); - this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler); + this.partitioningProviderManager = new PartitioningProviderManager(); + this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager); this.planOptimizerManager = new ConnectorPlanOptimizerManager(); this.blockEncodingManager = new BlockEncodingManager(typeRegistry); @@ -343,7 +346,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, splitManager, pageSourceManager, indexManager, - nodePartitioningManager, + partitioningProviderManager, planOptimizerManager, pageSinkManager, new HandleResolver(), diff --git a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java index 4d5d72f8d7c4..40e734fa666e 100644 --- a/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java +++ b/presto-main/src/test/java/com/facebook/presto/cost/TestCostCalculator.java @@ -37,6 +37,7 @@ import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.planner.NodePartitioningManager; +import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.PlanFragmenter; import com.facebook.presto.sql.planner.PlanVariableAllocator; @@ -133,7 +134,8 @@ public void setUp() new InMemoryNodeManager(), new NodeSchedulerConfig().setIncludeCoordinator(true), new NodeTaskMap(finalizerService)); - nodePartitioningManager = new NodePartitioningManager(nodeScheduler); + PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); + nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager); planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new SqlParser()); } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java index 562c76e3381b..389a53300b0e 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java @@ -51,6 +51,7 @@ import com.facebook.presto.sql.planner.LocalExecutionPlanner; import com.facebook.presto.sql.planner.NodePartitioningManager; import com.facebook.presto.sql.planner.Partitioning; +import com.facebook.presto.sql.planner.PartitioningProviderManager; import com.facebook.presto.sql.planner.PartitioningScheme; import com.facebook.presto.sql.planner.PlanFragment; import com.facebook.presto.sql.planner.plan.PlanFragmentId; @@ -126,7 +127,8 @@ public static LocalExecutionPlanner createTestingPlanner() new InMemoryNodeManager(), new NodeSchedulerConfig().setIncludeCoordinator(true), new NodeTaskMap(finalizerService)); - NodePartitioningManager nodePartitioningManager = new NodePartitioningManager(nodeScheduler); + PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager(); + NodePartitioningManager nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager); PageFunctionCompiler pageFunctionCompiler = new PageFunctionCompiler(metadata, 0); return new LocalExecutionPlanner(