diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 970217444e20e..f4543a6fa7490 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -56,4 +56,38 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } + + public void testReplicationAfterForceMerge() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build() + ); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + waitForDocs(initialDocCount, indexer); + } + flush(INDEX_NAME); + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int totalDocs = initialDocCount + additionalDocCount; + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { + waitForDocs(additionalDocCount, indexer); + } + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index a768a5275e586..904f590e96d39 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -236,7 +236,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {}; + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {} public long getProcessedLocalCheckpoint() { return 0L; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3b69a8a25753e..f7fb2cff409ec 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -63,10 +63,6 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; @@ -120,7 +116,6 @@ import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; @@ -345,21 +340,13 @@ public InternalEngine(EngineConfig engineConfig) { } @Override - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { assert engineConfig.isPrimary() == false : "Only replicas should update Infos"; - SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); - assert gen == infos.getGeneration(); externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); localCheckpointTracker.markSeqNoAsProcessed(seqNo); } - private ChecksumIndexInput toIndexInput(byte[] input) { - return new BufferedChecksumIndexInput( - new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") - ); - } - private LocalCheckpointTracker createLocalCheckpointTracker( BiFunction localCheckpointTrackerSupplier ) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d4753e7763e6e..2ad470066d52a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1522,8 +1522,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { ); } - public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException { - getEngine().updateCurrentInfos(infosBytes, gen, seqNo); + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + getEngine().updateCurrentInfos(infos, seqNo); } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 893d87e874b4a..7a548722f033b 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -702,6 +702,50 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. + * @param reason the reason for this cleanup operation logged for each deleted file + * @param remoteSnapshot The remote snapshot sent from primary shards. + * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { + // fetch a snapshot from the latest on disk Segments_N file. This can be behind + // the passed in local in memory snapshot, so we want to ensure files it references are not removed. + final Store.MetadataSnapshot onDiskMetadata = getMetadata((IndexCommit) null); + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) || remoteSnapshot.contains(existingFile) || onDiskMetadata.contains(existingFile)) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + logger.info("Deleting file {}-{}", shardId, existingFile); + directory.deleteFile(reason, existingFile); + // FNF should not happen since we hold a write lock? + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) + || existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN) + || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? + throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + verifyAfterCleanup(remoteSnapshot, localSnapshot); + } finally { + metadataLock.writeLock().unlock(); + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java index a06af1b697ca7..ac8bea7deed8e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationTarget.java @@ -36,7 +36,11 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.Directory; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -56,6 +60,7 @@ import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -138,7 +143,7 @@ public Store store() { return store; } - public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, ActionListener listener) { + public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { ActionListener.completeWith(listener, () -> { // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas @@ -147,7 +152,6 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, final Store store = store(); store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", checkpointInfo.getSnapshot()); if (indexShard.getRetentionLeases().leases().isEmpty()) { // if empty, may be a fresh IndexShard, so write an empty leases file to disk indexShard.persistRetentionLeases(); @@ -155,13 +159,22 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, } else { assert indexShard.assertRetentionLeasesPersisted(); } - final long segmentsGen = checkpointInfo.getCheckpoint().getSegmentsGen(); - // force an fsync if we are receiving a new gen. - if (segmentsGen > indexShard.getLatestSegmentInfos().getGeneration()) { - final Directory directory = store().directory(); - directory.sync(Arrays.asList(directory.listAll())); - } - indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes(), checkpointInfo.getCheckpoint().getSeqNo()); + + // Deserialize the new SegmentInfos object sent primary the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify("finalizeReplication", checkpointInfoResponse.getSnapshot(), store.getMetadata(infos)); + + // Update the current infos reference on the Engine's reader. + indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo()); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are @@ -192,6 +205,16 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, }); } + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); + } + public long getReplicationId() { return replicationId; }