diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 60d7eefbb6d9b..85c2514ebf00f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -41,7 +41,6 @@ public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2"; private static final String INDEX_NAMES_WILDCARD = "test-remote-store-*,remote-store-test-index-*"; private static final String TOTAL_OPERATIONS = "total-operations"; - private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; @Override @@ -72,18 +71,26 @@ private void restore(String... indices) { ); } - private void verifyRestoredData(Map indexStats, String indexName) { - // This is required to get updated number from already active shards which were not restored - refresh(indexName); + private void verifyRestoredData(Map indexStats, String indexName) throws Exception { ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)); + // This is to ensure that shards that were already assigned will get latest count + refresh(indexName); + assertBusy( + () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), + 30, + TimeUnit.SECONDS + ); IndexResponse response = indexSingleDoc(indexName); if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) { assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); } refresh(indexName); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1); + assertBusy( + () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), + 30, + TimeUnit.SECONDS + ); } private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { @@ -100,7 +107,7 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException { + public void testRemoteTranslogRestoreWithNoDataPostCommit() throws Exception { testRestoreFlow(1, true, randomIntBetween(1, 5)); } @@ -108,7 +115,7 @@ public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException { * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException { + public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws Exception { testRestoreFlow(1, false, randomIntBetween(1, 5)); } @@ -117,7 +124,7 @@ public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException * and unrefreshed data restored using Remote Translog Store. * @throws IOException IO Exception. */ - public void testRemoteTranslogRestoreWithRefreshedData() throws IOException { + public void testRemoteTranslogRestoreWithRefreshedData() throws Exception { testRestoreFlow(randomIntBetween(2, 5), false, randomIntBetween(1, 5)); } @@ -126,7 +133,7 @@ public void testRemoteTranslogRestoreWithRefreshedData() throws IOException { * and unrefreshed data restored using Remote Translog Store. * @throws IOException IO Exception. */ - public void testRemoteTranslogRestoreWithCommittedData() throws IOException { + public void testRemoteTranslogRestoreWithCommittedData() throws Exception { testRestoreFlow(randomIntBetween(2, 5), true, randomIntBetween(1, 5)); } @@ -134,8 +141,7 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException { * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") - public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException { + public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws Exception { testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5)); } @@ -143,8 +149,7 @@ public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOExce * Simulates all data restored using Remote Translog Store. * @throws IOException IO Exception. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") - public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException { + public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws Exception { testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5)); } @@ -153,8 +158,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc * and unrefreshed data restored using Remote Translog Store. * @throws IOException IO Exception. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") - public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException { + public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws Exception { testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5)); } @@ -163,12 +167,11 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti * and unrefreshed data restored using Remote Translog Store. * @throws IOException IO Exception. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479") - public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException { + public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws Exception { testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5)); } - private void restoreAndVerify(int shardCount, int replicaCount, Map indexStats) { + private void restoreAndVerify(int shardCount, int replicaCount, Map indexStats) throws Exception { restore(INDEX_NAME); ensureGreen(INDEX_NAME); // This is required to get updated number from already active shards which were not restored @@ -183,7 +186,7 @@ private void restoreAndVerify(int shardCount, int replicaCount, Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); @@ -202,10 +205,10 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. * @throws IOException IO Exception. */ - private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException { + private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception { prepareCluster(1, 2, INDEX_NAME, 1, shardCount); Map indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME); - assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); + assertEquals(shardCount * 2, getNumShards(INDEX_NAME).totalNumShards); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(INDEX_NAME))); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); @@ -221,14 +224,14 @@ private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, bool * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked. * @throws IOException IO Exception. */ - private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException { + private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception { prepareCluster(1, 3, INDEX_NAMES, 1, shardCount); String[] indices = INDEX_NAMES.split(","); Map> indicesStats = new HashMap<>(); for (String index : indices) { Map indexStats = indexData(numberOfIterations, invokeFlush, index); indicesStats.put(index, indexStats); - assertEquals(shardCount, getNumShards(index).totalNumShards); + assertEquals(shardCount * 2, getNumShards(index).totalNumShards); } for (String index : indices) { @@ -259,7 +262,7 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo ); ensureGreen(indices); for (String index : indices) { - assertEquals(shardCount, getNumShards(index).totalNumShards); + assertEquals(shardCount * 2, getNumShards(index).totalNumShards); verifyRestoredData(indicesStats.get(index), index); } } @@ -280,7 +283,7 @@ public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException { } } - public void testRestoreFlowNoRedIndex() { + public void testRestoreFlowNoRedIndex() throws Exception { int shardCount = randomIntBetween(1, 5); prepareCluster(0, 3, INDEX_NAME, 0, shardCount); Map indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME); @@ -302,7 +305,7 @@ public void testRestoreFlowNoRedIndex() { * @throws IOException IO Exception. */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") - public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws IOException { + public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws Exception { testRestoreFlowMultipleIndices(2, true, randomIntBetween(1, 5)); } @@ -313,7 +316,7 @@ public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws IOEx * @throws IOException IO Exception. */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") - public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOException { + public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws Exception { int shardCount = randomIntBetween(1, 5); prepareCluster(1, 3, INDEX_NAMES, 1, shardCount); String[] indices = INDEX_NAMES.split(","); @@ -354,7 +357,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio * with only some of the remote-enabled red indices requested for the restore. * @throws IOException IO Exception. */ - public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOException { + public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws Exception { int shardCount = randomIntBetween(1, 5); prepareCluster(1, 3, INDEX_NAMES, 0, shardCount); String[] indices = INDEX_NAMES.split(","); @@ -402,7 +405,7 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc * @throws IOException IO Exception. */ @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480") - public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOException { + public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws Exception { int shardCount = randomIntBetween(1, 5); prepareCluster(1, 3, INDEX_NAMES, 1, shardCount); String[] indices = INDEX_NAMES.split(","); @@ -451,7 +454,7 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc * when the index has no data. * @throws IOException IO Exception. */ - public void testRTSRestoreDataOnlyInTranslog() throws IOException { + public void testRTSRestoreDataOnlyInTranslog() throws Exception { testRestoreFlow(0, true, randomIntBetween(1, 5)); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index e4ffacd708632..fd70d319780c8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -453,7 +453,8 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery public Builder initializeAsRemoteStoreRestore( IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource, - Map activeInitializingShards + Map indexShardRoutingTableMap, + boolean restoreAllShards ) { final UnassignedInfo unassignedInfo = new UnassignedInfo( UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, @@ -465,11 +466,34 @@ public Builder initializeAsRemoteStoreRestore( } for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) { ShardId shardId = new ShardId(index, shardNumber); + if (indexShardRoutingTableMap.containsKey(shardId) == false) { + throw new IllegalStateException("IndexShardRoutingTable is not present for shardId: " + shardId); + } IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); - if (activeInitializingShards.containsKey(shardId)) { - indexShardRoutingBuilder.addShard(activeInitializingShards.get(shardId)); - } else { + IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingTableMap.get(shardId); + if (restoreAllShards || indexShardRoutingTable.primaryShard().unassigned()) { + // Primary shard to be recovered from remote store. indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo)); + // All the replica shards to be recovered from peer recovery. + indexShardRoutingTable.replicaShards() + .forEach( + shardRouting -> indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) + ) + ); + } else { + // Primary is either active or initializing. Do not trigger restore. + indexShardRoutingBuilder.addShard(indexShardRoutingTable.primaryShard()); + // Replica, if unassigned, trigger peer recovery else no action. + for (ShardRouting shardRouting : indexShardRoutingTable.replicaShards()) { + if (shardRouting.unassigned()) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) + ); + } else { + indexShardRoutingBuilder.addShard(shardRouting); + } + } } shards.put(shardNumber, indexShardRoutingBuilder.build()); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 9ca4da0b5a85e..bcfc324b202b9 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -575,10 +575,11 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) { public Builder addAsRemoteStoreRestore( IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource, - Map activeInitializingShards + Map indexShardRoutingTableMap, + boolean restoreAllShards ) { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()) - .initializeAsRemoteStoreRestore(indexMetadata, recoverySource, activeInitializingShards); + .initializeAsRemoteStoreRestore(indexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards); add(indexRoutingBuilder); return this; } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index 2617898fef491..9fdd2ff9f759d 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -19,7 +19,6 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; @@ -31,7 +30,6 @@ import org.opensearch.snapshots.RestoreService; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -80,7 +78,6 @@ public ClusterState execute(ClusterState currentState) { } if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { IndexMetadata updatedIndexMetadata = currentIndexMetadata; - Map activeInitializingShards = new HashMap<>(); if (request.restoreAllShards()) { if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { throw new IllegalStateException( @@ -97,17 +94,15 @@ public ClusterState execute(ClusterState currentState) { .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) .build(); - } else { - activeInitializingShards = currentState.routingTable() - .index(index) - .shards() - .values() - .stream() - .map(IndexShardRoutingTable::primaryShard) - .filter(shardRouting -> shardRouting.unassigned() == false) - .collect(Collectors.toMap(ShardRouting::shardId, Function.identity())); } + Map indexShardRoutingTableMap = currentState.routingTable() + .index(index) + .shards() + .values() + .stream() + .collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity())); + IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID()); RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource( @@ -115,7 +110,12 @@ public ClusterState execute(ClusterState currentState) { updatedIndexMetadata.getCreationVersion(), indexId ); - rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards); + rtBuilder.addAsRemoteStoreRestore( + updatedIndexMetadata, + recoverySource, + indexShardRoutingTableMap, + request.restoreAllShards() + ); blocks.updateBlocks(updatedIndexMetadata); mdBuilder.put(updatedIndexMetadata, true); indicesToBeRestored.add(index); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index be94915eecb09..a8462f14fa4f4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2335,7 +2335,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) { - syncSegmentsFromRemoteSegmentStore(false, true); + syncSegmentsFromRemoteSegmentStore(false); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { if (syncFromRemote) { @@ -4603,7 +4603,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false, true); + syncSegmentsFromRemoteSegmentStore(false); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4663,10 +4663,9 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store - * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4678,9 +4677,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re .getSegmentsUploadedToRemoteStore() .entrySet() .stream() - // if this is a refresh level sync, ignore any segments_n uploaded to the store, we will commit the received infos bytes - // locally. - .filter(entry -> refreshLevelSegmentSync && entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) + .filter(entry -> entry.getKey().startsWith(IndexFileNames.SEGMENTS) == false) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); store.incRef(); remoteStore.incRef(); @@ -4701,7 +4698,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re } copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); - if (refreshLevelSegmentSync && remoteSegmentMetadata != null) { + if (remoteSegmentMetadata != null) { final SegmentInfos infosSnapshot = store.buildSegmentInfos( remoteSegmentMetadata.getSegmentInfosBytes(), remoteSegmentMetadata.getGeneration() diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index e8a9ec866ac01..bbb8fd1b3d08e 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -173,6 +173,14 @@ private boolean syncSegments() { indexShard.getReplicationTracker().isPrimaryMode(), indexShard.state() ); + // Following check is required to enable retry and make sure that we do not lose this refresh event + // When primary shard is restored from remote store, the recovery happens first followed by changing + // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through + // if following condition does not exist. The segments created as part of translog replay will not be present + // in the remote store. + if (indexShard.state() == IndexShardState.STARTED && indexShard.getEngine() instanceof InternalEngine) { + return false; + } return true; } ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index d0c083390ab70..6d675b709e05b 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -535,11 +535,13 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true, true); + indexShard.syncSegmentsFromRemoteSegmentStore(true); indexShard.syncTranslogFilesFromRemoteTranslog(); - if (store.directory().listAll().length == 0) { + // On index creation, the only segment file that is created is segments_N. We can safely discard this file + // as there is no data associated with this shard as part of segments. + if (store.directory().listAll().length <= 1) { Path location = indexShard.shardPath().resolveTranslog(); Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration())); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 88b4cd063b8a6..df1589b3f29b9 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, true); + indexShard.syncSegmentsFromRemoteSegmentStore(false); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 6e7583fbd75d5..8542ff53c6ff1 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -50,13 +50,18 @@ import org.opensearch.repositories.IndexId; import org.junit.Before; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; +import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +69,7 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RoutingTableTests extends OpenSearchAllocationTestCase { @@ -540,8 +546,47 @@ public void testAddAsRecovery() { } } - public void testAddAsRemoteStoreRestore() { - final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN).build(); + private Map getIndexShardRoutingTableMap(Index index, boolean allUnassigned, int numberOfReplicas) { + Map indexShardRoutingTableMap = new HashMap<>(); + List activeInitializingStates = List.of(INITIALIZING, STARTED, RELOCATING); + for (int i = 0; i < this.numberOfShards; i++) { + IndexShardRoutingTable indexShardRoutingTable = mock(IndexShardRoutingTable.class); + ShardRouting primaryShardRouting = mock(ShardRouting.class); + Boolean primaryUnassigned = allUnassigned || randomBoolean(); + when(primaryShardRouting.unassigned()).thenReturn(primaryUnassigned); + if (primaryUnassigned) { + when(primaryShardRouting.state()).thenReturn(UNASSIGNED); + } else { + when(primaryShardRouting.state()).thenReturn( + activeInitializingStates.get(randomIntBetween(0, activeInitializingStates.size() - 1)) + ); + } + when(indexShardRoutingTable.primaryShard()).thenReturn(primaryShardRouting); + List replicaShards = new ArrayList<>(); + for (int j = 0; j < numberOfReplicas; j++) { + ShardRouting replicaShardRouting = mock(ShardRouting.class); + Boolean replicaUnassigned = allUnassigned || randomBoolean(); + when(replicaShardRouting.unassigned()).thenReturn(replicaUnassigned); + if (replicaUnassigned) { + when(replicaShardRouting.state()).thenReturn(UNASSIGNED); + } else { + when(replicaShardRouting.state()).thenReturn( + activeInitializingStates.get(randomIntBetween(0, activeInitializingStates.size() - 1)) + ); + } + replicaShards.add(replicaShardRouting); + } + when(indexShardRoutingTable.replicaShards()).thenReturn(replicaShards); + indexShardRoutingTableMap.put(new ShardId(index, i), indexShardRoutingTable); + } + return indexShardRoutingTableMap; + } + + public void testAddAsRemoteStoreRestoreAllUnassigned() { + int numberOfReplicas = randomIntBetween(0, 5); + final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN) + .numberOfReplicas(numberOfReplicas) + .build(); final RemoteStoreRecoverySource remoteStoreRecoverySource = new RemoteStoreRecoverySource( "restore_uuid", Version.CURRENT, @@ -550,34 +595,78 @@ public void testAddAsRemoteStoreRestore() { final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore( indexMetadata, remoteStoreRecoverySource, - new HashMap<>() + getIndexShardRoutingTableMap(indexMetadata.getIndex(), true, numberOfReplicas), + false ).build(); assertTrue(routingTable.hasIndex(TEST_INDEX_1)); - assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); - assertEquals(this.numberOfShards, routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()); + int numberOfShards = this.numberOfShards * (numberOfReplicas + 1); + assertEquals(numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); + assertEquals(numberOfShards, routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()); } public void testAddAsRemoteStoreRestoreWithActiveShards() { - final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN).build(); + int numberOfReplicas = randomIntBetween(0, 5); + final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN) + .numberOfReplicas(numberOfReplicas) + .build(); final RemoteStoreRecoverySource remoteStoreRecoverySource = new RemoteStoreRecoverySource( "restore_uuid", Version.CURRENT, new IndexId(TEST_INDEX_1, "1") ); - Map activeInitializingShards = new HashMap<>(); - for (int i = 0; i < randomIntBetween(1, this.numberOfShards); i++) { - activeInitializingShards.put(new ShardId(indexMetadata.getIndex(), i), mock(ShardRouting.class)); - } + Map indexShardRoutingTableMap = getIndexShardRoutingTableMap( + indexMetadata.getIndex(), + false, + numberOfReplicas + ); final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore( indexMetadata, remoteStoreRecoverySource, - activeInitializingShards + indexShardRoutingTableMap, + false ).build(); assertTrue(routingTable.hasIndex(TEST_INDEX_1)); - assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); - assertEquals( - this.numberOfShards - activeInitializingShards.size(), - routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size() + int numberOfShards = this.numberOfShards * (numberOfReplicas + 1); + assertEquals(numberOfShards, routingTable.allShards(TEST_INDEX_1).size()); + int unassignedShards = 0; + for (IndexShardRoutingTable indexShardRoutingTable : indexShardRoutingTableMap.values()) { + if (indexShardRoutingTable.primaryShard().unassigned()) { + unassignedShards += indexShardRoutingTable.replicaShards().size() + 1; + } else { + for (ShardRouting replicaShardRouting : indexShardRoutingTable.replicaShards()) { + if (replicaShardRouting.unassigned()) { + unassignedShards += 1; + } + } + } + } + assertEquals(unassignedShards, routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()); + } + + public void testAddAsRemoteStoreRestoreShardMismatch() { + int numberOfReplicas = randomIntBetween(0, 5); + final IndexMetadata indexMetadata = createIndexMetadata(TEST_INDEX_1).state(IndexMetadata.State.OPEN) + .numberOfReplicas(numberOfReplicas) + .build(); + final RemoteStoreRecoverySource remoteStoreRecoverySource = new RemoteStoreRecoverySource( + "restore_uuid", + Version.CURRENT, + new IndexId(TEST_INDEX_1, "1") + ); + Map indexShardRoutingTableMap = getIndexShardRoutingTableMap( + indexMetadata.getIndex(), + true, + numberOfReplicas + ); + indexShardRoutingTableMap.remove(indexShardRoutingTableMap.keySet().iterator().next()); + assertThrows( + IllegalStateException.class, + () -> new RoutingTable.Builder().addAsRemoteStoreRestore( + indexMetadata, + remoteStoreRecoverySource, + indexShardRoutingTableMap, + false + ).build() ); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 83b07e986bcc5..95fe67592d5f8 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -532,10 +532,9 @@ private Tuple mockIndexS new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool ); - RemoteStorePressureService remoteStorePressureService = new RemoteStorePressureService(clusterService, Settings.EMPTY); + RemoteStorePressureService remoteStorePressureService = indexShard.getRemoteStorePressureService(); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); - remoteStorePressureService.afterIndexShardCreated(shard); RemoteSegmentTransferTracker tracker = remoteStorePressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true);