Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Aug 29, 2023
1 parent b71276e commit dc41002
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.support.single.shard.TransportSingleShardAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -92,16 +93,21 @@ protected boolean resolveIndex(GetRequest request) {
/**
* Returns true if GET request should be routed to primary shards, else false.
*/
protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) {
return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
protected static boolean shouldForcePrimaryRouting(Metadata metadata, boolean realtime, String preference, String indexName) {
return metadata.isSegmentReplicationEnabled(indexName) && realtime && preference == null;
}

@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
final String preference;
// route realtime GET requests when segment replication is enabled to primary shards,
// iff there are no other preferences/routings enabled for routing to a specific shard
if (shouldForcePrimaryRouting(state, request.request().realtime, request.request().preference(), request.concreteIndex())) {
if (shouldForcePrimaryRouting(
state.getMetadata(),
request.request().realtime,
request.request().preference(),
request.concreteIndex()
)) {
preference = Preference.PRIMARY.type();
} else {
preference = request.request().preference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
if (shouldForcePrimaryRouting(clusterState, request.realtime, request.preference, concreteSingleIndex)) {
if (shouldForcePrimaryRouting(clusterState.getMetadata(), request.realtime, request.preference, concreteSingleIndex)) {
request.preference(Preference.PRIMARY.type());
}
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
Expand Down
16 changes: 0 additions & 16 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.Discovery;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -410,21 +409,6 @@ public boolean supersedes(ClusterState other) {

}

/**
* Utility to identify whether input index belongs to SEGMENT replication in established cluster state.
*
* @param indexName Index name
* @return true if index belong SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
return Optional.ofNullable(this.getMetadata().index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Metrics for cluster state.
*
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;

import java.io.IOException;
Expand Down Expand Up @@ -107,6 +108,21 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
public static final String UNKNOWN_CLUSTER_UUID = Strings.UNKNOWN_UUID_VALUE;
public static final Pattern NUMBER_PATTERN = Pattern.compile("[0-9]+$");

/**
* Utility to identify whether input index uses SEGMENT replication strategy in established cluster state metadata.
*
* @param indexName Index name
* @return true if index uses SEGMENT replication, false otherwise
*/
public boolean isSegmentReplicationEnabled(String indexName) {
return Optional.ofNullable(index(indexName))
.map(
indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE))
.equals(ReplicationType.SEGMENT)
)
.orElse(false);
}

/**
* Context of the XContent.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.get.TransportGetAction.isSegmentReplicationEnabled;

/**
* {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
Expand All @@ -84,7 +82,7 @@
* @opensearch.internal
*/
public class RoutingNodes implements Iterable<RoutingNode> {
private final ClusterState clusterState;
private final Metadata metadata;

private final Map<String, RoutingNode> nodesToShards = new HashMap<>();

Expand All @@ -110,7 +108,7 @@ public RoutingNodes(ClusterState clusterState) {
}

public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.clusterState = clusterState;
this.metadata = clusterState.getMetadata();
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
this.nodesPerAttributeNames = Collections.synchronizedMap(new HashMap<>());
Expand Down Expand Up @@ -387,7 +385,7 @@ public ShardRouting activeReplicaBasedOnReplicationStrategy(ShardId shardId) {
Stream<ShardRouting> candidateShards = assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null);
if (isSegmentReplicationEnabled(clusterState, shardId.getIndexName())) {
if (metadata.isSegmentReplicationEnabled(shardId.getIndexName())) {
return candidateShards.min(
Comparator.comparing(
shr -> node(shr.currentNodeId()).node(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,24 @@ private static ClusterState clusterState(ReplicationType replicationType) {

public void testShouldForcePrimaryRouting() {

ClusterState clusterState = clusterState(ReplicationType.SEGMENT);
Metadata metadata = clusterState(ReplicationType.SEGMENT).getMetadata();

// should return false since preference is set for request
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, Preference.REPLICA.type(), "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, Preference.REPLICA.type(), "index1"));

// should return false since request is not realtime
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, false, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, false, null, "index1"));

// should return true since segment replication is enabled
assertTrue(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertTrue(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

// should return false since index doesn't exist
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index3"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index3"));

clusterState = clusterState(ReplicationType.DOCUMENT);
metadata = clusterState(ReplicationType.DOCUMENT).getMetadata();

// should fail since document replication enabled
assertFalse(TransportGetAction.shouldForcePrimaryRouting(clusterState, true, null, "index1"));
assertFalse(TransportGetAction.shouldForcePrimaryRouting(metadata, true, null, "index1"));

}

Expand Down
34 changes: 0 additions & 34 deletions server/src/test/java/org/opensearch/cluster/ClusterStateTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.TestCustomMetadata;

Expand Down Expand Up @@ -117,39 +116,6 @@ public void testSupersedes() {
);
}

public void testIsSegmentReplicationEnabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(builder)
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertTrue(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testIsSegmentReplicationDisabled() {
final String indexName = "test";
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build());
clusterState = ClusterState.builder(clusterState)
.metadata(metadataBuilder.build())
.routingTable(routingTableBuilder.build())
.build();
assertFalse(clusterState.isSegmentReplicationEnabled(indexName));
}

public void testBuilderRejectsNullCustom() {
final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT);
final String key = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -1425,6 +1426,29 @@ public void testMetadataBuildInvocations() {
compareMetadata(previousMetadata, builtMetadata, false, true, true);
}

public void testIsSegmentReplicationEnabled() {
final String indexName = "test";
Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(builder)
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
Metadata metadata = metadataBuilder.build();
assertTrue(metadata.isSegmentReplicationEnabled(indexName));
}

public void testIsSegmentReplicationDisabled() {
final String indexName = "test";
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1);
Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder);
Metadata metadata = metadataBuilder.build();
assertFalse(metadata.isSegmentReplicationEnabled(indexName));
}

public static Metadata randomMetadata() {
Metadata.Builder md = Metadata.builder()
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
Expand Down

0 comments on commit dc41002

Please sign in to comment.