Skip to content

Commit

Permalink
Refactor NodePartitioningManager
Browse files Browse the repository at this point in the history
Extract connector partitioning provider management code into a
separate class, PartitioningProviderManager
  • Loading branch information
arhimondr committed Sep 5, 2019
1 parent 29bb4c6 commit 85efb93
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import com.facebook.presto.split.RecordPageSourceProvider;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter;
import com.facebook.presto.sql.relational.ConnectorRowExpressionService;
import com.facebook.presto.sql.relational.FunctionResolution;
Expand Down Expand Up @@ -95,7 +95,7 @@ public class ConnectorManager
private final SplitManager splitManager;
private final PageSourceManager pageSourceManager;
private final IndexManager indexManager;
private final NodePartitioningManager nodePartitioningManager;
private final PartitioningProviderManager partitioningProviderManager;
private final ConnectorPlanOptimizerManager connectorPlanOptimizerManager;

private final PageSinkManager pageSinkManager;
Expand Down Expand Up @@ -126,7 +126,7 @@ public ConnectorManager(
SplitManager splitManager,
PageSourceManager pageSourceManager,
IndexManager indexManager,
NodePartitioningManager nodePartitioningManager,
PartitioningProviderManager partitioningProviderManager,
ConnectorPlanOptimizerManager connectorPlanOptimizerManager,
PageSinkManager pageSinkManager,
HandleResolver handleResolver,
Expand All @@ -146,7 +146,7 @@ public ConnectorManager(
this.splitManager = splitManager;
this.pageSourceManager = pageSourceManager;
this.indexManager = indexManager;
this.nodePartitioningManager = nodePartitioningManager;
this.partitioningProviderManager = partitioningProviderManager;
this.connectorPlanOptimizerManager = connectorPlanOptimizerManager;
this.pageSinkManager = pageSinkManager;
this.handleResolver = handleResolver;
Expand Down Expand Up @@ -280,7 +280,7 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
.ifPresent(indexProvider -> indexManager.addIndexProvider(connectorId, indexProvider));

connector.getPartitioningProvider()
.ifPresent(partitioningProvider -> nodePartitioningManager.addPartitioningProvider(connectorId, partitioningProvider));
.ifPresent(partitioningProvider -> partitioningProviderManager.addPartitioningProvider(connectorId, partitioningProvider));

if (nodeManager.getCurrentNode().isCoordinator()) {
connector.getPlanOptimizerProvider()
Expand Down Expand Up @@ -317,7 +317,7 @@ private synchronized void removeConnectorInternal(ConnectorId connectorId)
pageSourceManager.removeConnectorPageSourceProvider(connectorId);
pageSinkManager.removeConnectorPageSinkProvider(connectorId);
indexManager.removeIndexProvider(connectorId);
nodePartitioningManager.removePartitioningProvider(connectorId);
partitioningProviderManager.removePartitioningProvider(connectorId);
metadataManager.getProcedureRegistry().removeProcedures(connectorId);
accessControlManager.removeCatalogAccessControl(connectorId);
metadataManager.getTablePropertyManager().removeProperties(connectorId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.facebook.presto.sql.tree.Expression;
Expand Down Expand Up @@ -394,6 +395,9 @@ protected void setup(Binder binder)
// split manager
binder.bind(SplitManager.class).in(Scopes.SINGLETON);

// partitioning provider manager
binder.bind(PartitioningProviderManager.class).in(Scopes.SINGLETON);

// node partitioning manager
binder.bind(NodePartitioningManager.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.ToIntFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -54,25 +52,13 @@
public class NodePartitioningManager
{
private final NodeScheduler nodeScheduler;
private final ConcurrentMap<ConnectorId, ConnectorNodePartitioningProvider> partitioningProviders = new ConcurrentHashMap<>();
private final PartitioningProviderManager partitioningProviderManager;

@Inject
public NodePartitioningManager(NodeScheduler nodeScheduler)
public NodePartitioningManager(NodeScheduler nodeScheduler, PartitioningProviderManager partitioningProviderManager)
{
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
}

public void addPartitioningProvider(ConnectorId connectorId, ConnectorNodePartitioningProvider nodePartitioningProvider)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
checkArgument(partitioningProviders.putIfAbsent(connectorId, nodePartitioningProvider) == null,
"NodePartitioningProvider for connector '%s' is already registered", connectorId);
}

public void removePartitioningProvider(ConnectorId connectorId)
{
partitioningProviders.remove(connectorId);
this.partitioningProviderManager = requireNonNull(partitioningProviderManager, "partitioningProviderManager is null");
}

public PartitionFunction getPartitionFunction(
Expand All @@ -94,8 +80,7 @@ public PartitionFunction getPartitionFunction(
partitioningScheme.getBucketToPartition().get());
}
else {
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get());
checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get());
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get());

bucketFunction = partitioningProvider.getBucketFunction(
partitioningHandle.getTransactionHandle().orElse(null),
Expand All @@ -113,7 +98,7 @@ public List<ConnectorPartitionHandle> listPartitionHandles(
Session session,
PartitioningHandle partitioningHandle)
{
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get());
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get());
return partitioningProvider.listPartitionHandles(
partitioningHandle.getTransactionHandle().orElse(null),
session.toConnectorSession(),
Expand All @@ -129,11 +114,6 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand
return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
}

ConnectorId connectorId = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(connectorId);
checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", connectorId);

ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);
// safety check for crazy partitioning
checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount());
Expand All @@ -143,6 +123,8 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand
bucketToNode = getFixedMapping(connectorBucketNodeMap);
}
else {
ConnectorId connectorId = partitioningHandle.getConnectorId()
.orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
bucketToNode = createArbitraryBucketToNode(
nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(getMaxTasksPerStage(session)),
connectorBucketNodeMap.getBucketCount());
Expand Down Expand Up @@ -198,8 +180,7 @@ private ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, Partit
{
checkArgument(!(partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle));

ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get());
checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get());
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get());

ConnectorBucketNodeMap connectorBucketNodeMap = partitioningProvider.getBucketNodeMap(
partitioningHandle.getTransactionHandle().orElse(null),
Expand All @@ -212,8 +193,7 @@ private ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, Partit

private ToIntFunction<Split> getSplitToBucket(Session session, PartitioningHandle partitioningHandle)
{
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(partitioningHandle.getConnectorId().get());
checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", partitioningHandle.getConnectorId().get());
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get());

ToIntFunction<ConnectorSplit> splitBucketFunction = partitioningProvider.getSplitBucketFunction(
partitioningHandle.getTransactionHandle().orElse(null),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner;

import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class PartitioningProviderManager
{
private final ConcurrentMap<ConnectorId, ConnectorNodePartitioningProvider> partitioningProviders = new ConcurrentHashMap<>();

public ConnectorNodePartitioningProvider getPartitioningProvider(ConnectorId connectorId)
{
ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(connectorId);
checkArgument(partitioningProvider != null, "No partitioning provider for connector %s", connectorId);
return partitioningProvider;
}

public void addPartitioningProvider(ConnectorId connectorId, ConnectorNodePartitioningProvider nodePartitioningProvider)
{
requireNonNull(connectorId, "connectorId is null");
requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
checkArgument(partitioningProviders.putIfAbsent(connectorId, nodePartitioningProvider) == null,
"NodePartitioningProvider for connector '%s' is already registered", connectorId);
}

public void removePartitioningProvider(ConnectorId connectorId)
{
partitioningProviders.remove(connectorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import com.facebook.presto.sql.planner.LocalExecutionPlanner.LocalExecutionPlan;
import com.facebook.presto.sql.planner.LogicalPlanner;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanOptimizers;
Expand Down Expand Up @@ -238,6 +239,7 @@ public class LocalQueryRunner
private final BlockEncodingManager blockEncodingManager;
private final PageSourceManager pageSourceManager;
private final IndexManager indexManager;
private final PartitioningProviderManager partitioningProviderManager;
private final NodePartitioningManager nodePartitioningManager;
private final ConnectorPlanOptimizerManager planOptimizerManager;
private final PageSinkManager pageSinkManager;
Expand Down Expand Up @@ -306,7 +308,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
yieldExecutor,
catalogManager,
notificationExecutor);
this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler);
this.partitioningProviderManager = new PartitioningProviderManager();
this.nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager);
this.planOptimizerManager = new ConnectorPlanOptimizerManager();

this.blockEncodingManager = new BlockEncodingManager(typeRegistry);
Expand Down Expand Up @@ -343,7 +346,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
splitManager,
pageSourceManager,
indexManager,
nodePartitioningManager,
partitioningProviderManager,
planOptimizerManager,
pageSinkManager,
new HandleResolver(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanVariableAllocator;
Expand Down Expand Up @@ -133,7 +134,8 @@ public void setUp()
new InMemoryNodeManager(),
new NodeSchedulerConfig().setIncludeCoordinator(true),
new NodeTaskMap(finalizerService));
nodePartitioningManager = new NodePartitioningManager(nodeScheduler);
PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager();
nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager);
planFragmenter = new PlanFragmenter(metadata, nodePartitioningManager, new QueryManagerConfig(), new SqlParser());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.facebook.presto.sql.planner.LocalExecutionPlanner;
import com.facebook.presto.sql.planner.NodePartitioningManager;
import com.facebook.presto.sql.planner.Partitioning;
import com.facebook.presto.sql.planner.PartitioningProviderManager;
import com.facebook.presto.sql.planner.PartitioningScheme;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
Expand Down Expand Up @@ -126,7 +127,8 @@ public static LocalExecutionPlanner createTestingPlanner()
new InMemoryNodeManager(),
new NodeSchedulerConfig().setIncludeCoordinator(true),
new NodeTaskMap(finalizerService));
NodePartitioningManager nodePartitioningManager = new NodePartitioningManager(nodeScheduler);
PartitioningProviderManager partitioningProviderManager = new PartitioningProviderManager();
NodePartitioningManager nodePartitioningManager = new NodePartitioningManager(nodeScheduler, partitioningProviderManager);

PageFunctionCompiler pageFunctionCompiler = new PageFunctionCompiler(metadata, 0);
return new LocalExecutionPlanner(
Expand Down

0 comments on commit 85efb93

Please sign in to comment.