Skip to content

Commit

Permalink
pick oldest OS version replica to promote as primary
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Aug 24, 2023
1 parent 762c70b commit 76dfe70
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected boolean resolveIndex(GetRequest request) {
return true;
}

static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) {
public static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) {
return Optional.ofNullable(state.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.get.TransportGetAction.isSegmentReplicationEnabled;

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
Expand All @@ -82,6 +84,7 @@
* @opensearch.internal
*/
public class RoutingNodes implements Iterable<RoutingNode> {
private final ClusterState clusterState;

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

Expand All @@ -107,6 +110,7 @@ public RoutingNodes(ClusterState clusterState) {
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.clusterState = clusterState;
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>());
Expand Down Expand Up @@ -368,26 +372,36 @@ public ShardRouting activePrimary(ShardId shardId) {
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with a older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch.
* Since replicas could possibly be on nodes with an older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch when
* document replication strategy is in use, and will return replicas on oldest version
* of OpenSearch when segment replication is enabled.
*
*/
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) {
// It's possible for replicaNodeVersion to be null, when disassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
return assignedShards(shardId).stream()
Stream<ShardRouting> candidateShards = assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(
.filter(shr -> node(shr.currentNodeId()) != null);
if (isSegmentReplicationEnabled(clusterState, shardId.getIndexName())) {
return candidateShards.min(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))
)
).orElse(null);

}
return candidateShards.max(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))
)
.orElse(null);
).orElse(null);
}

/**
Expand Down Expand Up @@ -724,7 +738,7 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
ShardRouting activeReplica = activeReplicaBasedOnReplicationStrategy(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.VersionUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -587,7 +588,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() {
clusterState = startShardsAndReroute(allocation, clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0));
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId);

// fail the primary shard, check replicas get removed as well...
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
Expand Down Expand Up @@ -647,10 +648,21 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
}

public void testReplicaOnNewestVersionIsPromoted() {
testReplicaIsPromoted(false);
}

public void testReplicaOnOldestVersionIsPromoted() {
testReplicaIsPromoted(true);
}

private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {
AllocationService allocation = createAllocationService(Settings.builder().build());

Settings.Builder settingsBuilder = isSegmentReplicationEnabled
? settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
: settings(Version.CURRENT);
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3))
.put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(3))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();
Expand Down Expand Up @@ -714,7 +726,7 @@ public void testReplicaOnNewestVersionIsPromoted() {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0));

ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId);
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);

// fail the primary shard again and make sure the correct replica is promoted
Expand All @@ -739,13 +751,20 @@ public void testReplicaOnNewestVersionIsPromoted() {
continue;
}
Version nodeVer = cursor.getVersion();
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer)
);
if (isSegmentReplicationEnabled) {
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion,
replicaNodeVersion.onOrBefore(nodeVer)
);
} else {
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer)
);
}
}

startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId);
logger.info("--> failing primary shard a second time, should select: {}", startedReplica);

// fail the primary shard again, and ensure the same thing happens
Expand All @@ -771,10 +790,17 @@ public void testReplicaOnNewestVersionIsPromoted() {
continue;
}
Version nodeVer = cursor.getVersion();
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer)
);
if (isSegmentReplicationEnabled) {
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion,
replicaNodeVersion.onOrBefore(nodeVer)
);
} else {
assertTrue(
"expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion,
replicaNodeVersion.onOrAfter(nodeVer)
);
}
}
}
}

0 comments on commit 76dfe70

Please sign in to comment.