diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index ddb046139d5cd..d06dc6e20660b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; @@ -78,6 +79,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -266,6 +268,59 @@ public void testIndexReopenClose() throws Exception { verifyStoreContent(); } + public void testScrollWithConcurrentIndexAndSearch() throws Exception { + assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); + final String primary = internalCluster().startNode(); + final String replica = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + final List> pendingIndexResponses = new ArrayList<>(); + final List> pendingSearchResponse = new ArrayList<>(); + final int searchCount = randomIntBetween(10, 20); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); + + for (int i = 0; i < searchCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + flush(INDEX_NAME); + forceMerge(); + } + + final SearchResponse searchResponse = client().prepareSearch() + .setQuery(matchAllQuery()) + .setIndices(INDEX_NAME) + .setRequestCache(false) + .setScroll(TimeValue.timeValueDays(1)) + .setSize(10) + .get(); + + for (int i = searchCount; i < searchCount * 2; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + } + flush(INDEX_NAME); + forceMerge(); + client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + verifyStoreContent(); + waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica)); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 626a57750f3f4..a7658401bc5de 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -125,21 +125,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException { return new NRTReplicationReaderManager( OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId), store::incRefFileDeleter, - (files) -> { - store.decRefFileDeleter(files); - try { - store.cleanupAndPreserveLatestCommitPoint( - "On reader closed", - getLatestSegmentInfos(), - getLastCommittedSegmentInfos(), - false - ); - } catch (IOException e) { - // Log but do not rethrow - we can try cleaning up again after next replication cycle. - // If that were to fail, the shard will as well. - logger.error("Unable to clean store after reader closed", e); - } - } + store::decRefFileDeleter ); } @@ -148,9 +134,9 @@ public TranslogManager translogManager() { } public synchronized void updateSegments(final SegmentInfos infos) throws IOException { - // Update the current infos reference on the Engine's reader. - ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + // Update the current infos reference on the Engine's reader. + ensureOpen(); final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 9ec484ebfd383..268ba1a436393 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -72,6 +72,8 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { subs.add(ctx.reader()); } + // Segment_n here is ignored because it is either already committed on disk as part of previous commit point or + // does not yet exist on store (not yet committed) final Collection files = currentInfos.files(false); DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 90832b4c77756..46e5e627ef415 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -50,6 +50,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; +import org.apache.lucene.store.BufferedChecksumIndexInput; import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -64,7 +65,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; -import org.opensearch.common.Nullable; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -789,69 +790,33 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr /** * Segment Replication method - * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or - * part of the latest on-disk commit point. + * This method deletes files in store that are not referenced by latest on-disk commit point * - * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. - * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. - * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. - */ - public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { - this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true); - } - - /** - * Segment Replication method - * - * Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup - * - * This method deletes every file in this store. Except - * 1. Files referenced by the passed in SegmentInfos, usually in-memory segment infos copied from primary - * 2. Files part of the passed in segment infos, typically the last committed segment info - * 3. Files incremented by active reader for pit/scroll queries - * 4. Temporary replication file if passed in deleteTempFiles is true. - * - * @param reason the reason for this cleanup operation logged for each deleted file - * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. - * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos - * @param deleteTempFiles Does this clean up delete temporary replication files + * @param fileToConsiderForCleanUp Files to consider for clean up. * - * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + * @throws IOException Exception on locking. */ - public void cleanupAndPreserveLatestCommitPoint( - String reason, - SegmentInfos infos, - SegmentInfos lastCommittedSegmentInfos, - boolean deleteTempFiles - ) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(Collection fileToConsiderForCleanUp, String reason) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles); + cleanupFiles(fileToConsiderForCleanUp, reason, this.readLastCommittedSegmentsInfo().files(true)); } finally { metadataLock.writeLock().unlock(); } } - private void cleanupFiles( - String reason, - Collection localSnapshot, - @Nullable Collection additionalFiles, - boolean deleteTempFiles - ) throws IOException { + private void cleanupFiles(Collection filesToConsiderForCleanup, String reason, Collection lastCommittedSegmentInfos) { assert metadataLock.isWriteLockedByCurrentThread(); - for (String existingFile : directory.listAll()) { - if (Store.isAutogenerated(existingFile) - || localSnapshot != null && localSnapshot.contains(existingFile) - || (additionalFiles != null && additionalFiles.contains(existingFile)) - // also ensure we are not deleting a file referenced by an active reader. + for (String existingFile : filesToConsiderForCleanup) { + if (Store.isAutogenerated(existingFile) || lastCommittedSegmentInfos != null && lastCommittedSegmentInfos.contains(existingFile) + // also ensure we are not deleting a file referenced by an active reader. || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false - // prevent temporary file deletion during reader cleanup - || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) { + // Prevent temporary replication files as it should be cleaned up MultiFileWriter + || existingFile.startsWith(REPLICATION_PREFIX)) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -871,6 +836,53 @@ private void cleanupFiles( } } + /** + * Segment replication method + * + * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos + * bytes to ensure they are not deleted. + * + * @param tmpToFileName Map of temporary replication file to actual file name + * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file + * @param segmentsGen segment generation number + * @param consumer consumer for generated SegmentInfos + * @throws IOException Exception while reading store and building segment infos + */ + public void buildInfosFromBytes( + Map tmpToFileName, + byte[] infosBytes, + long segmentsGen, + CheckedConsumer consumer + ) throws IOException { + metadataLock.writeLock().lock(); + try { + final List values = new ArrayList<>(tmpToFileName.values()); + incRefFileDeleter(values); + try { + renameTempFilesSafe(tmpToFileName); + consumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); + } finally { + decRefFileDeleter(values); + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { + try (final ChecksumIndexInput input = toIndexInput(infosBytes)) { + return SegmentInfos.readCommit(directory, input, segmentsGen); + } + } + + /** + * This method formats byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput(new ByteArrayIndexInput("Snapshot of SegmentInfos", input)); + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); @@ -945,7 +957,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l latestSegmentInfos.commit(directory()); directory.sync(latestSegmentInfos.files(true)); directory.syncMetaData(); - cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos); + cleanupAndPreserveLatestCommitPoint(List.of(this.directory.listAll()), "After commit"); } finally { metadataLock.writeLock().unlock(); } @@ -1961,6 +1973,13 @@ public void incRefFileDeleter(Collection files) { public void decRefFileDeleter(Collection files) { if (this.indexSettings.isSegRepEnabled()) { this.replicaFileTracker.decRef(files); + try { + this.cleanupAndPreserveLatestCommitPoint(files, "On reader close"); + } catch (IOException e) { + // Log but do not rethrow - we can try cleaning up again after next replication cycle. + // If that were to fail, the shard will as well. + logger.error("Unable to clean store after reader closed", e); + } } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 4f9db27ffc9db..c27852b27960b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -97,6 +97,10 @@ String getTempNameForFile(String origFile) { return tempFilePrefix + origFile; } + public Map getTempFileNames() { + return tempFileNames; + } + public IndexOutput getOpenIndexOutput(String key) { ensureOpen.run(); return openIndexOutputs.get(key); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 22c68ad46fea6..9d724d6cc9dcf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -21,6 +21,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; @@ -221,18 +222,15 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); Store store = null; try { - multiFileWriter.renameAllTempFiles(); store = store(); store.incRef(); - // Deserialize the new SegmentInfos object sent from the primary. - final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); - SegmentInfos infos = SegmentInfos.readCommit( - store.directory(), - toIndexInput(checkpointInfoResponse.getInfosBytes()), - responseCheckpoint.getSegmentsGen() + CheckedConsumer finalizeReplication = indexShard::finalizeReplication; + store.buildInfosFromBytes( + multiFileWriter.getTempFileNames(), + checkpointInfoResponse.getInfosBytes(), + checkpointInfoResponse.getCheckpoint().getSegmentsGen(), + finalizeReplication ); - cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index d0d63dc96a86a..d9eac1c577e0f 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -157,6 +158,38 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti } } + public void testSimultaneousEngineCloseAndCommit() throws IOException, InterruptedException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + CountDownLatch latch = new CountDownLatch(1); + Thread commitThread = new Thread(() -> { + try { + nrtEngine.updateSegments(store.readLastCommittedSegmentsInfo()); + latch.countDown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + Thread closeThread = new Thread(() -> { + try { + latch.await(); + nrtEngine.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + commitThread.start(); + closeThread.start(); + commitThread.join(); + closeThread.join(); + } + } + public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0c859c5f6a64a..b3876a8ea8fd0 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -13,13 +13,14 @@ import org.junit.Assert; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.ClusterSettings; @@ -30,6 +31,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; @@ -63,6 +65,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -70,7 +74,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; @@ -78,6 +85,7 @@ import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -169,6 +177,186 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException closeShards(indexShard); } + /** + * This test mimics the segment replication failure due to CorruptIndexException exception which happens when + * reader close operation on replica shard deletes the segment files copied in current round of segment replication. + * It does this by blocking the finalizeReplication on replica shard and performing close operation on acquired + * searcher that triggers the reader close operation. + * @throws Exception + */ + public void testSegmentReplication_With_ReaderClosedConcurrently() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents & replicate to replica shard + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + + IndexShard spyShard = spy(replicaShard); + Engine.Searcher test = replicaShard.getEngine().acquireSearcher("testSegmentReplication_With_ReaderClosedConcurrently"); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again & replicate to replica shard + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + + // Step 3. Perform force merge down to 1 segment on primary + primaryShard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true)); + logger.info("--> primary store after force merge {}", Arrays.toString(primaryShard.store().directory().listAll())); + // Perform close on searcher before IndexShard::finalizeReplication + doAnswer(n -> { + test.close(); + n.callRealMethod(); + return null; + }).when(spyShard).finalizeReplication(any()); + replicateSegments(primaryShard, List.of(spyShard)); + shards.assertAllEqual(numDocs); + } + } + + /** + * Similar to test above, this test shows the issue where an engine close operation during active segment replication + * can result in Lucene CorruptIndexException. + * @throws Exception + */ + public void testSegmentReplication_With_EngineClosedConcurrently() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again to create a new commit + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + logger.info("--> primary store after final flush {}", Arrays.toString(primaryShard.store().directory().listAll())); + + // Step 3. Before replicating segments, block finalizeReplication and perform engine commit directly that + // cleans up recently copied over files + IndexShard spyShard = spy(replicaShard); + doAnswer(n -> { + NRTReplicationEngine engine = (NRTReplicationEngine) replicaShard.getEngine(); + // Using engine.close() prevents indexShard.finalizeReplication execution due to engine AlreadyClosedException, + // thus as workaround, use updateSegments which eventually calls commitSegmentInfos on latest segment infos. + engine.updateSegments(engine.getSegmentInfosSnapshot().get()); + n.callRealMethod(); + return null; + }).when(spyShard).finalizeReplication(any()); + replicateSegments(primaryShard, List.of(spyShard)); + shards.assertAllEqual(numDocs); + } + } + + /** + * Verifies that commits on replica engine resulting from engine or reader close does not cleanup the temporary + * replication files from ongoing round of segment replication + * @throws Exception + */ + public void testTemporaryFilesNotCleanup() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents, commit to create commit point on primary & replicate + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again to create a new commit on primary + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + + // Step 3. Copy segment files to replica shard but prevent commit + final CountDownLatch countDownLatch = new CountDownLatch(1); + Map primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); + } + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.getShardOrNull(replica.shardId)).thenReturn(replica); + final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( + threadPool, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + mock(TransportService.class), + sourceFactory, + indicesService, + clusterService + ); + final Consumer runnablePostGetFiles = (indexShard) -> { + try { + Collection temporaryFiles = Stream.of(indexShard.store().directory().listAll()) + .filter(name -> name.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX)) + .collect(Collectors.toList()); + + // Step 4. Perform a commit on replica shard. + NRTReplicationEngine engine = (NRTReplicationEngine) indexShard.getEngine(); + engine.updateSegments(engine.getSegmentInfosSnapshot().get()); + + // Step 5. Validate temporary files are not deleted from store. + Collection replicaStoreFiles = List.of(indexShard.store().directory().listAll()); + assertTrue(replicaStoreFiles.containsAll(temporaryFiles)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + SegmentReplicationSource segmentReplicationSource = getSegmentReplicationSource( + primaryShard, + (repId) -> targetService.get(repId), + runnablePostGetFiles + ); + when(sourceFactory.get(any())).thenReturn(segmentReplicationSource); + targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)); + countDownLatch.await(30, TimeUnit.SECONDS); + assertEquals("Replication failed", 0, countDownLatch.getCount()); + shards.assertAllEqual(numDocs); + } + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { @@ -296,13 +484,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication( - primaryShard, - null, - mock(TransportService.class), - mock(IndicesService.class), - mock(ClusterService.class) - ); + sut = prepareForReplication(primaryShard, null); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index b11e8554027b1..d9493e63fd34c 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -100,6 +100,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -108,6 +109,8 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.anyOf; @@ -1191,8 +1194,12 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { } assertFalse(additionalSegments.isEmpty()); + Collection filesToConsiderForCleanUp = Stream.of(store.readLastCommittedSegmentsInfo().files(true), additionalSegments) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + // clean up everything not in the latest commit point. - store.cleanupAndPreserveLatestCommitPoint("test", store.readLastCommittedSegmentsInfo()); + store.cleanupAndPreserveLatestCommitPoint(filesToConsiderForCleanUp, "test"); // we want to ensure commitMetadata files are preserved after calling cleanup for (String existingFile : store.directory().listAll()) { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f5af84bb9f128..984605007dad7 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1330,15 +1330,22 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { * Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has * been configured to return the given primaryShard's current segments. * - * @param primaryShard {@link IndexShard} - The primary shard to replicate from. - * @param target {@link IndexShard} - The target replica shard in segment replication. + * @param primaryShard {@link IndexShard} - The target replica shard in segment replication. + * @param target {@link IndexShard} - The source primary shard in segment replication. + * @param transportService {@link TransportService} - Transport service to be used on target + * @param indicesService {@link IndicesService} - The indices service to be used on target + * @param clusterService {@link ClusterService} - The cluster service to be used on target + * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations + * which are desired right after files are copied. e.g. To work with temp files + * @return Returns SegmentReplicationTargetService */ public final SegmentReplicationTargetService prepareForReplication( IndexShard primaryShard, IndexShard target, TransportService transportService, IndicesService indicesService, - ClusterService clusterService + ClusterService clusterService, + Consumer postGetFilesRunnable ) { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( @@ -1349,7 +1356,102 @@ public final SegmentReplicationTargetService prepareForReplication( indicesService, clusterService ); - final SegmentReplicationSource replicationSource = new TestReplicationSource() { + final SegmentReplicationSource replicationSource = getSegmentReplicationSource( + primaryShard, + (repId) -> targetService.get(repId), + postGetFilesRunnable + ); + when(sourceFactory.get(any())).thenReturn(replicationSource); + when(indicesService.getShardOrNull(any())).thenReturn(target); + return targetService; + } + + /** + * Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has + * been configured to return the given primaryShard's current segments. + * + * @param primaryShard {@link IndexShard} - The primary shard to replicate from. + * @param target {@link IndexShard} - The target replica shard. + * @return Returns SegmentReplicationTargetService + */ + public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) { + return prepareForReplication( + primaryShard, + target, + mock(TransportService.class), + mock(IndicesService.class), + mock(ClusterService.class), + (indexShard) -> {} + ); + } + + public final SegmentReplicationTargetService prepareForReplication( + IndexShard primaryShard, + IndexShard target, + TransportService transportService, + IndicesService indicesService, + ClusterService clusterService + ) { + return prepareForReplication(primaryShard, target, transportService, indicesService, clusterService, (indexShard) -> {}); + } + + /** + * Get listener on started segment replication event which verifies replica shard store with primary's after completion + * @param primaryShard - source of segment replication + * @param replicaShard - target of segment replication + * @param primaryMetadata - primary shard metadata before start of segment replication + * @param latch - Latch which allows consumers of this utility to ensure segment replication completed successfully + * @return Returns SegmentReplicationTargetService.SegmentReplicationListener + */ + public SegmentReplicationTargetService.SegmentReplicationListener getTargetListener( + IndexShard primaryShard, + IndexShard replicaShard, + Map primaryMetadata, + CountDownLatch latch + ) { + return new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + try (final GatedCloseable snapshot = replicaShard.getSegmentInfosSnapshot()) { + final SegmentInfos replicaInfos = snapshot.get(); + final Map replicaMetadata = replicaShard.store().getSegmentMetadataMap(replicaInfos); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + primaryShard.updateVisibleCheckpointForShard( + replicaShard.routingEntry().allocationId().getId(), + primaryShard.getLatestReplicationCheckpoint() + ); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } finally { + latch.countDown(); + } + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.error("Unexpected replication failure in test", e); + Assert.fail("test replication should not fail: " + e); + } + }; + } + + /** + * Utility method which creates a segment replication source, which copies files from primary shard to target shard + * @param primaryShard Primary IndexShard - source of segment replication + * @param getTargetFunc - provides replication target from target service using replication id + * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations + * which are desired right after files are copied. e.g. To work with temp files + * @return Return SegmentReplicationSource + */ + public SegmentReplicationSource getSegmentReplicationSource( + IndexShard primaryShard, + Function> getTargetFunc, + Consumer postGetFilesRunnable + ) { + return new TestReplicationSource() { @Override public void getCheckpointMetadata( long replicationId, @@ -1380,18 +1482,16 @@ public void getSegmentFiles( ActionListener listener ) { try ( - final ReplicationCollection.ReplicationRef replicationRef = targetService.get(replicationId) + final ReplicationCollection.ReplicationRef replicationRef = getTargetFunc.apply(replicationId) ) { writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {})); } catch (IOException e) { listener.onFailure(e); } + postGetFilesRunnable.accept(indexShard); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; - when(sourceFactory.get(any())).thenReturn(replicationSource); - when(indicesService.getShardOrNull(any())).thenReturn(target); - return targetService; } /** @@ -1412,46 +1512,10 @@ public final List replicateSegments(IndexShard primary } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { - final SegmentReplicationTargetService targetService = prepareForReplication( - primaryShard, - replica, - mock(TransportService.class), - mock(IndicesService.class), - mock(ClusterService.class) - ); + final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( replica, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - try (final GatedCloseable snapshot = replica.getSegmentInfosSnapshot()) { - final SegmentInfos replicaInfos = snapshot.get(); - final Map replicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos); - final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata); - assertTrue(recoveryDiff.missing.isEmpty()); - assertTrue(recoveryDiff.different.isEmpty()); - assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); - primaryShard.updateVisibleCheckpointForShard( - replica.routingEntry().allocationId().getId(), - primaryShard.getLatestReplicationCheckpoint() - ); - } catch (Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } finally { - countDownLatch.countDown(); - } - } - - @Override - public void onReplicationFailure( - SegmentReplicationState state, - ReplicationFailedException e, - boolean sendShardFailure - ) { - logger.error("Unexpected replication failure in test", e); - Assert.fail("test replication should not fail: " + e); - } - } + getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch) ); ids.add(target); }