diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 583815b91ae68..8ce4e57467b76 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -92,7 +92,7 @@ protected boolean resolveIndex(GetRequest request) { return true; } - static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { + public static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { return Optional.ofNullable(state.getMetadata().index(indexName)) .map( indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 981e21537c078..f78977492a168 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -66,6 +66,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.action.get.TransportGetAction.isSegmentReplicationEnabled; + /** * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}. * It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing @@ -82,6 +84,7 @@ * @opensearch.internal */ public class RoutingNodes implements Iterable { + private final ClusterState clusterState; private final Map nodesToShards = new HashMap<>(); @@ -107,6 +110,7 @@ public RoutingNodes(ClusterState clusterState) { } public RoutingNodes(ClusterState clusterState, boolean readOnly) { + this.clusterState = clusterState; this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>()); @@ -368,26 +372,36 @@ public ShardRouting activePrimary(ShardId shardId) { * Returns one active replica shard for the given shard id or null if * no active replica is found. * - * Since replicas could possibly be on nodes with a older version of OpenSearch than - * the primary is, this will return replicas on the highest version of OpenSearch. + * Since replicas could possibly be on nodes with an older version of OpenSearch than + * the primary is, this will return replicas on the highest version of OpenSearch when + * document replication strategy is in use, and will return replicas on oldest version + * of OpenSearch when segment replication is enabled. * */ - public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { + public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) { // It's possible for replicaNodeVersion to be null, when disassociating dead nodes // that have been removed, the shards are failed, and part of the shard failing // calls this method with an out-of-date RoutingNodes, where the version might not // be accessible. Therefore, we need to protect against the version being null // (meaning the node will be going away). - return assignedShards(shardId).stream() + Stream candidateShards = assignedShards(shardId).stream() .filter(shr -> !shr.primary() && shr.active()) - .filter(shr -> node(shr.currentNodeId()) != null) - .max( + .filter(shr -> node(shr.currentNodeId()) != null); + if (isSegmentReplicationEnabled(clusterState, shardId.getIndexName())) { + return candidateShards.min( Comparator.comparing( shr -> node(shr.currentNodeId()).node(), Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) ) + ).orElse(null); + + } + return candidateShards.max( + Comparator.comparing( + shr -> node(shr.currentNodeId()).node(), + Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)) ) - .orElse(null); + ).orElse(null); } /** @@ -724,7 +738,7 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists( RoutingChangesObserver routingChangesObserver ) { assert failedShard.primary(); - ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + ShardRouting activeReplica = activeReplicaBasedOnReplicationStrategy(failedShard.shardId()); if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); } else { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index f2dc745ad33bf..2970a27d73414 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -49,6 +49,7 @@ import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.VersionUtils; import java.util.ArrayList; @@ -587,7 +588,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() { clusterState = startShardsAndReroute(allocation, clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0)); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); // fail the primary shard, check replicas get removed as well... ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); @@ -647,10 +648,21 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle } public void testReplicaOnNewestVersionIsPromoted() { + testReplicaIsPromoted(false); + } + + public void testReplicaOnOldestVersionIsPromoted() { + testReplicaIsPromoted(true); + } + + private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { AllocationService allocation = createAllocationService(Settings.builder().build()); + Settings.Builder settingsBuilder = isSegmentReplicationEnabled + ? settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + : settings(Version.CURRENT); Metadata metadata = Metadata.builder() - .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) + .put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(3)) .build(); RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); @@ -714,7 +726,7 @@ public void testReplicaOnNewestVersionIsPromoted() { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); // fail the primary shard again and make sure the correct replica is promoted @@ -739,13 +751,20 @@ public void testReplicaOnNewestVersionIsPromoted() { continue; } Version nodeVer = cursor.getVersion(); - assertTrue( - "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, - replicaNodeVersion.onOrAfter(nodeVer) - ); + if (isSegmentReplicationEnabled) { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion, + replicaNodeVersion.onOrBefore(nodeVer) + ); + } else { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer) + ); + } } - startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + startedReplica = clusterState.getRoutingNodes().activeReplicaBasedOnReplicationStrategy(shardId); logger.info("--> failing primary shard a second time, should select: {}", startedReplica); // fail the primary shard again, and ensure the same thing happens @@ -771,10 +790,17 @@ public void testReplicaOnNewestVersionIsPromoted() { continue; } Version nodeVer = cursor.getVersion(); - assertTrue( - "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, - replicaNodeVersion.onOrAfter(nodeVer) - ); + if (isSegmentReplicationEnabled) { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be after " + replicaNodeVersion, + replicaNodeVersion.onOrBefore(nodeVer) + ); + } else { + assertTrue( + "expected node [" + cursor.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer) + ); + } } } }