Skip to content

Commit

Permalink
Fix shard allocation scenario with dedicated search nodes (opensearch…
Browse files Browse the repository at this point in the history
…-project#6689)

Signed-off-by: Kunal Kotwani <[email protected]>
Signed-off-by: Mingshi Liu <[email protected]>
  • Loading branch information
kotwanikunal authored and mingshl committed Mar 24, 2023
1 parent fcb61f0 commit 278f4c6
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -35,7 +36,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) {
logger.debug(
"Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]",
shardRouting.shortSummary(),
shardRouting,
shardPool,
node.node(),
targetNodePool
Expand All @@ -47,7 +48,25 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
shardPool,
targetNodePool
);
}
} else if (RoutingPool.LOCAL_ONLY.equals(shardPool)
&& RoutingPool.REMOTE_CAPABLE.equals(targetNodePool)
&& !node.node().getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) {
logger.debug(
"Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role",
shardRouting,
shardPool,
node.node(),
DiscoveryNodeRole.DATA_ROLE
);
return allocation.decision(
Decision.NO,
NAME,
"Routing pools are incompatible. Shard pool: [{}], Node Pool: [{}] without [{}] role",
shardPool,
targetNodePool,
DiscoveryNodeRole.DATA_ROLE
);
}
return allocation.decision(
Decision.YES,
NAME,
Expand Down Expand Up @@ -91,7 +110,25 @@ private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryN
indexPool,
targetNodePool
);
}
} else if (RoutingPool.LOCAL_ONLY.equals(indexPool)
&& RoutingPool.REMOTE_CAPABLE.equals(targetNodePool)
&& !node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) {
logger.debug(
"Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role",
indexMetadata.getIndex().getName(),
indexPool,
node,
DiscoveryNodeRole.DATA_ROLE
);
return allocation.decision(
Decision.NO,
NAME,
"Routing pools are incompatible. Index pool: [{}], Node Pool: [{}] without [{}] role",
indexPool,
targetNodePool,
DiscoveryNodeRole.DATA_ROLE
);
}
return allocation.decision(
Decision.YES,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNodes;
Expand All @@ -35,18 +34,13 @@
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.test.gateway.TestGatewayAllocator;

import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
Expand All @@ -66,6 +60,7 @@ public abstract class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocat
DiscoveryNodeRole.DATA_ROLE,
DiscoveryNodeRole.SEARCH_ROLE
);
protected static final Set<DiscoveryNodeRole> SEARCH_ONLY_ROLE = Set.of(DiscoveryNodeRole.SEARCH_ROLE);

protected static final int PRIMARIES = 5;
protected static final int REPLICAS = 1;
Expand Down Expand Up @@ -116,52 +111,11 @@ public RoutingAllocation getRoutingAllocation(ClusterState clusterState, Routing
);
}

private Map<String, String> createNodeAttributes(String nodeId) {
Map<String, String> attr = new HashMap<>();
attr.put("name", nodeId);
attr.put("node_id", nodeId);
return attr;
public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, int localIndices, int remoteIndices) {
return createInitialCluster(localOnlyNodes, remoteNodes, false, localIndices, remoteIndices);
}

public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) {
DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes());
for (int i = 0; i < nodeCount; i++) {
String id = getNodeId(i, isRemote, "new");
nb.add(newNode(id, id, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES));
}
return ClusterState.builder(clusterState).nodes(nb.build()).build();
}

public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException {
TransportAddress ipAddress = new TransportAddress(Inet4Address.getByName(IP), 9200);
DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes());
String id = getNodeId(nodeId, isRemote, "new");
nb.add(
new DiscoveryNode(
id,
id,
ipAddress,
createNodeAttributes(id),
isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES,
Version.CURRENT
)
);
return ClusterState.builder(clusterState).nodes(nb.build()).build();
}

public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List<String> nodesToTerminate) {
if (nodesToTerminate.isEmpty()) {
return clusterState;
}
logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate);
DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes());
nodesToTerminate.forEach(nb::remove);
clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build();
clusterState = service.disassociateDeadNodes(clusterState, false, "nodes-terminated");
return clusterState;
}

public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) {
public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, boolean remoteOnly, int localIndices, int remoteIndices) {
Metadata.Builder mb = Metadata.builder();
for (int i = 0; i < localIndices; i++) {
mb.put(
Expand Down Expand Up @@ -199,9 +153,16 @@ public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNo
String name = getNodeId(i, false);
nb.add(newNode(name, name, MANAGER_DATA_ROLES));
}
for (int i = 0; i < remoteCapableNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_DATA_ROLES));
if (remoteOnly) {
for (int i = 0; i < remoteNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_ONLY_ROLE));
}
} else {
for (int i = 0; i < remoteNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_DATA_ROLES));
}
}
DiscoveryNodes nodes = nb.build();
return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.stream.Collectors;

public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase {
public void testTargetPoolAllocationDecisions() {
public void testTargetPoolHybridAllocationDecisions() {
ClusterState clusterState = createInitialCluster(3, 3, 2, 2);
AllocationService service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);
Expand Down Expand Up @@ -111,4 +111,93 @@ public void testTargetPoolAllocationDecisions() {
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type());
}

public void testTargetPoolDedicatedSearchNodeAllocationDecisions() {
ClusterState clusterState = createInitialCluster(3, 3, true, 2, 2);
AllocationService service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);

// Add an unassigned primary shard for force allocation checks
Metadata metadata = Metadata.builder(clusterState.metadata())
.put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable())
.addAsNew(metadata.index("test_local_unassigned"))
.build();
clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build();

// Add remote index unassigned primary
clusterState = createRemoteIndex(clusterState, "test_remote_unassigned");

RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes();
RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes);

ShardRouting localShard = clusterState.routingTable()
.allShards(getIndexName(0, false))
.stream()
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.get(0);
ShardRouting remoteShard = clusterState.routingTable()
.allShards(getIndexName(0, true))
.stream()
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.get(0);
ShardRouting unassignedLocalShard = clusterState.routingTable()
.allShards("test_local_unassigned")
.stream()
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.get(0);
ShardRouting unassignedRemoteShard = clusterState.routingTable()
.allShards("test_remote_unassigned")
.stream()
.filter(ShardRouting::primary)
.collect(Collectors.toList())
.get(0);
IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index());
IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index());
String localNodeId = LOCAL_NODE_PREFIX;
for (RoutingNode routingNode : globalAllocation.routingNodes()) {
if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) {
localNodeId = routingNode.nodeId();
break;
}
}
String remoteNodeId = remoteShard.currentNodeId();
RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId);
RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId);

AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider()));

// Incompatible Pools
assertEquals(Decision.NO.type(), deciders.canAllocate(remoteShard, localOnlyNode, globalAllocation).type());
assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIdx, localOnlyNode, globalAllocation).type());
assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, localOnlyNode, globalAllocation).type());
// A dedicated search node should not accept local shards and indices.
assertEquals(Decision.NO.type(), deciders.canAllocate(localShard, remoteCapableNode, globalAllocation).type());
assertEquals(Decision.NO.type(), deciders.canAllocate(localIdx, remoteCapableNode, globalAllocation).type());
assertEquals(
Decision.NO.type(),
deciders.canForceAllocatePrimary(unassignedLocalShard, remoteCapableNode, globalAllocation).type()
);

// Compatible Pools
assertEquals(Decision.YES.type(), deciders.canAllocate(remoteShard, remoteCapableNode, globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIdx, remoteCapableNode, globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, localOnlyNode, globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, localOnlyNode, globalAllocation).type());
assertEquals(
Decision.YES.type(),
deciders.canForceAllocatePrimary(unassignedRemoteShard, remoteCapableNode, globalAllocation).type()
);
assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, localOnlyNode, globalAllocation).type());

// Verify only compatible nodes are used for auto expand replica decision for remote index and local index
assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(localIdx, remoteCapableNode.node(), globalAllocation).type());
assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type());
}
}

0 comments on commit 278f4c6

Please sign in to comment.