diff --git a/CHANGELOG.md b/CHANGELOG.md index 45a60cfa404e4..10c6b1e6d2c79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Moved concurrent-search from sandbox plugin to server module behind feature flag ([#7203](https://github.com/opensearch-project/OpenSearch/pull/7203)) - Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) - Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502)) +- Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index e9021dd8ed9ef..bf9ea4a3a781f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -10,6 +10,8 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -41,6 +43,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -664,6 +667,58 @@ public void testCacheIndexFilesClearedOnDelete() throws Exception { logger.info("--> validated that the cache file path doesn't exist"); } + /** + * Test scenario that validates that the default search preference for searchable snapshot + * is primary shards + */ + public void testDefaultShardPreference() throws Exception { + final int numReplicas = 1; + final String indexName = "test-idx"; + final String restoredIndexName = indexName + "-copy"; + final String repoName = "test-repo"; + final String snapshotName = "test-snap"; + final Client client = client(); + + // Create an index, snapshot and restore as a searchable snapshot index + internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicas + 1); + createIndexWithDocsAndEnsureGreen(numReplicas, 100, indexName); + createRepositoryWithSettings(null, repoName); + takeSnapshot(client, snapshotName, repoName, indexName); + restoreSnapshotAndEnsureGreen(client, snapshotName, repoName); + assertDocCount(restoredIndexName, 100L); + assertRemoteSnapshotIndexSettings(client, restoredIndexName); + + // ClusterSearchShards API returns a list of shards that will be used + // when querying a particular index + ClusterSearchShardsGroup[] shardGroups = client.admin() + .cluster() + .searchShards(new ClusterSearchShardsRequest(restoredIndexName)) + .actionGet() + .getGroups(); + + // Ensure when no preferences are set (default preference), the only compatible shards are primary + for (ClusterSearchShardsGroup shardsGroup : shardGroups) { + assertEquals(1, shardsGroup.getShards().length); + assertTrue(shardsGroup.getShards()[0].primary()); + } + + // Ensure when preferences are set, all the compatible shards are returned + shardGroups = client.admin() + .cluster() + .searchShards(new ClusterSearchShardsRequest(restoredIndexName).preference("foo")) + .actionGet() + .getGroups(); + + // Ensures that the compatible shards are not just primaries + for (ClusterSearchShardsGroup shardsGroup : shardGroups) { + assertTrue(shardsGroup.getShards().length > 1); + boolean containsReplica = Arrays.stream(shardsGroup.getShards()) + .map(shardRouting -> !shardRouting.primary()) + .reduce(false, (s1, s2) -> s1 || s2); + assertTrue(containsReplica); + } + } + /** * Asserts the cache folder count to match the number of shards and the number of indices within the cache folder * as provided. diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 9f1f4f5622a57..ade2cda797334 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -42,6 +42,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.Strings; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.shard.ShardId; import org.opensearch.node.ResponseCollectorService; @@ -238,6 +239,13 @@ public GroupShardsIterator searchShards( final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { + IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName()); + if (IndexModule.Type.REMOTE_SNAPSHOT.match( + indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()) + ) && (preference == null || preference.isEmpty())) { + preference = Preference.PRIMARY.type(); + } + ShardIterator iterator = preferenceActiveShardIterator( shard, clusterState.nodes().getLocalNodeId(), diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index e9cbb42369a19..55885fb61ee0c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -46,11 +46,13 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.Index; +import org.opensearch.index.IndexModule; import org.opensearch.index.shard.ShardId; import org.opensearch.node.ResponseCollectorService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; @@ -992,6 +994,66 @@ public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Excep } } + public void testSearchableSnapshotPrimaryDefault() throws Exception { + final int numIndices = 1; + final int numShards = 2; + final int numReplicas = 2; + final String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + // The first index is a searchable snapshot index + final String searchableSnapshotIndex = indexNames[0]; + ClusterService clusterService = null; + ThreadPool threadPool = null; + + try { + OperationRouting opRouting = new OperationRouting( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + ClusterState state = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(indexNames, numShards, numReplicas); + threadPool = new TestThreadPool("testSearchableSnapshotPreference"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + + // Update the index config within the cluster state to modify the index to a searchable snapshot index + IndexMetadata searchableSnapshotIndexMetadata = IndexMetadata.builder(searchableSnapshotIndex) + .settings( + Settings.builder() + .put(state.metadata().index(searchableSnapshotIndex).getSettings()) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .build() + ) + .build(); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()) + .put(searchableSnapshotIndexMetadata, false) + .generateClusterUuidIfNeeded(); + state = ClusterState.builder(state).metadata(metadataBuilder.build()).build(); + + // Verify default preference is primary only + GroupShardsIterator groupIterator = opRouting.searchShards(state, indexNames, null, null); + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + for (ShardIterator shardIterator : groupIterator) { + assertThat("Only single shard will be returned with no preference", shardIterator.size(), equalTo(1)); + assertTrue("Only primary should exist with no preference", shardIterator.nextOrNull().primary()); + } + + // Verify alternative preference can be applied to a searchable snapshot index + groupIterator = opRouting.searchShards(state, indexNames, null, "_replica"); + assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); + + for (ShardIterator shardIterator : groupIterator) { + assertThat("Replica shards will be returned", shardIterator.size(), equalTo(numReplicas)); + assertFalse("Returned shard should be a replica", shardIterator.nextOrNull().primary()); + } + } finally { + IOUtils.close(clusterService); + terminate(threadPool); + } + } + private DiscoveryNode[] setupNodes() { // Sets up two data nodes in zone-a and one data node in zone-b List zones = Arrays.asList("a", "a", "b"); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index cd645b08a119c..e24e8b2ad3f87 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.cluster.routing.IndexShardRoutingTable; @@ -55,6 +56,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; import org.opensearch.index.shard.ShardId; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.threadpool.TestThreadPool; @@ -562,6 +564,71 @@ public void testReplicaShardPreferenceIters() { assertTrue(routing.primary()); } + public void testSearchableSnapshotPreference() { + AllocationService strategy = createAllocationService( + Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build() + ); + OperationRouting operationRouting = new OperationRouting( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + // Modify the index to be a searchable snapshot index + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(settings(Version.CURRENT).build()) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(2) + .numberOfReplicas(2) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))) + .add(newNode("node2", Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE))) + .add(newNode("node3", Collections.singleton(DiscoveryNodeRole.SEARCH_ROLE))) + .localNodeId("node1") + ) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute"); // Move primaries to initializing + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + + clusterState = strategy.reroute(clusterState, "reroute"); // Move replicas to initializing + clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + + // When replicas haven't initialized, it comes back with the primary first, then initializing replicas + GroupShardsIterator shardIterators = operationRouting.searchShards( + clusterState, + new String[] { "test" }, + null, + null + ); + assertThat(shardIterators.size(), equalTo(2)); // two potential shards + ShardIterator iter = shardIterators.iterator().next(); + assertThat(iter.size(), equalTo(1)); // one potential candidate (primary) for the shard + + ShardRouting routing = iter.nextOrNull(); + assertNotNull(routing); + assertTrue(routing.primary()); // Default preference is primary + assertTrue(routing.started()); + routing = iter.nextOrNull(); + assertNull(routing); // No other candidate apart from primary + } + public void testWeightedRoutingWithDifferentWeights() { TestThreadPool threadPool = null; try {