diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
index be849452c0f5e..549b751a7efea 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication;
 
+import java.util.Objects;
 import org.apache.lucene.index.SegmentInfos;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.cluster.ClusterState;
@@ -49,8 +50,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static org.opensearch.test.OpenSearchIntegTestCase.client;
-import static org.opensearch.test.OpenSearchTestCase.assertBusy;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 
 public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,8 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOException {
             return closeable.get();
         }
     }
+
+    protected boolean warmIndexSegmentReplicationEnabled() {
+        return Objects.equals(IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(), IndexModule.DataLocalityType.PARTIAL.name());
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 70da3b0e38472..fe09284295d0d 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -20,6 +20,8 @@
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.StandardDirectoryReader;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse;
@@ -430,7 +432,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expectedSuccessfulShards) throws Exception {
         ) {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
-            flush(INDEX_NAME);
 
             waitForSearchableDocs(initialDocCount, nodeA, nodeB);
@@ -450,7 +451,10 @@
             assertThat(forceMergeResponse.getFailedShards(), is(0));
             assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
             refresh(INDEX_NAME);
-            verifyStoreContent();
+            // Skipping verifyStoreContent() here, as readLastCommittedSegmentsInfo files are not present in the latest metadata of the remote store.
+            if (!warmIndexSegmentReplicationEnabled()) {
+                verifyStoreContent();
+            }
         }
     }
@@ -623,7 +627,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
         // this test stubs transport calls specific to node-node replication.
         assumeFalse(
             "Skipping the test as its not compatible with segment replication with remote store.",
-            segmentReplicationWithRemoteEnabled()
+            segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
         );
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +961,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
         }
         ensureGreen(INDEX_NAME);
         waitForSearchableDocs(docCount, primaryNode, replicaNode);
-        verifyStoreContent();
+        // Skipping verifyStoreContent() here, as readLastCommittedSegmentsInfo files are not present in the latest metadata of the remote store.
+        if (!warmIndexSegmentReplicationEnabled()) {
+            verifyStoreContent();
+        }
         final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
         assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
     }
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<SegmentReplicationShardStats> replicaStats) {
+    private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
+        Map<String, String> nodeAttributes = node.getAttributes();
+        String type = nodeAttributes.get(String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
+
+        String settingsAttributeKeyPrefix = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name);
+        Map<String, String> settingsMap = node.getAttributes()
+            .keySet()
+            .stream()
+            .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
+            .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key)));
+
+        Settings.Builder settings = Settings.builder();
+        settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue()));
+        settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
+
+        return new RepositoryMetadata(name, type, settings.build());
+    }
+
+    public void assertRemoteStoreRepositoryOnAllNodes(String repositoryName) {
+        RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
+            .state()
+            .metadata()
+            .custom(RepositoriesMetadata.TYPE);
+        RepositoryMetadata actualRepository = repositories.repository(repositoryName);
+
+        final RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
+        final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repositoryName);
+
+        for (String nodeName : internalCluster().getNodeNames()) {
+            ClusterService clusterService = internalCluster().getInstance(ClusterService.class, nodeName);
+            DiscoveryNode node = clusterService.localNode();
+            RepositoryMetadata expectedRepository = buildRepositoryMetadata(node, repositoryName);
+
+            // Validate that all the restricted settings are intact on all the nodes.
+            repository.getRestrictedSystemRepositorySettings()
+                .stream()
+                .forEach(
+                    setting -> assertEquals(
+                        String.format(Locale.ROOT, "Restricted Settings mismatch [%s]", setting.getKey()),
+                        setting.get(actualRepository.settings()),
+                        setting.get(expectedRepository.settings())
+                    )
+                );
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index d759423ce5a55..4a7c804fc74d8 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOException {
         // a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
         // In that case we still commit into the next local generation.
         if (incomingGeneration != this.lastReceivedPrimaryGen) {
-            flush(false, true);
+            if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
+                flush(false, true);
+            }
             translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
             translogManager.rollTranslogGeneration();
         }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 82b68b32f3bf8..0c6ab28a2853d 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
      */
     public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
         boolean syncSegmentSuccess = false;
+        boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
+
         long startTimeMs = System.currentTimeMillis();
         assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
         logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
                 storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
                 for (String file : uploadedSegments.keySet()) {
                     long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
-                    if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
+                    if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
                     } else {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@
             } else {
                 storeDirectory = store.directory();
             }
-            copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            if (indexSettings.isStoreLocalityPartial() == false) {
+                copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            }
 
             if (remoteSegmentMetadata != null) {
                 final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@
                 long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
                 // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
                 // Extra segments will be wiped on engine open.
-                for (String file : List.of(store.directory().listAll())) {
-                    if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                        store.deleteQuiet(file);
+                if (indexSettings.isStoreLocalityPartial() == false) {
+                    for (String file : List.of(store.directory().listAll())) {
+                        if (file.startsWith(IndexFileNames.SEGMENTS)) {
+                            store.deleteQuiet(file);
+                        }
                     }
+                    assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
+                        : "There should not be any segments file in the dir";
                 }
-                assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
-                    : "There should not be any segments file in the dir";
                 store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
             }
             syncSegmentSuccess = true;
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index b06b3e0497cf7..0bc762d1ec6de 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -117,9 +117,13 @@ public void getSegmentFiles(
         final List<String> toDownloadSegmentNames = new ArrayList<>();
         for (StoreFileMetadata fileMetadata : filesToFetch) {
             String file = fileMetadata.name();
-            assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
+            assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial() : "Local store already contains the file " + file;
             toDownloadSegmentNames.add(file);
         }
+        if (indexShard.indexSettings().isStoreLocalityPartial()) {
+            listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            return;
+        }
         indexShard.getFileDownloader()
             .downloadAsync(
                 cancellableThreads,
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index af764556b7549..e853e0c301ba7 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.indices.replication;
 
+import java.util.Map;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.CorruptIndexException;
@@ -170,14 +171,22 @@ public void startReplication(ActionListener<Void> listener) {
         final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
         final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
 
+        Map<String, StoreFileMetadata> replicaMd = null;
+        try {
+            replicaMd = indexShard.getSegmentMetadataMap();
+        } catch (IOException e) {
+            listener.onFailure(new RuntimeException(e));
+            return;
+        }
+
         logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
         // Get list of files to copy from this checkpoint.
         state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
         cancellableThreads.checkForCancel();
         source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
 
+        Map<String, StoreFileMetadata> finalReplicaMd = replicaMd;
         checkpointInfoListener.whenComplete(checkpointInfo -> {
-            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
+            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo, finalReplicaMd);
             state.setStage(SegmentReplicationState.Stage.GET_FILES);
             cancellableThreads.checkForCancel();
             source.getSegmentFiles(
@@ -196,31 +205,38 @@ public void startReplication(ActionListener<Void> listener) {
         }, listener::onFailure);
     }
 
-    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
+    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd) throws IOException {
         cancellableThreads.checkForCancel();
         state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
-        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
         // local files
         final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
-        // set of local files that can be reused
-        final Set<String> reuseFiles = diff.missing.stream()
-            .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
-            .filter(this::validateLocalChecksum)
-            .map(StoreFileMetadata::name)
-            .collect(Collectors.toSet());
+        final List<StoreFileMetadata> missingFiles;
+        // Skip reuse logic for warm indices, which keep only a partial set of segment files on local disk.
+        if (indexShard.indexSettings().isStoreLocalityPartial()) {
+            missingFiles = diff.missing;
+        } else {
+            // set of local files that can be reused
+            final Set<String> reuseFiles = diff.missing.stream()
+                .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
+                .filter(this::validateLocalChecksum)
+                .map(StoreFileMetadata::name)
+                .collect(Collectors.toSet());
 
-        final List<StoreFileMetadata> missingFiles = diff.missing.stream()
-            .filter(md -> reuseFiles.contains(md.name()) == false)
-            .collect(Collectors.toList());
+            missingFiles = diff.missing.stream()
+                .filter(md -> reuseFiles.contains(md.name()) == false)
+                .collect(Collectors.toList());
+
+            logger.trace(
+                () -> new ParameterizedMessage(
+                    "Replication diff for checkpoint {} {} {}",
+                    checkpointInfo.getCheckpoint(),
+                    missingFiles,
+                    diff.different
+                )
+            );
+        }
 
-        logger.trace(
-            () -> new ParameterizedMessage(
-                "Replication diff for checkpoint {} {} {}",
-                checkpointInfo.getCheckpoint(),
-                missingFiles,
-                diff.different
-            )
-        );
         /*
          * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
          * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an
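
Taken together, the patch is one guard applied consistently: every path that would materialize segment files on local disk (the flush on primary-generation change in NRTReplicationEngine, copySegmentFiles and the segments_N cleanup in IndexShard, the file download in RemoteStoreReplicationSource, and the checksum-based reuse in SegmentReplicationTarget) is skipped when the index's store locality is PARTIAL, i.e. a warm index whose directory fetches data on demand. Below is a minimal, self-contained sketch of that gating pattern; WarmReplicationSketch, FileMetadata, and the DataLocalityType enum here are simplified stand-ins for illustration, not the real OpenSearch classes.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative stand-ins for the real OpenSearch types.
enum DataLocalityType { FULL, PARTIAL }

record FileMetadata(String name, long checksum) {}

class WarmReplicationSketch {
    private final DataLocalityType locality;

    WarmReplicationSketch(DataLocalityType locality) {
        this.locality = locality;
    }

    // Mirrors the IndexSettings#isStoreLocalityPartial() check used throughout the diff.
    boolean isStoreLocalityPartial() {
        return locality == DataLocalityType.PARTIAL;
    }

    // The recurring pattern in the patch: for warm (PARTIAL locality) indices, skip any
    // step that would copy segment files onto local disk; reads are served remotely.
    List<FileMetadata> filesToCopyLocally(Map<String, FileMetadata> remote, Map<String, FileMetadata> local) {
        if (isStoreLocalityPartial()) {
            return List.of(); // warm index: nothing is downloaded up front
        }
        // hot index: copy every remote file that is missing locally or has a checksum mismatch
        return remote.values().stream().filter(md -> {
            FileMetadata existing = local.get(md.name());
            return existing == null || existing.checksum() != md.checksum();
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, FileMetadata> remote = Map.of("_0.cfs", new FileMetadata("_0.cfs", 42L));
        System.out.println(new WarmReplicationSketch(DataLocalityType.PARTIAL).filesToCopyLocally(remote, Map.of())); // []
        System.out.println(new WarmReplicationSketch(DataLocalityType.FULL).filesToCopyLocally(remote, Map.of()));    // [FileMetadata[name=_0.cfs, checksum=42]]
    }
}

Note that the actual change threads the same boolean through each call site rather than centralizing the decision, which leaves the existing hot-index control flow in IndexShard, NRTReplicationEngine, and SegmentReplicationTarget untouched.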