Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Handle failover in mixed cluster mode #9536

Merged
merged 5 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -92,16 +93,21 @@ protected boolean resolveIndex(GetRequest request) {
/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
return metadata.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
if (shouldForcePrimaryRouting(
state.getMetadata(),
request.request().realtime,
request.request().preference(),
request.concreteIndex()
)) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime, request.preference, concreteSingleIndex)) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.Discovery;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -410,21 +409,6 @@ public boolean supersedes(ClusterState other) {

}

/**
* Utility to identify whether input index belongs to SEGMENT replication in established cluster state.
*
* @param indexName Index name
* @return true if index belong SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
return Optional.ofNullable(this.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Metrics for cluster state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;

import java.io.IOException;
Expand Down Expand Up @@ -107,6 +108,21 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
public static final String UNKNOWN_CLUSTER_UUID = Strings.UNKNOWN_UUID_VALUE;
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+$");

/**
* Utility to identify whether input index uses SEGMENT replication strategy in established cluster state metadata.
*
* @param indexName Index name
* @return true if index uses SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
return Optional.ofNullable(index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Context of the XContent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* @opensearch.internal
*/
public class RoutingNodes implements Iterable<RoutingNode> {
private final Metadata metadata;

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

Expand All @@ -107,6 +108,7 @@ public RoutingNodes(ClusterState clusterState) {
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.metadata = clusterState.getMetadata();
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>());
Expand Down Expand Up @@ -368,26 +370,36 @@ public ShardRouting activePrimary(ShardId shardId) {
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with a older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch.
* Since replicas could possibly be on nodes with an older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch when
* document replication strategy is in use, and will return replicas on oldest version
* of OpenSearch when segment replication is enabled.
*
*/
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
// It's possible for replicaNodeVersion to be null, when disassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
return assignedShards(shardId).stream()
Stream<ShardRouting> candidateShards = assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(
.filter(shr -> node(shr.currentNodeId()) != null);
if (metadata.isSegmentReplicationEnabled(shardId.getIndexName())) {
return candidateShards.min(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))
)
).orElse(null);

}
return candidateShards.max(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))
)
.orElse(null);
).orElse(null);
}

/**
Expand Down Expand Up @@ -724,7 +736,7 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
ShardRouting activeReplica = activeReplicaBasedOnReplicationStrategy(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ private static ClusterState clusterState(ReplicationType replicationType) {

public void testShouldForcePrimaryRouting() {

ClusterState clusterState = clusterState(ReplicationType.SEGMENT);
Metadata metadata = clusterState(ReplicationType.SEGMENT).getMetadata();

// should return false since preference is set for request
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

// should return false since request is not realtime
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, false, null, "index1"));

// should return true since segment replication is enabled
assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertTrue(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

// should return false since index doesn't exist
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

clusterState = clusterState(ReplicationType.DOCUMENT);
metadata = clusterState(ReplicationType.DOCUMENT).getMetadata();

// should fail since document replication enabled
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.TestCustomMetadata;

Expand Down Expand Up @@ -117,39 +116,6 @@ public void testSupersedes() {
);
}

public void testIsSegmentReplicationEnabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(builder)
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertTrue(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testIsSegmentReplicationDisabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertFalse(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testBuilderRejectsNullCustom() {
final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT);
final String key = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -1425,6 +1426,29 @@ public void testMetadataBuildInvocations() {
compareMetadata(previousMetadata, builtMetadata, false, true, true);
}

public void testIsSegmentReplicationEnabled() {
final String indexName = "test";
Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(builder)
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
Metadata metadata = metadataBuilder.build();
assertTrue(metadata.isSegmentReplicationEnabled(indexName));
}

public void testIsSegmentReplicationDisabled() {
final String indexName = "test";
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
Metadata metadata = metadataBuilder.build();
assertFalse(metadata.isSegmentReplicationEnabled(indexName));
}

public static Metadata randomMetadata() {
Metadata.Builder md = Metadata.builder()
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.cluster.ClusterStateChanges;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.VersionUtils;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -69,6 +70,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -137,7 +139,15 @@ public void testSimpleFailedNodeTest() {
}
}

public void testRandomClusterPromotesOldestReplica() throws InterruptedException {
testRandomClusterPromotesReplica(true);
}

public void testRandomClusterPromotesNewestReplica() throws InterruptedException {
testRandomClusterPromotesReplica(false);
}

void testRandomClusterPromotesReplica(boolean isSegmentReplicationEnabled) throws InterruptedException {

ThreadPool threadPool = new TestThreadPool(getClass().getName());
ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool);
Expand All @@ -164,6 +174,9 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException
Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4))
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4));
if (isSegmentReplicationEnabled) {
settingsBuilder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
}
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE);
state = cluster.createIndex(state, request);
assertTrue(state.metadata().hasIndex(name));
Expand Down Expand Up @@ -206,13 +219,23 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException
Version candidateVer = getNodeVersion(sr, compareState);
if (candidateVer != null) {
logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr);
assertTrue(
"candidate was not on the newest version, new primary is on "
+ newPrimaryVersion
+ " and there is a candidate on "
+ candidateVer,
candidateVer.onOrBefore(newPrimaryVersion)
);
if (isSegmentReplicationEnabled) {
assertTrue(
"candidate was not on the oldest version, new primary is on "
+ newPrimaryVersion
+ " and there is a candidate on "
+ candidateVer,
candidateVer.onOrAfter(newPrimaryVersion)
);
} else {
assertTrue(
"candidate was not on the newest version, new primary is on "
+ newPrimaryVersion
+ " and there is a candidate on "
+ candidateVer,
candidateVer.onOrBefore(newPrimaryVersion)
);
}
}
});
});
Expand Down
Loading