diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f7d1ea5b3d19..6a14bcdcadde3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554)) - Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445)) - Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439)) +- [Writable Warm] Changes for shard recovery and relocation in case of composite directory ([#14670](https://github.com/opensearch-project/OpenSearch/pull/14670)) ### Dependencies - Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index be849452c0f5e..7c521927b4dce 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -42,6 +42,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -49,8 +50,6 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; -import static org.opensearch.test.OpenSearchIntegTestCase.client; -import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase { @@ -245,4 +244,11 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio return closeable.get(); } } + + protected boolean warmIndexSegmentReplicationEnabled() { + return Objects.equals( + IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), + IndexModule.DataLocalityType.PARTIAL.name() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2421a1a507372..dc16c0c4db439 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -430,7 +430,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - flush(INDEX_NAME); waitForSearchableDocs(initialDocCount, nodeA, nodeB); @@ -450,7 +449,11 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected assertThat(forceMergeResponse.getFailedShards(), is(0)); assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards)); refresh(INDEX_NAME); - verifyStoreContent(); + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. + if (!warmIndexSegmentReplicationEnabled()) { + verifyStoreContent(); + } } } @@ -623,7 +626,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception { // this test stubs transport calls specific to node-node replication. assumeFalse( "Skipping the test as its not compatible with segment replication with remote store.", - segmentReplicationWithRemoteEnabled() + segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled() ); final String primaryNode = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); @@ -957,7 +960,11 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } ensureGreen(INDEX_NAME); waitForSearchableDocs(docCount, primaryNode, replicaNode); - verifyStoreContent(); + // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote + // store. + if (!warmIndexSegmentReplicationEnabled()) { + verifyStoreContent(); + } final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME); assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId()); } @@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set expected, Set f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() + : "There should not be any segments file in the dir"; } - assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty() - : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } syncSegmentSuccess = true; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java index b06b3e0497cf7..a9f026badeee0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -117,9 +117,14 @@ public void getSegmentFiles( final List toDownloadSegmentNames = new ArrayList<>(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; + assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() + : "Local store already contains the file " + file; toDownloadSegmentNames.add(file); } + if (indexShard.indexSettings().isStoreLocalityPartial()) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + return; + } indexShard.getFileDownloader() .downloadAsync( cancellableThreads, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index af764556b7549..175bb6592af10 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -38,6 +38,7 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -170,14 +171,22 @@ public void startReplication(ActionListener listener) { final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); + Map replicaMd = null; + try { + replicaMd = indexShard.getSegmentMetadataMap(); + } catch (IOException e) { + listener.onFailure(new RuntimeException(e)); + } + logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description())); // Get list of files to copy from this checkpoint. state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO); cancellableThreads.checkForCancel(); source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + Map finalReplicaMd = replicaMd; checkpointInfoListener.whenComplete(checkpointInfo -> { - final List filesToFetch = getFiles(checkpointInfo); + final List filesToFetch = getFiles(checkpointInfo, finalReplicaMd); state.setStage(SegmentReplicationState.Stage.GET_FILES); cancellableThreads.checkForCancel(); source.getSegmentFiles( @@ -196,31 +205,37 @@ public void startReplication(ActionListener listener) { }, listener::onFailure); } - private List getFiles(CheckpointInfoResponse checkpointInfo) throws IOException { + private List getFiles(CheckpointInfoResponse checkpointInfo, Map finalReplicaMd) + throws IOException { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FILE_DIFF); - final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap()); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd); // local files final Set localFiles = Set.of(indexShard.store().directory().listAll()); - // set of local files that can be reused - final Set reuseFiles = diff.missing.stream() - .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) - .filter(this::validateLocalChecksum) - .map(StoreFileMetadata::name) - .collect(Collectors.toSet()); + final List missingFiles; + // Skip reuse logic for warm indices + if (indexShard.indexSettings().isStoreLocalityPartial() == true) { + missingFiles = diff.missing; + } else { + // set of local files that can be reused + final Set reuseFiles = diff.missing.stream() + .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name())) + .filter(this::validateLocalChecksum) + .map(StoreFileMetadata::name) + .collect(Collectors.toSet()); - final List missingFiles = diff.missing.stream() - .filter(md -> reuseFiles.contains(md.name()) == false) - .collect(Collectors.toList()); + missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList()); + + logger.trace( + () -> new ParameterizedMessage( + "Replication diff for checkpoint {} {} {}", + checkpointInfo.getCheckpoint(), + missingFiles, + diff.different + ) + ); + } - logger.trace( - () -> new ParameterizedMessage( - "Replication diff for checkpoint {} {} {}", - checkpointInfo.getCheckpoint(), - missingFiles, - diff.different - ) - ); /* * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an