Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Handle failover in mixed cluster mode #9536

Merged
merged 5 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -92,16 +93,21 @@ protected boolean resolveIndex(GetRequest request) {
/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
return metadata.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
if (shouldForcePrimaryRouting(
state.getMetadata(),
request.request().realtime,
request.request().preference(),
request.concreteIndex()
)) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -51,8 +52,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.get.TransportGetAction.shouldForcePrimaryRouting;

/**
* Perform the multi get action.
*
Expand All @@ -78,6 +77,10 @@ public TransportMultiGetAction(
this.indexNameExpressionResolver = resolver;
}

/**
 * Decides whether a multi-get item has to be routed to the primary shard.
 *
 * @param metadata   cluster metadata consulted for the index's replication type
 * @param realtime   whether the request is a realtime read
 * @param preference routing preference carried by the request, or {@code null} when none was set
 * @param indexName  name of the index the item targets
 * @return {@code true} only if the index has segment replication enabled, the request is realtime
 *         and no preference was supplied; {@code false} otherwise
 */
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
    // Checks are evaluated in the same order as a short-circuiting conjunction would evaluate them.
    if (metadata.isSegmentReplicationEnabled(indexName) == false) {
        return false;
    }
    if (realtime == false) {
        return false;
    }
    return preference == null;
}

@Override
protected void doExecute(Task task, final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
ClusterState clusterState = clusterService.state();
Expand Down Expand Up @@ -112,7 +115,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime(), request.preference(), concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.Discovery;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -410,21 +409,6 @@ public boolean supersedes(ClusterState other) {

}

/**
 * Reports whether the given index is configured for SEGMENT replication in this cluster state.
 *
 * @param indexName Index name
 * @return true if the index is present in the metadata and its replication type setting parses to SEGMENT,
 *         false otherwise (including when the index is not found)
 */
public boolean isSegmentReplicationEnabled(String indexName) {
    final IndexMetadata indexMetadata = this.getMetadata().index(indexName);
    if (indexMetadata == null) {
        // No metadata entry for this index name.
        return false;
    }
    final String replicationSetting = indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE);
    return ReplicationType.parseString(replicationSetting).equals(ReplicationType.SEGMENT);
}

/**
* Metrics for cluster state.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;

import java.io.IOException;
Expand Down Expand Up @@ -107,6 +108,21 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
public static final String UNKNOWN_CLUSTER_UUID = Strings.UNKNOWN_UUID_VALUE;
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+$");

/**
* Utility to identify whether input index uses SEGMENT replication strategy in established cluster state metadata.
*
* @param indexName Index name
* @return true if index uses SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
return Optional.ofNullable(index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Context of the XContent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* @opensearch.internal
*/
public class RoutingNodes implements Iterable<RoutingNode> {
private final Metadata metadata;

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

Expand All @@ -107,6 +108,7 @@ public RoutingNodes(ClusterState clusterState) {
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.metadata = clusterState.getMetadata();
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>());
Expand Down Expand Up @@ -364,14 +366,6 @@ public ShardRouting activePrimary(ShardId shardId) {
return null;
}

/**
mch2 marked this conversation as resolved.
Show resolved Hide resolved
* Returns one active replica shard for the given shard id or <code>null</code> if
* no active replica is found.
*
* Since replicas could possibly be on nodes with a older version of OpenSearch than
* the primary is, this will return replicas on the highest version of OpenSearch.
*
*/
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
// It's possible for replicaNodeVersion to be null, when disassociating dead nodes
// that have been removed, the shards are failed, and part of the shard failing
Expand All @@ -390,6 +384,21 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
.orElse(null);
}

public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
mch2 marked this conversation as resolved.
Show resolved Hide resolved
// It's possible for replicaNodeVersion to be null. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
return assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.min(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))
)
)
.orElse(null);
}

/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
Expand Down Expand Up @@ -724,7 +733,12 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
ShardRouting activeReplica;
if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
} else {
activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
}
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ private static ClusterState clusterState(ReplicationType replicationType) {

public void testShouldForcePrimaryRouting() {

ClusterState clusterState = clusterState(ReplicationType.SEGMENT);
Metadata metadata = clusterState(ReplicationType.SEGMENT).getMetadata();

// should return false since preference is set for request
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

// should return false since request is not realtime
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, false, null, "index1"));

// should return true since segment replication is enabled
assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertTrue(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

// should return false since index doesn't exist
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

clusterState = clusterState(ReplicationType.DOCUMENT);
metadata = clusterState(ReplicationType.DOCUMENT).getMetadata();

// should fail since document replication enabled
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.OperationRouting;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
Expand All @@ -58,6 +59,7 @@
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -68,6 +70,7 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -91,39 +94,16 @@ public class TransportMultiGetActionTests extends OpenSearchTestCase {
private static TransportMultiGetAction transportAction;
private static TransportShardMultiGetAction shardAction;

@BeforeClass
public static void beforeClass() throws Exception {
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());

transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put("node.name", "node1").build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet()
) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};

final Index index1 = new Index("index1", randomBase64UUID());
final Index index2 = new Index("index2", randomBase64UUID());
final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
private static ClusterState clusterState(ReplicationType replicationType, Index index1, Index index2) throws IOException {
return ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
.metadata(
new Metadata.Builder().put(
new IndexMetadata.Builder(index1.getName()).settings(
Settings.builder()
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID())
)
.putMapping(
Expand All @@ -149,6 +129,7 @@ public TaskManager getTaskManager() {
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, replicationType)
.put(IndexMetadata.SETTING_INDEX_UUID, index1.getUUID())
)
.putMapping(
Expand All @@ -170,6 +151,34 @@ public TaskManager getTaskManager() {
)
)
.build();
}

@BeforeClass
public static void beforeClass() throws Exception {
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());

transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(
Settings.builder().put("node.name", "node1").build(),
boundAddress.publishAddress(),
randomBase64UUID()
),
null,
emptySet()
) {
@Override
public TaskManager getTaskManager() {
return taskManager;
}
};

final Index index1 = new Index("index1", randomBase64UUID());
final Index index2 = new Index("index2", randomBase64UUID());
ClusterState clusterState = clusterState(randomBoolean() ? ReplicationType.SEGMENT : ReplicationType.DOCUMENT, index1, index2);

final ShardIterator index1ShardIterator = mock(ShardIterator.class);
when(index1ShardIterator.shardId()).thenReturn(new ShardId(index1, randomInt()));
Expand Down Expand Up @@ -285,6 +294,30 @@ protected void executeShardAction(

}

/**
 * Verifies {@link TransportMultiGetAction#shouldForcePrimaryRouting} for segment and document
 * replication, covering an explicit preference, a non-realtime request and an unknown index.
 */
public void testShouldForcePrimaryRouting() throws IOException {
    final Index index1 = new Index("index1", randomBase64UUID());
    final Index index2 = new Index("index2", randomBase64UUID());
    Metadata metadata = clusterState(ReplicationType.SEGMENT, index1, index2).getMetadata();

    // should return false since preference is set for request
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

    // should return false since request is not realtime
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, false, null, "index2"));

    // should return true since segment replication is enabled
    assertTrue(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

    // should return false since index doesn't exist
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

    metadata = clusterState(ReplicationType.DOCUMENT, index1, index2).getMetadata();

    // should return false since document replication is enabled; exercise the multi-get helper
    // (the class under test), not TransportGetAction's copy
    assertFalse(TransportMultiGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

private static Task createTask() {
return new Task(
randomLong(),
Expand Down
Loading