From 278f4c6c4a1040319b156e1ade2fe9d872057d80 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Thu, 16 Mar 2023 09:45:42 -0700 Subject: [PATCH] Fix shard allocation scenario with dedicated search nodes (#6689) Signed-off-by: Kunal Kotwani Signed-off-by: Mingshi Liu --- .../decider/TargetPoolAllocationDecider.java | 43 ++++++++- .../RemoteShardsBalancerBaseTestCase.java | 67 +++----------- .../TargetPoolAllocationDeciderTests.java | 91 ++++++++++++++++++- 3 files changed, 144 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java index c87f7d16079e9..c11f5823cf3a7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; @@ -35,7 +36,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { logger.debug( "Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", - shardRouting.shortSummary(), + shardRouting, shardPool, node.node(), targetNodePool @@ -47,7 +48,25 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing shardPool, targetNodePool ); - } + } else if (RoutingPool.LOCAL_ONLY.equals(shardPool) + && RoutingPool.REMOTE_CAPABLE.equals(targetNodePool) + && !node.node().getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) { + logger.debug( + "Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role", + shardRouting, + shardPool, + node.node(), + DiscoveryNodeRole.DATA_ROLE + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Shard pool: [{}], Node Pool: [{}] without [{}] role", + shardPool, + targetNodePool, + DiscoveryNodeRole.DATA_ROLE + ); + } return allocation.decision( Decision.YES, NAME, @@ -91,7 +110,25 @@ private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryN indexPool, targetNodePool ); - } + } else if (RoutingPool.LOCAL_ONLY.equals(indexPool) + && RoutingPool.REMOTE_CAPABLE.equals(targetNodePool) + && !node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) { + logger.debug( + "Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role", + indexMetadata.getIndex().getName(), + indexPool, + node, + DiscoveryNodeRole.DATA_ROLE + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Index pool: [{}], Node Pool: [{}] without [{}] role", + indexPool, + targetNodePool, + DiscoveryNodeRole.DATA_ROLE + ); + } return allocation.decision( Decision.YES, NAME, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index 789de474d8ce5..ae9799545e6af 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -20,7 +20,6 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNodes; @@ -35,18 +34,13 @@ import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.test.gateway.TestGatewayAllocator; -import java.net.Inet4Address; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Set; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -66,6 +60,7 @@ public abstract class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocat DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE ); + protected static final Set SEARCH_ONLY_ROLE = Set.of(DiscoveryNodeRole.SEARCH_ROLE); protected static final int PRIMARIES = 5; protected static final int REPLICAS = 1; @@ -116,52 +111,11 @@ public RoutingAllocation getRoutingAllocation(ClusterState clusterState, Routing ); } - private Map createNodeAttributes(String nodeId) { - Map attr = new HashMap<>(); - attr.put("name", nodeId); - attr.put("node_id", nodeId); - return attr; + public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, int localIndices, int remoteIndices) { + return createInitialCluster(localOnlyNodes, remoteNodes, false, localIndices, remoteIndices); } - public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) { - DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); - for (int i = 0; i < nodeCount; i++) { - String id = getNodeId(i, isRemote, "new"); - nb.add(newNode(id, id, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES)); - } - return ClusterState.builder(clusterState).nodes(nb.build()).build(); - } - - public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException { - TransportAddress ipAddress = new TransportAddress(Inet4Address.getByName(IP), 9200); - DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); - String id = getNodeId(nodeId, isRemote, "new"); - nb.add( - new DiscoveryNode( - id, - id, - ipAddress, - createNodeAttributes(id), - isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES, - Version.CURRENT - ) - ); - return ClusterState.builder(clusterState).nodes(nb.build()).build(); - } - - public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List nodesToTerminate) { - if (nodesToTerminate.isEmpty()) { - return clusterState; - } - logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate); - DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); - nodesToTerminate.forEach(nb::remove); - clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build(); - clusterState = service.disassociateDeadNodes(clusterState, false, "nodes-terminated"); - return clusterState; - } - - public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) { + public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, boolean remoteOnly, int localIndices, int remoteIndices) { Metadata.Builder mb = Metadata.builder(); for (int i = 0; i < localIndices; i++) { mb.put( @@ -199,9 +153,16 @@ public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNo String name = getNodeId(i, false); nb.add(newNode(name, name, MANAGER_DATA_ROLES)); } - for (int i = 0; i < remoteCapableNodes; i++) { - String name = getNodeId(i, true); - nb.add(newNode(name, name, SEARCH_DATA_ROLES)); + if (remoteOnly) { + for (int i = 0; i < remoteNodes; i++) { + String name = getNodeId(i, true); + nb.add(newNode(name, name, SEARCH_ONLY_ROLE)); + } + } else { + for (int i = 0; i < remoteNodes; i++) { + String name = getNodeId(i, true); + nb.add(newNode(name, name, SEARCH_DATA_ROLES)); + } } DiscoveryNodes nodes = nb.build(); return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java index 9a415ed0b339b..8f2db5db969d2 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java @@ -24,7 +24,7 @@ import java.util.stream.Collectors; public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase { - public void testTargetPoolAllocationDecisions() { + public void testTargetPoolHybridAllocationDecisions() { ClusterState clusterState = createInitialCluster(3, 3, 2, 2); AllocationService service = this.createRemoteCapableAllocationService(); clusterState = allocateShardsAndBalance(clusterState, service); @@ -111,4 +111,93 @@ public void testTargetPoolAllocationDecisions() { assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type()); assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type()); } + + public void testTargetPoolDedicatedSearchNodeAllocationDecisions() { + ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + + // Add an unassigned primary shard for force allocation checks + Metadata metadata = Metadata.builder(clusterState.metadata()) + .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable()) + .addAsNew(metadata.index("test_local_unassigned")) + .build(); + clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build(); + + // Add remote index unassigned primary + clusterState = createRemoteIndex(clusterState, "test_remote_unassigned"); + + RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes(); + RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes); + + ShardRouting localShard = clusterState.routingTable() + .allShards(getIndexName(0, false)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting remoteShard = clusterState.routingTable() + .allShards(getIndexName(0, true)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedLocalShard = clusterState.routingTable() + .allShards("test_local_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedRemoteShard = clusterState.routingTable() + .allShards("test_remote_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index()); + IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index()); + String localNodeId = LOCAL_NODE_PREFIX; + for (RoutingNode routingNode : globalAllocation.routingNodes()) { + if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) { + localNodeId = routingNode.nodeId(); + break; + } + } + String remoteNodeId = remoteShard.currentNodeId(); + RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId); + RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId); + + AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider())); + + // Incompatible Pools + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIdx, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, localOnlyNode, globalAllocation).type()); + // A dedicated search node should not accept local shards and indices. + assertEquals(Decision.NO.type(), deciders.canAllocate(localShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canAllocate(localIdx, remoteCapableNode, globalAllocation).type()); + assertEquals( + Decision.NO.type(), + deciders.canForceAllocatePrimary(unassignedLocalShard, remoteCapableNode, globalAllocation).type() + ); + + // Compatible Pools + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIdx, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, localOnlyNode, globalAllocation).type()); + assertEquals( + Decision.YES.type(), + deciders.canForceAllocatePrimary(unassignedRemoteShard, remoteCapableNode, globalAllocation).type() + ); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, localOnlyNode, globalAllocation).type()); + + // Verify only compatible nodes are used for auto expand replica decision for remote index and local index + assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(localIdx, remoteCapableNode.node(), globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type()); + } }