Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fix shard allocation scenario with dedicated search nodes #6718

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -35,7 +36,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) {
logger.debug(
"Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]",
shardRouting.shortSummary(),
shardRouting,
shardPool,
node.node(),
targetNodePool
Expand All @@ -47,7 +48,25 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
shardPool,
targetNodePool
);
}
} else if (RoutingPool.LOCAL_ONLY.equals(shardPool)
&& RoutingPool.REMOTE_CAPABLE.equals(targetNodePool)
&& !node.node().getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) {
logger.debug(
"Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role",
shardRouting,
shardPool,
node.node(),
DiscoveryNodeRole.DATA_ROLE
);
return allocation.decision(
Decision.NO,
NAME,
"Routing pools are incompatible. Shard pool: [{}], Node Pool: [{}] without [{}] role",
shardPool,
targetNodePool,
DiscoveryNodeRole.DATA_ROLE
);
}
return allocation.decision(
Decision.YES,
NAME,
Expand Down Expand Up @@ -91,7 +110,25 @@ private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryN
indexPool,
targetNodePool
);
}
} else if (RoutingPool.LOCAL_ONLY.equals(indexPool)
&& RoutingPool.REMOTE_CAPABLE.equals(targetNodePool)
&& !node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE)) {
logger.debug(
"Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] without the [{}] node role",
indexMetadata.getIndex().getName(),
indexPool,
node,
DiscoveryNodeRole.DATA_ROLE
);
return allocation.decision(
Decision.NO,
NAME,
"Routing pools are incompatible. Index pool: [{}], Node Pool: [{}] without [{}] role",
indexPool,
targetNodePool,
DiscoveryNodeRole.DATA_ROLE
);
}
return allocation.decision(
Decision.YES,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNodes;
Expand All @@ -35,18 +34,13 @@
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.test.gateway.TestGatewayAllocator;

import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
Expand All @@ -66,6 +60,7 @@ public abstract class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocat
DiscoveryNodeRole.DATA_ROLE,
DiscoveryNodeRole.SEARCH_ROLE
);
protected static final Set<DiscoveryNodeRole> SEARCH_ONLY_ROLE = Set.of(DiscoveryNodeRole.SEARCH_ROLE);

protected static final int PRIMARIES = 5;
protected static final int REPLICAS = 1;
Expand Down Expand Up @@ -116,52 +111,11 @@ public RoutingAllocation getRoutingAllocation(ClusterState clusterState, Routing
);
}

private Map<String, String> createNodeAttributes(String nodeId) {
Map<String, String> attr = new HashMap<>();
attr.put("name", nodeId);
attr.put("node_id", nodeId);
return attr;
/**
 * Builds the initial cluster state with remote nodes that are not restricted to the search role.
 * <p>
 * Convenience overload: forwards to the five-argument variant with {@code remoteOnly} set to
 * {@code false}.
 *
 * @param localOnlyNodes number of local-only nodes to create
 * @param remoteNodes    number of remote nodes to create
 * @param localIndices   number of local indices to create
 * @param remoteIndices  number of remote indices to create
 * @return the resulting cluster state
 */
public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, int localIndices, int remoteIndices) {
    final boolean remoteOnly = false;
    return createInitialCluster(localOnlyNodes, remoteNodes, remoteOnly, localIndices, remoteIndices);
}

/**
 * Returns a copy of the given cluster state with {@code nodeCount} additional nodes.
 * <p>
 * Node ids come from {@code getNodeId(i, isRemote, "new")}; each id is also used as the node name.
 * Remote nodes are created with {@code SEARCH_DATA_ROLES}, all others with {@code MANAGER_DATA_ROLES}.
 *
 * @param clusterState the state whose node set is extended
 * @param nodeCount    how many nodes to add
 * @param isRemote     selects the id scheme and the role set of the added nodes
 * @return a new cluster state containing the existing and the added nodes
 */
public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) {
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes());
    for (int ordinal = 0; ordinal < nodeCount; ordinal++) {
        final String nodeId = getNodeId(ordinal, isRemote, "new");
        nodesBuilder.add(newNode(nodeId, nodeId, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES));
    }
    final DiscoveryNodes updatedNodes = nodesBuilder.build();
    return ClusterState.builder(clusterState).nodes(updatedNodes).build();
}

/**
 * Returns a copy of the given cluster state with one additional node bound to the given IP.
 * <p>
 * The node's transport address uses port 9200. Its id (also used as its name) comes from
 * {@code getNodeId(nodeId, isRemote, "new")}, and its attributes from {@code createNodeAttributes}.
 *
 * @param clusterState the state whose node set is extended
 * @param nodeId       ordinal used to derive the node id
 * @param isRemote     selects {@code SEARCH_DATA_ROLES} (true) or {@code MANAGER_DATA_ROLES} (false)
 * @param IP           host name or literal IP address resolved for the transport address
 * @return a new cluster state containing the existing nodes and the added node
 * @throws UnknownHostException if {@code IP} cannot be resolved
 */
public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException {
    // Resolve the address first so that a resolution failure surfaces before anything is built.
    final TransportAddress address = new TransportAddress(Inet4Address.getByName(IP), 9200);
    final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes());
    final String discoveryId = getNodeId(nodeId, isRemote, "new");
    final DiscoveryNode addedNode = new DiscoveryNode(
        discoveryId,
        discoveryId,
        address,
        createNodeAttributes(discoveryId),
        isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES,
        Version.CURRENT
    );
    nodesBuilder.add(addedNode);
    final DiscoveryNodes updatedNodes = nodesBuilder.build();
    return ClusterState.builder(clusterState).nodes(updatedNodes).build();
}

/**
 * Removes the given nodes from the cluster state and disassociates them via the allocation service.
 *
 * @param clusterState     the current cluster state
 * @param service          allocation service whose {@code disassociateDeadNodes} is invoked with
 *                         reason {@code "nodes-terminated"}
 * @param nodesToTerminate ids of the nodes to remove; when empty the input state is returned as is
 * @return the cluster state after node removal and dead-node disassociation
 */
public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List<String> nodesToTerminate) {
    if (nodesToTerminate.isEmpty()) {
        // Nothing to remove: hand back the state untouched.
        return clusterState;
    }
    logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate);
    final DiscoveryNodes.Builder remainingNodes = DiscoveryNodes.builder(clusterState.nodes());
    for (String terminatedId : nodesToTerminate) {
        remainingNodes.remove(terminatedId);
    }
    final ClusterState withoutNodes = ClusterState.builder(clusterState).nodes(remainingNodes.build()).build();
    return service.disassociateDeadNodes(withoutNodes, false, "nodes-terminated");
}

public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) {
public ClusterState createInitialCluster(int localOnlyNodes, int remoteNodes, boolean remoteOnly, int localIndices, int remoteIndices) {
Metadata.Builder mb = Metadata.builder();
for (int i = 0; i < localIndices; i++) {
mb.put(
Expand Down Expand Up @@ -199,9 +153,16 @@ public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNo
String name = getNodeId(i, false);
nb.add(newNode(name, name, MANAGER_DATA_ROLES));
}
for (int i = 0; i < remoteCapableNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_DATA_ROLES));
if (remoteOnly) {
for (int i = 0; i < remoteNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_ONLY_ROLE));
}
} else {
for (int i = 0; i < remoteNodes; i++) {
String name = getNodeId(i, true);
nb.add(newNode(name, name, SEARCH_DATA_ROLES));
}
}
DiscoveryNodes nodes = nb.build();
return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.stream.Collectors;

public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase {
public void testTargetPoolAllocationDecisions() {
public void testTargetPoolHybridAllocationDecisions() {
ClusterState clusterState = createInitialCluster(3, 3, 2, 2);
AllocationService service = this.createRemoteCapableAllocationService();
clusterState = allocateShardsAndBalance(clusterState, service);
Expand Down Expand Up @@ -111,4 +111,93 @@ public void testTargetPoolAllocationDecisions() {
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type());
assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type());
}

/**
 * Checks the {@link TargetPoolAllocationDecider} decisions for a cluster whose remote nodes are
 * created with the {@code remoteOnly} flag set, i.e. dedicated search nodes.
 * <p>
 * Expected: remote shards/indices are rejected on local-only nodes, local shards/indices are
 * rejected on the dedicated search nodes, and every same-pool combination is accepted. The same
 * expectations are asserted for forced primary allocation and for auto-expand-replica decisions.
 */
public void testTargetPoolDedicatedSearchNodeAllocationDecisions() {
    ClusterState state = createInitialCluster(3, 3, true, 2, 2);
    final AllocationService allocationService = this.createRemoteCapableAllocationService();
    state = allocateShardsAndBalance(state, allocationService);

    // Add an unassigned primary shard for force allocation checks
    final Metadata withUnassignedLocal = Metadata.builder(state.metadata())
        .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
        .build();
    final RoutingTable routingWithUnassignedLocal = RoutingTable.builder(state.routingTable())
        .addAsNew(withUnassignedLocal.index("test_local_unassigned"))
        .build();
    state = ClusterState.builder(state).metadata(withUnassignedLocal).routingTable(routingWithUnassignedLocal).build();

    // Add remote index unassigned primary
    state = createRemoteIndex(state, "test_remote_unassigned");

    final RoutingNodes routingNodes = state.getRoutingNodes();
    final RoutingAllocation allocation = getRoutingAllocation(state, routingNodes);

    // One primary per index category: assigned local/remote and unassigned local/remote.
    final ShardRouting localPrimary = firstPrimaryShard(state, getIndexName(0, false));
    final ShardRouting remotePrimary = firstPrimaryShard(state, getIndexName(0, true));
    final ShardRouting unassignedLocalPrimary = firstPrimaryShard(state, "test_local_unassigned");
    final ShardRouting unassignedRemotePrimary = firstPrimaryShard(state, "test_remote_unassigned");

    final IndexMetadata localIndex = allocation.metadata().getIndexSafe(localPrimary.index());
    final IndexMetadata remoteIndex = allocation.metadata().getIndexSafe(remotePrimary.index());

    // Pick the first routing node whose id carries the local prefix; fall back to the bare prefix.
    String localId = LOCAL_NODE_PREFIX;
    for (RoutingNode candidate : allocation.routingNodes()) {
        final String candidateId = candidate.nodeId();
        if (candidateId.startsWith(LOCAL_NODE_PREFIX)) {
            localId = candidateId;
            break;
        }
    }
    final String remoteId = remotePrimary.currentNodeId();
    final RoutingNode localNode = routingNodes.node(localId);
    final RoutingNode searchNode = routingNodes.node(remoteId);

    final AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider()));

    // Incompatible Pools
    assertEquals(Decision.NO.type(), deciders.canAllocate(remotePrimary, localNode, allocation).type());
    assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIndex, localNode, allocation).type());
    assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemotePrimary, localNode, allocation).type());
    // A dedicated search node should not accept local shards and indices.
    assertEquals(Decision.NO.type(), deciders.canAllocate(localPrimary, searchNode, allocation).type());
    assertEquals(Decision.NO.type(), deciders.canAllocate(localIndex, searchNode, allocation).type());
    assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedLocalPrimary, searchNode, allocation).type());

    // Compatible Pools
    assertEquals(Decision.YES.type(), deciders.canAllocate(remotePrimary, searchNode, allocation).type());
    assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIndex, searchNode, allocation).type());
    assertEquals(Decision.YES.type(), deciders.canAllocate(localPrimary, localNode, allocation).type());
    assertEquals(Decision.YES.type(), deciders.canAllocate(localIndex, localNode, allocation).type());
    assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedRemotePrimary, searchNode, allocation).type());
    assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalPrimary, localNode, allocation).type());

    // Verify only compatible nodes are used for auto expand replica decision for remote index and local index
    assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(localIndex, searchNode.node(), allocation).type());
    assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIndex, localNode.node(), allocation).type());
    assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIndex, localNode.node(), allocation).type());
    assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIndex, searchNode.node(), allocation).type());
}

/**
 * Returns the first primary shard listed in the routing table for the given index.
 * Fails with an index-out-of-bounds error when the index has no primary shard entry.
 */
private static ShardRouting firstPrimaryShard(ClusterState state, String indexName) {
    return state.routingTable().allShards(indexName).stream().filter(ShardRouting::primary).collect(Collectors.toList()).get(0);
}
}