Skip to content

Commit

Permalink
Change default search preference to _primary for searchable snapshot …
Browse files Browse the repository at this point in the history
…indices

Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed May 22, 2023
1 parent 4053b28 commit 1a366c2
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203))
- Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303))
- Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502))
- Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
Expand Down Expand Up @@ -41,6 +43,7 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -664,6 +667,58 @@ public void testCacheIndexFilesClearedOnDelete() throws Exception {
logger.info("--> validated that the cache file path doesn't exist");
}

/**
* Test scenario that validates that the default search preference for searchable snapshot
* is primary shards
*/
public void testDefaultShardPreference() throws Exception {
final int numReplicas = 1;
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

// Create an index, snapshot and restore as a searchable snapshot index
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicas + 1);
createIndexWithDocsAndEnsureGreen(numReplicas, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertDocCount(restoredIndexName, 100L);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

// ClusterSearchShards API returns a list of shards that will be used
// when querying a particular index
ClusterSearchShardsGroup[] shardGroups = client.admin()
.cluster()
.searchShards(new ClusterSearchShardsRequest(restoredIndexName))
.actionGet()
.getGroups();

// Ensure when no preferences are set (default preference), the only compatible shards are primary
for (ClusterSearchShardsGroup shardsGroup : shardGroups) {
assertEquals(1, shardsGroup.getShards().length);
assertTrue(shardsGroup.getShards()[0].primary());
}

// Ensure when preferences are set, all the compatible shards are returned
shardGroups = client.admin()
.cluster()
.searchShards(new ClusterSearchShardsRequest(restoredIndexName).preference("foo"))
.actionGet()
.getGroups();

// Ensures that the compatible shards are not just primaries
for (ClusterSearchShardsGroup shardsGroup : shardGroups) {
assertTrue(shardsGroup.getShards().length > 1);
boolean containsReplica = Arrays.stream(shardsGroup.getShards())
.map(shardRouting -> !shardRouting.primary())
.reduce(false, (s1, s2) -> s1 || s2);
assertTrue(containsReplica);
}
}

/**
* Asserts the cache folder count to match the number of shards and the number of indices within the cache folder
* as provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.ResponseCollectorService;
Expand Down Expand Up @@ -238,6 +239,13 @@ public GroupShardsIterator<ShardIterator> searchShards(
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (IndexModule.Type.REMOTE_SNAPSHOT.match(
indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
) && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
}

ShardIterator iterator = preferenceActiveShardIterator(
shard,
clusterState.nodes().getLocalNodeId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.ShardId;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -992,6 +994,66 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep
}
}

public void testSearchableSnapshotPrimaryDefault() throws Exception {
final int numIndices = 1;
final int numShards = 2;
final int numReplicas = 2;
final String[] indexNames = new String[numIndices];
for (int i = 0; i < numIndices; i++) {
indexNames[i] = "test" + i;
}
// The first index is a searchable snapshot index
final String searchableSnapshotIndex = indexNames[0];
ClusterService clusterService = null;
ThreadPool threadPool = null;

try {
OperationRouting opRouting = new OperationRouting(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas);
threadPool = new TestThreadPool("testSearchableSnapshotPreference");
clusterService = ClusterServiceUtils.createClusterService(threadPool);

// Update the index config within the cluster state to modify the index to a searchable snapshot index
IndexMetadata searchableSnapshotIndexMetadata = IndexMetadata.builder(searchableSnapshotIndex)
.settings(
Settings.builder()
.put(state.metadata().index(searchableSnapshotIndex).getSettings())
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey())
.build()
)
.build();
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata())
.put(searchableSnapshotIndexMetadata, false)
.generateClusterUuidIfNeeded();
state = ClusterState.builder(state).metadata(metadataBuilder.build()).build();

// Verify default preference is primary only
GroupShardsIterator<ShardIterator> groupIterator = opRouting.searchShards(state, indexNames, null, null);
assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));

for (ShardIterator shardIterator : groupIterator) {
assertThat("Only single shard will be returned with no preference", shardIterator.size(), equalTo(1));
assertTrue("Only primary should exist with no preference", shardIterator.nextOrNull().primary());
}

// Verify alternative preference can be applied to a searchable snapshot index
groupIterator = opRouting.searchShards(state, indexNames, null, "_replica");
assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards));

for (ShardIterator shardIterator : groupIterator) {
assertThat("Replica shards will be returned", shardIterator.size(), equalTo(numReplicas));
assertFalse("Returned shard should be a replica", shardIterator.nextOrNull().primary());
}
} finally {
IOUtils.close(clusterService);
terminate(threadPool);
}
}

private DiscoveryNode[] setupNodes() {
// Sets up two data nodes in zone-a and one data node in zone-b
List<String> zones = Arrays.asList("a", "a", "b");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -55,6 +56,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -562,6 +564,71 @@ public void testReplicaShardPreferenceIters() {
assertTrue(routing.primary());
}

public void testSearchableSnapshotPreference() {
AllocationService strategy = createAllocationService(
Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()
);
OperationRouting operationRouting = new OperationRouting(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

// Modify the index to be a searchable snapshot index
Metadata metadata = Metadata.builder()
.put(
IndexMetadata.builder("test")
.settings(
Settings.builder()
.put(settings(Version.CURRENT).build())
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey())
)
.numberOfShards(2)
.numberOfReplicas(2)
)
.build();

RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.build();

clusterState = ClusterState.builder(clusterState)
.nodes(
DiscoveryNodes.builder()
.add(newNode("node1", Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)))
.add(newNode("node2", Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)))
.add(newNode("node3", Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE)))
.localNodeId("node1")
)
.build();

clusterState = strategy.reroute(clusterState, "reroute"); // Move primaries to initializing
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

clusterState = strategy.reroute(clusterState, "reroute"); // Move replicas to initializing
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
GroupShardsIterator<ShardIterator> shardIterators = operationRouting.searchShards(
clusterState,
new String[] { "test" },
null,
null
);
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
ShardIterator iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(1)); // one potential candidate (primary) for the shard

ShardRouting routing = iter.nextOrNull();
assertNotNull(routing);
assertTrue(routing.primary()); // Default preference is primary
assertTrue(routing.started());
routing = iter.nextOrNull();
assertNull(routing); // No other candidate apart from primary
}

public void testWeightedRoutingWithDifferentWeights() {
TestThreadPool threadPool = null;
try {
Expand Down

0 comments on commit 1a366c2

Please sign in to comment.