From 386cb454471734cc61cd06a7796c98da2b8d418b Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 5 Jul 2023 17:22:21 -0700 Subject: [PATCH 01/10] [Segment Replication] Prevent store clean up on reader close action Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 102 ++++++++++++++ .../index/engine/NRTReplicationEngine.java | 11 +- .../org/opensearch/index/store/Store.java | 68 ++++++++++ .../indices/recovery/MultiFileWriter.java | 4 + .../replication/SegmentReplicationTarget.java | 16 +-- .../engine/NRTReplicationEngineTests.java | 32 +++++ .../SegmentReplicationIndexShardTests.java | 127 ++++++++++++++++++ 7 files changed, 343 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 86794dd5ee811..432c74e4152ff 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -25,6 +25,7 @@ import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.CreatePitAction; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; @@ -79,6 +80,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -272,6 +274,106 @@ public void testIndexReopenClose() throws Exception { verifyStoreContent(); } + public void testConcurrentIndexAndSearch() throws Exception { + final String primary = internalCluster().startDataOnlyNode(); + final String replica = internalCluster().startDataOnlyNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final List> pendingIndexResponses = new ArrayList<>(); + final List> pendingSearchResponse = new ArrayList<>(); + final int searchCount = randomIntBetween(100, 200); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); + + for (int i = 0; i < searchCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + flush(INDEX_NAME); + forceMerge(); + } + + final SearchResponse searchResponse = client().prepareSearch() + .setQuery(matchAllQuery()) + .setIndices(INDEX_NAME) + .setRequestCache(false) + .setScroll(TimeValue.timeValueDays(1)) + .setSize(10) + .get(); + + for (int i = searchCount; i < searchCount * 2; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + } + flush(INDEX_NAME); + forceMerge(); + client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + 
logger.info("--> Cluster state {}", client().admin().cluster().prepareState().execute().actionGet().getState()); + verifyStoreContent(); + } + + public void testScrollWithConcurrentIndexAndSearch() throws Exception { + final List> pendingIndexResponses = new ArrayList<>(); + final List> pendingSearchResponse = new ArrayList<>(); + final int searchCount = randomIntBetween(100, 200); + final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); + + for (int i = 0; i < searchCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + flush(INDEX_NAME); + forceMerge(); + } + + final SearchResponse searchResponse = client().prepareSearch() + .setQuery(matchAllQuery()) + .setIndices(INDEX_NAME) + .setRequestCache(false) + .setScroll(TimeValue.timeValueDays(1)) + .setSize(10) + .get(); + + for (int i = searchCount; i < searchCount * 2; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(refreshPolicy) + .setSource("field", "value" + i) + .execute() + ); + } + flush(INDEX_NAME); + forceMerge(); + client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + verifyStoreContent(); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 50b5fbb8596a6..d73b3f65150ae 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -126,12 +126,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException { (files) -> { store.decRefFileDeleter(files); try { - store.cleanupAndPreserveLatestCommitPoint( - "On reader closed", - getLatestSegmentInfos(), - getLastCommittedSegmentInfos(), - false - ); + store.cleanupUnReferencedFiles("On reader closed", files); } catch (IOException e) { // Log but do not rethrow - we can try cleaning up again after next replication cycle. // If that were to fail, the shard will as well. @@ -147,9 +142,9 @@ public TranslogManager translogManager() { } public synchronized void updateSegments(final SegmentInfos infos) throws IOException { - // Update the current infos reference on the Engine's reader. - ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { + // Update the current infos reference on the Engine's reader. 
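+            // Checking the engine is still open while holding the write lock ensures a concurrent engine close cannot race with this update.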
+ ensureOpen(); final long maxSeqNo = Long.parseLong(infos.userData.get(MAX_SEQ_NO)); final long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 90832b4c77756..928e897b44e14 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -50,6 +50,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; +import org.apache.lucene.store.BufferedChecksumIndexInput; import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -64,6 +65,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; @@ -837,6 +839,25 @@ public void cleanupAndPreserveLatestCommitPoint( } } + /** + * Segment Replication method + * + * Performs cleanup of un-referenced files intended to be used reader release action + * + * @param reason Reason for cleanup + * @param filesToConsider Files to consider for clean up + * @throws IOException Exception from cleanup operation + */ + public void cleanupUnReferencedFiles(String reason, Collection filesToConsider) throws IOException { + assert indexSettings.isSegRepEnabled(); + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + cleanupFiles(reason, null, filesToConsider, false); + } finally { + metadataLock.writeLock().unlock(); + } + } + private void cleanupFiles( String reason, Collection localSnapshot, @@ -871,6 +892,53 @@ private void cleanupFiles( } } + /** + * Used for segment replication method + * + * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos + * bytes to ensure they are not deleted. 
+ * + * @param tmpToFileName Map of temporary replication file to actual file name + * @param infosBytes bytes[] of SegmentInfos supposed to be sent over by primary excluding segment_N file + * @param segmentsGen segment generation number + * @param consumer consumer for generated SegmentInfos + * @throws IOException Exception while reading store and building segment infos + */ + public void buildInfosFromStore( + Map tmpToFileName, + byte[] infosBytes, + long segmentsGen, + CheckedConsumer consumer + ) throws IOException { + metadataLock.writeLock().lock(); + try { + final List values = new ArrayList<>(tmpToFileName.values()); + incRefFileDeleter(values); + try { + renameTempFilesSafe(tmpToFileName); + consumer.accept(buildSegmentInfos(infosBytes, segmentsGen)); + } finally { + decRefFileDeleter(values); + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private SegmentInfos buildSegmentInfos(byte[] infosBytes, long segmentsGen) throws IOException { + try (final ChecksumIndexInput input = toIndexInput(infosBytes)) { + return SegmentInfos.readCommit(directory, input, segmentsGen); + } + } + + /** + * This method formats byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput(new ByteArrayIndexInput("Snapshot of SegmentInfos", input)); + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 4f9db27ffc9db..c27852b27960b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -97,6 +97,10 @@ String getTempNameForFile(String origFile) { return tempFilePrefix + origFile; } + public Map getTempFileNames() { + return tempFileNames; + } + public IndexOutput getOpenIndexOutput(String key) { ensureOpen.run(); return openIndexOutputs.get(key); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 22c68ad46fea6..25f8cdf77007d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -21,6 +21,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.common.CheckedConsumer; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; @@ -221,18 +222,15 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); Store store = null; try { - multiFileWriter.renameAllTempFiles(); store = store(); store.incRef(); - // Deserialize the new SegmentInfos object sent from the primary. 
- final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); - SegmentInfos infos = SegmentInfos.readCommit( - store.directory(), - toIndexInput(checkpointInfoResponse.getInfosBytes()), - responseCheckpoint.getSegmentsGen() + CheckedConsumer finalizeReplication = indexShard::finalizeReplication; + store.buildInfosFromStore( + multiFileWriter.getTempFileNames(), + checkpointInfoResponse.getInfosBytes(), + checkpointInfoResponse.getCheckpoint().getSegmentsGen(), + finalizeReplication ); - cancellableThreads.checkForCancel(); - indexShard.finalizeReplication(infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 3e1112ae3069b..0847d183ea5de 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -160,6 +160,38 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti } } + public void testSimultaneousEngineCloseAndCommit() throws IOException, InterruptedException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + CountDownLatch latch = new CountDownLatch(1); + Thread commitThread = new Thread(() -> { + try { + nrtEngine.updateSegments(store.readLastCommittedSegmentsInfo()); + latch.countDown(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + Thread closeThread = new Thread(() -> { + try { + latch.await(); + nrtEngine.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + commitThread.start(); + closeThread.start(); + commitThread.join(); + closeThread.join(); + } + } + public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0c859c5f6a64a..dcc23858fde48 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -13,6 +13,8 @@ import org.junit.Assert; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -30,6 +32,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.Engine; import 
org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; @@ -63,6 +66,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -78,6 +82,7 @@ import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -169,6 +174,107 @@ public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException closeShards(indexShard); } + /** + * This test mimics the segment replication failure due to CorruptIndexException exception which happens when + * reader close operation on replica shard deletes the segment files copied in current round of segment replication. + * It does this by blocking the finalizeReplication on replica shard and performing close operation on acquired + * searcher that triggers the reader close operation. + * @throws Exception + */ + public void testSegmentReplication_With_ReaderClosedConcurrently() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents & replicate to replica shard + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + + IndexShard spyShard = spy(replicaShard); + Engine.Searcher test = replicaShard.getEngine().acquireSearcher("testSegmentReplication_With_ReaderClosedConcurrently"); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again & replicate to replica shard + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + + // Step 3. 
Perform force merge down to 1 segment on primary + primaryShard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true)); + logger.info("--> primary store after force merge {}", Arrays.toString(primaryShard.store().directory().listAll())); + // Perform close on searcher before IndexShard::finalizeReplication + doAnswer(n -> { + test.close(); + n.callRealMethod(); + return null; + }).when(spyShard).finalizeReplication(any()); + replicateSegments(primaryShard, List.of(spyShard)); + shards.assertAllEqual(numDocs); + } + } + + /** + * Similar to test above, this test shows the issue where an engine close operation during active segment replication + * can result in Lucene CorruptIndexException. + * @throws Exception + */ + public void testSegmentReplication_With_EngineClosedConcurrently() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.refresh("Test"); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again to create a new commit + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + logger.info("--> primary store after final flush {}", Arrays.toString(primaryShard.store().directory().listAll())); + + // Step 3. Before replicating segments, block finalizeReplication and perform engine commit directly that + // cleans up recently copied over files + IndexShard spyShard = spy(replicaShard); + doAnswer(n -> { + NRTReplicationEngine engine = (NRTReplicationEngine) replicaShard.getEngine(); + // Using engine.close() prevents indexShard.finalizeReplication execution due to engine AlreadyClosedException, + // thus as workaround, use updateSegments which eventually calls commitSegmentInfos on latest segment infos. 
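+                    // Committing the latest infos here reproduces the file cleanup that a concurrent engine close would perform mid-replication.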
+ engine.updateSegments(engine.getSegmentInfosSnapshot().get()); + n.callRealMethod(); + return null; + }).when(spyShard).finalizeReplication(any()); + replicateSegments(primaryShard, List.of(spyShard)); + shards.assertAllEqual(numDocs); + } + } + public void testSegmentReplication_Index_Update_Delete() throws Exception { String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) { @@ -974,6 +1080,27 @@ public void getSegmentFiles( } } + public void testReplicaClosesWhile_NotReplicating() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("Test"); + replicateSegments(primary, shards.getReplicas()); + + logger.info("--> PrimaryStore {}", Arrays.toString(primary.store().directory().listAll())); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + + shards.removeReplica(replica); + closeShards(replica); + } + } + public void testPrimaryCancelsExecution() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll(); From 0ef1de03ef1a103c1bd12f0cee25ede09cfaf54f Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 5 Jul 2023 17:43:36 -0700 Subject: [PATCH 02/10] [Segment Replication] Self review Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 54 ++----------------- .../org/opensearch/index/store/Store.java | 21 +++----- .../engine/NRTReplicationEngineTests.java | 1 + .../SegmentReplicationIndexShardTests.java | 21 -------- 4 files changed, 12 insertions(+), 85 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 432c74e4152ff..e4489e9b82640 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -274,63 +274,14 @@ public void testIndexReopenClose() throws Exception { verifyStoreContent(); } - public void testConcurrentIndexAndSearch() throws Exception { + public void testScrollWithConcurrentIndexAndSearch() throws Exception { final String primary = internalCluster().startDataOnlyNode(); final String replica = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); - - final List> pendingIndexResponses = new ArrayList<>(); - final List> pendingSearchResponse = new ArrayList<>(); - final int searchCount = randomIntBetween(100, 200); - final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); - - for (int i = 0; i < searchCount; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(refreshPolicy) - .setSource("field", "value" + i) - .execute() - ); - flush(INDEX_NAME); - forceMerge(); - } - 
- final SearchResponse searchResponse = client().prepareSearch() - .setQuery(matchAllQuery()) - .setIndices(INDEX_NAME) - .setRequestCache(false) - .setScroll(TimeValue.timeValueDays(1)) - .setSize(10) - .get(); - - for (int i = searchCount; i < searchCount * 2; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(refreshPolicy) - .setSource("field", "value" + i) - .execute() - ); - } - flush(INDEX_NAME); - forceMerge(); - client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); - - assertBusy(() -> { - client().admin().indices().prepareRefresh().execute().actionGet(); - assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); - logger.info("--> Cluster state {}", client().admin().cluster().prepareState().execute().actionGet().getState()); - verifyStoreContent(); - } - - public void testScrollWithConcurrentIndexAndSearch() throws Exception { final List> pendingIndexResponses = new ArrayList<>(); final List> pendingSearchResponse = new ArrayList<>(); - final int searchCount = randomIntBetween(100, 200); + final int searchCount = randomIntBetween(10, 20); final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); for (int i = 0; i < searchCount; i++) { @@ -372,6 +323,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception { assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); verifyStoreContent(); + waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica)); } public void testMultipleShards() throws Exception { diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 928e897b44e14..62bb6050900df 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -801,7 +801,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { - this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true); + this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo()); } /** @@ -818,22 +818,20 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info * @param reason the reason for this cleanup operation logged for each deleted file * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos - * @param deleteTempFiles Does this clean up delete temporary replication files * * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint( String reason, SegmentInfos infos, - SegmentInfos lastCommittedSegmentInfos, - boolean deleteTempFiles + SegmentInfos lastCommittedSegmentInfos ) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. 
This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles); + cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } @@ -842,7 +840,7 @@ public void cleanupAndPreserveLatestCommitPoint( /** * Segment Replication method * - * Performs cleanup of un-referenced files intended to be used reader release action + * Performs cleanup of un-referenced files intended to be used after reader close action * * @param reason Reason for cleanup * @param filesToConsider Files to consider for clean up @@ -852,7 +850,7 @@ public void cleanupUnReferencedFiles(String reason, Collection filesToCo assert indexSettings.isSegRepEnabled(); metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, null, filesToConsider, false); + cleanupFiles(reason, null, filesToConsider); } finally { metadataLock.writeLock().unlock(); } @@ -861,8 +859,7 @@ public void cleanupUnReferencedFiles(String reason, Collection filesToCo private void cleanupFiles( String reason, Collection localSnapshot, - @Nullable Collection additionalFiles, - boolean deleteTempFiles + @Nullable Collection additionalFiles ) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { @@ -870,9 +867,7 @@ private void cleanupFiles( || localSnapshot != null && localSnapshot.contains(existingFile) || (additionalFiles != null && additionalFiles.contains(existingFile)) // also ensure we are not deleting a file referenced by an active reader. - || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false - // prevent temporary file deletion during reader cleanup - || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) { + || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -893,7 +888,7 @@ private void cleanupFiles( } /** - * Used for segment replication method + * Segment replication method * * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos * bytes to ensure they are not deleted. 
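Note for reviewers: the `replicaFileTracker.canDelete(existingFile) == false` guard in `cleanupFiles` above assumes a simple ref-counting contract between open readers and the store. A minimal illustrative sketch of that contract follows (hypothetical class and field names, not part of this change):

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch: a file pinned by at least one open reader must survive cleanup.
    final class ReplicaFileTrackerSketch {
        private final Map<String, Integer> refCounts = new HashMap<>();

        synchronized void incRef(Collection<String> files) {
            for (String file : files) {
                refCounts.merge(file, 1, Integer::sum); // reader opened: pin each referenced file
            }
        }

        synchronized void decRef(Collection<String> files) {
            for (String file : files) {
                // reader closed: unpin, dropping the entry once the last reference is released
                refCounts.computeIfPresent(file, (f, count) -> count == 1 ? null : count - 1);
            }
        }

        synchronized boolean canDelete(String file) {
            return refCounts.containsKey(file) == false; // only unpinned files may be deleted
        }
    }
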
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 0847d183ea5de..e25e6ea206d84 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index dcc23858fde48..17e37d63d8b0d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1080,27 +1080,6 @@ public void getSegmentFiles( } } - public void testReplicaClosesWhile_NotReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("Test"); - replicateSegments(primary, shards.getReplicas()); - - logger.info("--> PrimaryStore {}", Arrays.toString(primary.store().directory().listAll())); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - - shards.removeReplica(replica); - closeShards(replica); - } - } - public void testPrimaryCancelsExecution() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll(); From 1328c59d637f2d1a089318a61539af5ff5fa162d Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 6 Jul 2023 18:22:40 -0700 Subject: [PATCH 03/10] Address review comment Signed-off-by: Suraj Singh --- .../org/opensearch/index/store/Store.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 62bb6050900df..e9c05768bc509 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -821,17 +821,14 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info * * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndPreserveLatestCommitPoint( - String reason, - SegmentInfos infos, - SegmentInfos lastCommittedSegmentInfos - ) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos, SegmentInfos lastCommittedSegmentInfos) + throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. 
metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true)); + cleanupFiles(List.of(directory.listAll()), reason, lastCommittedSegmentInfos.files(true), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } @@ -843,31 +840,34 @@ public void cleanupAndPreserveLatestCommitPoint( * Performs cleanup of un-referenced files intended to be used after reader close action * * @param reason Reason for cleanup - * @param filesToConsider Files to consider for clean up + * @param filesToCleanUp Files to consider for clean up * @throws IOException Exception from cleanup operation */ - public void cleanupUnReferencedFiles(String reason, Collection filesToConsider) throws IOException { + public void cleanupUnReferencedFiles(String reason, Collection filesToCleanUp) throws IOException { assert indexSettings.isSegRepEnabled(); metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, null, filesToConsider); + cleanupFiles(filesToCleanUp, reason, null, null); } finally { metadataLock.writeLock().unlock(); } } private void cleanupFiles( + Collection filesToCleanUp, String reason, Collection localSnapshot, @Nullable Collection additionalFiles ) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); - for (String existingFile : directory.listAll()) { + for (String existingFile : filesToCleanUp) { if (Store.isAutogenerated(existingFile) || localSnapshot != null && localSnapshot.contains(existingFile) || (additionalFiles != null && additionalFiles.contains(existingFile)) // also ensure we are not deleting a file referenced by an active reader. - || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false) { + || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false + // Prevent temporary replication files as it should be cleaned up MultiFileWriter + || existingFile.startsWith(REPLICATION_PREFIX)) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; From f05661036a390d899e165fe077ca3ff65313e4d3 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 7 Jul 2023 14:04:34 -0700 Subject: [PATCH 04/10] Address review comments & refactor Signed-off-by: Suraj Singh --- .../index/engine/NRTReplicationEngine.java | 11 +-- .../org/opensearch/index/store/Store.java | 78 ++++--------------- .../opensearch/index/store/StoreTests.java | 2 +- 3 files changed, 19 insertions(+), 72 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index d73b3f65150ae..b5552ed552f09 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -123,16 +123,7 @@ private NRTReplicationReaderManager buildReaderManager() throws IOException { return new NRTReplicationReaderManager( OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId), store::incRefFileDeleter, - (files) -> { - store.decRefFileDeleter(files); - try { - store.cleanupUnReferencedFiles("On reader closed", files); - } catch (IOException e) { - // Log but do not rethrow - we can try cleaning up again after next replication cycle. - // If that were to fail, the shard will as well. 
- logger.error("Unable to clean store after reader closed", e); - } - } + store::decRefFileDeleter ); } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index e9c05768bc509..0f345f332cb32 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -66,7 +66,6 @@ import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; import org.opensearch.common.CheckedConsumer; -import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -791,80 +790,30 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr /** * Segment Replication method - * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or - * part of the latest on-disk commit point. + * This method deletes files in store that are not referenced by latest on-disk commit point * - * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. - * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. - * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. - */ - public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { - this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo()); - } - - /** - * Segment Replication method - * - * Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup - * - * This method deletes every file in this store. Except - * 1. Files referenced by the passed in SegmentInfos, usually in-memory segment infos copied from primary - * 2. Files part of the passed in segment infos, typically the last committed segment info - * 3. Files incremented by active reader for pit/scroll queries - * 4. Temporary replication file if passed in deleteTempFiles is true. + * @param fileToConsiderForCleanUp Files to consider for clean up. * - * @param reason the reason for this cleanup operation logged for each deleted file - * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. - * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos - * - * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + * @throws IOException Exception on locking. */ - public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos, SegmentInfos lastCommittedSegmentInfos) - throws IOException { + public void cleanupAndPreserveLatestCommitPoint(Collection fileToConsiderForCleanUp, String reason) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. 
metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(List.of(directory.listAll()), reason, lastCommittedSegmentInfos.files(true), infos.files(true)); + cleanupFiles(fileToConsiderForCleanUp, reason, this.readLastCommittedSegmentsInfo().files(true)); } finally { metadataLock.writeLock().unlock(); } } - /** - * Segment Replication method - * - * Performs cleanup of un-referenced files intended to be used after reader close action - * - * @param reason Reason for cleanup - * @param filesToCleanUp Files to consider for clean up - * @throws IOException Exception from cleanup operation - */ - public void cleanupUnReferencedFiles(String reason, Collection filesToCleanUp) throws IOException { - assert indexSettings.isSegRepEnabled(); - metadataLock.writeLock().lock(); - try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(filesToCleanUp, reason, null, null); - } finally { - metadataLock.writeLock().unlock(); - } - } - - private void cleanupFiles( - Collection filesToCleanUp, - String reason, - Collection localSnapshot, - @Nullable Collection additionalFiles - ) throws IOException { + private void cleanupFiles(Collection filesToConsiderForCleanup, String reason, Collection lastCommittedSegmentInfos) { assert metadataLock.isWriteLockedByCurrentThread(); - for (String existingFile : filesToCleanUp) { - if (Store.isAutogenerated(existingFile) - || localSnapshot != null && localSnapshot.contains(existingFile) - || (additionalFiles != null && additionalFiles.contains(existingFile)) - // also ensure we are not deleting a file referenced by an active reader. + for (String existingFile : filesToConsiderForCleanup) { + if (Store.isAutogenerated(existingFile) || lastCommittedSegmentInfos != null && lastCommittedSegmentInfos.contains(existingFile) + // also ensure we are not deleting a file referenced by an active reader. || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false // Prevent temporary replication files as it should be cleaned up MultiFileWriter || existingFile.startsWith(REPLICATION_PREFIX)) { @@ -1008,7 +957,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l latestSegmentInfos.commit(directory()); directory.sync(latestSegmentInfos.files(true)); directory.syncMetaData(); - cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos); + cleanupAndPreserveLatestCommitPoint(List.of(this.directory.listAll()), "After commit"); } finally { metadataLock.writeLock().unlock(); } @@ -2024,6 +1973,13 @@ public void incRefFileDeleter(Collection files) { public void decRefFileDeleter(Collection files) { if (this.indexSettings.isSegRepEnabled()) { this.replicaFileTracker.decRef(files); + try { + this.cleanupAndPreserveLatestCommitPoint(files, "On reader close"); + } catch (IOException e) { + // Log but do not rethrow - we can try cleaning up again after next replication cycle. + // If that were to fail, the shard will as well. 
+ logger.error("Unable to clean store after reader closed", e); + } } } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index b11e8554027b1..829f9441551f5 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -1192,7 +1192,7 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { assertFalse(additionalSegments.isEmpty()); // clean up everything not in the latest commit point. - store.cleanupAndPreserveLatestCommitPoint("test", store.readLastCommittedSegmentsInfo()); + store.cleanupAndPreserveLatestCommitPoint(store.readLastCommittedSegmentsInfo().files(true), "test"); // we want to ensure commitMetadata files are preserved after calling cleanup for (String existingFile : store.directory().listAll()) { From ec4eef8a3e45c03829ffbf157064b7355981de4c Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 7 Jul 2023 14:08:04 -0700 Subject: [PATCH 05/10] Comment Signed-off-by: Suraj Singh --- .../opensearch/index/engine/NRTReplicationReaderManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 9ec484ebfd383..268ba1a436393 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -72,6 +72,8 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { subs.add(ctx.reader()); } + // Segment_n here is ignored because it is either already committed on disk as part of previous commit point or + // does not yet exist on store (not yet committed) final Collection files = currentInfos.files(false); DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( From fd2a05cb16cc7db538f8e2009dbcd0cd182d9e0b Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 7 Jul 2023 15:31:41 -0700 Subject: [PATCH 06/10] Fix unit test Signed-off-by: Suraj Singh --- .../test/java/org/opensearch/index/store/StoreTests.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 829f9441551f5..d9493e63fd34c 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -100,6 +100,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -108,6 +109,8 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; import static org.hamcrest.Matchers.anyOf; @@ -1191,8 +1194,12 @@ public void testCleanupAndPreserveLatestCommitPoint() throws IOException { } assertFalse(additionalSegments.isEmpty()); + Collection filesToConsiderForCleanUp = 
Stream.of(store.readLastCommittedSegmentsInfo().files(true), additionalSegments) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + // clean up everything not in the latest commit point. - store.cleanupAndPreserveLatestCommitPoint(store.readLastCommittedSegmentsInfo().files(true), "test"); + store.cleanupAndPreserveLatestCommitPoint(filesToConsiderForCleanUp, "test"); // we want to ensure commitMetadata files are preserved after calling cleanup for (String existingFile : store.directory().listAll()) { From 2a375ebc3fe2ffb398a7117c60fdc4549cf7e95e Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Sat, 8 Jul 2023 14:54:45 -0700 Subject: [PATCH 07/10] Unit test to verify temporary files are not deleted from commits Signed-off-by: Suraj Singh --- .../org/opensearch/index/store/Store.java | 2 +- .../replication/SegmentReplicationTarget.java | 2 +- .../SegmentReplicationIndexShardTests.java | 93 ++++++++++- .../index/shard/IndexShardTestCase.java | 155 ++++++++++++------ 4 files changed, 196 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 0f345f332cb32..46e5e627ef415 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -848,7 +848,7 @@ private void cleanupFiles(Collection filesToConsiderForCleanup, String r * @param consumer consumer for generated SegmentInfos * @throws IOException Exception while reading store and building segment infos */ - public void buildInfosFromStore( + public void buildInfosFromBytes( Map tmpToFileName, byte[] infosBytes, long segmentsGen, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 25f8cdf77007d..9d724d6cc9dcf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -225,7 +225,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, store = store(); store.incRef(); CheckedConsumer finalizeReplication = indexShard::finalizeReplication; - store.buildInfosFromStore( + store.buildInfosFromBytes( multiFileWriter.getTempFileNames(), checkpointInfoResponse.getInfosBytes(), checkpointInfoResponse.getCheckpoint().getSegmentsGen(), diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 17e37d63d8b0d..2e6f057b60375 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -21,7 +21,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.settings.ClusterSettings; @@ -67,6 +66,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -74,7 +74,10 @@ import 
java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static org.hamcrest.Matchers.containsString; @@ -275,6 +278,85 @@ public void testSegmentReplication_With_EngineClosedConcurrently() throws Except } } + /** + * Verifies that commits on replica engine resulting from engine or reader close does not cleanup the temporary + * replication files from ongoing round of segment replication + * @throws Exception + */ + public void testTemporaryFilesNotCleanup() throws Exception { + String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}"; + try (ReplicationGroup shards = createGroup(1, settings, mappings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primaryShard = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + // Step 1. Ingest numDocs documents, commit to create commit point on primary & replicate + final int numDocs = randomIntBetween(100, 200); + logger.info("--> Inserting documents {}", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + // Step 2. Ingest numDocs documents again to create a new commit on primary + logger.info("--> Ingest {} docs again", numDocs); + for (int i = 0; i < numDocs; i++) { + shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON)); + } + assertEqualTranslogOperations(shards, primaryShard); + primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + + // Step 3. Copy segment files to replica shard but prevent commit + final CountDownLatch countDownLatch = new CountDownLatch(1); + Map primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); + } + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final IndicesService indicesService = mock(IndicesService.class); + when(indicesService.getShardOrNull(replica.shardId)).thenReturn(replica); + final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( + threadPool, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + mock(TransportService.class), + sourceFactory, + indicesService, + clusterService + ); + final Consumer runnablePostGetFiles = (indexShard) -> { + try { + Collection temporaryFiles = Stream.of(indexShard.store().directory().listAll()) + .filter(name -> name.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX)) + .collect(Collectors.toList()); + + // Step 4. Perform a commit on replica shard. + NRTReplicationEngine engine = (NRTReplicationEngine) indexShard.getEngine(); + engine.updateSegments(engine.getSegmentInfosSnapshot().get()); + + // Step 5. 
Validate temporary files are not deleted from store.
+                Collection replicaStoreFiles = List.of(indexShard.store().directory().listAll());
+                assertTrue(replicaStoreFiles.containsAll(temporaryFiles));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+        SegmentReplicationSource segmentReplicationSource = getSegmentReplicationSource(
+            primaryShard,
+            (repId) -> targetService.get(repId),
+            runnablePostGetFiles
+        );
+        when(sourceFactory.get(any())).thenReturn(segmentReplicationSource);
+        targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch));
+        countDownLatch.await(30, TimeUnit.SECONDS);
+        assertEquals("Replication failed", 0, countDownLatch.getCount());
+        shards.assertAllEqual(numDocs);
+        }
+    }
+
     public void testSegmentReplication_Index_Update_Delete() throws Exception {
         String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
         try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
@@ -402,13 +484,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
     public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
         IndexShard primaryShard = newStartedShard(true);
         SegmentReplicationTargetService sut;
-        sut = prepareForReplication(
-            primaryShard,
-            null,
-            mock(TransportService.class),
-            mock(IndicesService.class),
-            mock(ClusterService.class)
-        );
+        sut = prepareForReplication(primaryShard, null);
         SegmentReplicationTargetService spy = spy(sut);
         // Starting a new shard in PrimaryMode and shard routing primary.
@@ -1007,6 +1083,7 @@ public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Except
         final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
         final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
+
         SegmentReplicationSource source = new TestReplicationSource() {
             @Override
             public void getCheckpointMetadata(
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 7f3819563dcbd..6d9d279cebbf6 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1315,15 +1315,22 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
      * Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has
      * been configured to return the given primaryShard's current segments.
      *
-     * @param primaryShard {@link IndexShard} - The primary shard to replicate from.
-     * @param target {@link IndexShard} - The target replica shard in segment replication.
+     * @param primaryShard {@link IndexShard} - The source primary shard in segment replication.
+     * @param target {@link IndexShard} - The target replica shard in segment replication.
+     * @param transportService {@link TransportService} - Transport service to be used on target
+     * @param indicesService {@link IndicesService} - The indices service to be used on target
+     * @param clusterService {@link ClusterService} - The cluster service to be used on target
+     * @param indicesService {@link IndicesService} - The indices service to be used on target
+     * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations
+     * which are desired right after files are copied. e.g. To work with temp files
      */
     public final SegmentReplicationTargetService prepareForReplication(
         IndexShard primaryShard,
         IndexShard target,
         TransportService transportService,
         IndicesService indicesService,
-        ClusterService clusterService
+        ClusterService clusterService,
+        Consumer<IndexShard> postGetFilesRunnable
     ) {
         final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
         final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService(
@@ -1334,7 +1341,101 @@ public final SegmentReplicationTargetService prepareForReplication(
             indicesService,
             clusterService
         );
-        final SegmentReplicationSource replicationSource = new TestReplicationSource() {
+        final SegmentReplicationSource replicationSource = getSegmentReplicationSource(
+            primaryShard,
+            (repId) -> targetService.get(repId),
+            postGetFilesRunnable
+        );
+        when(sourceFactory.get(any())).thenReturn(replicationSource);
+        when(indicesService.getShardOrNull(any())).thenReturn(target);
+        return targetService;
+    }
+
+    /**
+     * Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has
+     * been configured to return the given primaryShard's current segments.
+     *
+     * @param primaryShard {@link IndexShard} - The primary shard to replicate from.
+     * @param target {@link IndexShard} - The target replica shard.
+     */
+    public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) {
+        return prepareForReplication(
+            primaryShard,
+            target,
+            mock(TransportService.class),
+            mock(IndicesService.class),
+            mock(ClusterService.class),
+            (indexShard) -> {}
+        );
+    }
+
+    public final SegmentReplicationTargetService prepareForReplication(
+        IndexShard primaryShard,
+        IndexShard target,
+        TransportService transportService,
+        IndicesService indicesService,
+        ClusterService clusterService
+    ) {
+        return prepareForReplication(primaryShard, target, transportService, indicesService, clusterService, (indexShard) -> {});
+    }
+
+    /**
+     * Get a listener for segment replication events which verifies the replica shard's store content against the primary's on completion
+     * @param primaryShard - source of segment replication
+     * @param replicaShard - target of segment replication
+     * @param primaryMetadata - primary shard metadata before start of segment replication
+     * @param latch - Latch which allows consumers of this utility to ensure segment replication completed successfully
+     * @return
+     */
+    public SegmentReplicationTargetService.SegmentReplicationListener getTargetListener(
+        IndexShard primaryShard,
+        IndexShard replicaShard,
+        Map<String, StoreFileMetadata> primaryMetadata,
+        CountDownLatch latch
+    ) {
+        return new SegmentReplicationTargetService.SegmentReplicationListener() {
+            @Override
+            public void onReplicationDone(SegmentReplicationState state) {
+                try (final GatedCloseable<SegmentInfos> snapshot = replicaShard.getSegmentInfosSnapshot()) {
+                    final SegmentInfos replicaInfos = snapshot.get();
+                    final Map<String, StoreFileMetadata> replicaMetadata = replicaShard.store().getSegmentMetadataMap(replicaInfos);
+                    final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata);
+                    assertTrue(recoveryDiff.missing.isEmpty());
+                    assertTrue(recoveryDiff.different.isEmpty());
+                    assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
+                    primaryShard.updateVisibleCheckpointForShard(
+                        replicaShard.routingEntry().allocationId().getId(),
+                        primaryShard.getLatestReplicationCheckpoint()
+                    );
+                } catch (Exception e) {
+                    throw ExceptionsHelper.convertToRuntime(e);
+                } finally {
+                    latch.countDown();
+                }
+            }
+
+            @Override
+            public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
+                logger.error("Unexpected replication failure in test", e);
+                Assert.fail("test replication should not fail: " + e);
+            }
+        };
+    }
+
+    /**
+     * Utility method which creates a segment replication source that copies files from the primary shard to the target shard
+     * @param primaryShard Primary IndexShard - source of segment replication
+     * @param getTargetFunc - provides replication target from target service using replication id
+     * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations
+     * which are desired right after files are copied. e.g. To work with temp files
+     * @return
+     */
+    public SegmentReplicationSource getSegmentReplicationSource(
+        IndexShard primaryShard,
+        Function<Long, ReplicationCollection.ReplicationRef<SegmentReplicationTarget>> getTargetFunc,
+        Consumer<IndexShard> postGetFilesRunnable
+    ) {
+        return new TestReplicationSource() {
             @Override
             public void getCheckpointMetadata(
                 long replicationId,
@@ -1365,18 +1466,16 @@ public void getSegmentFiles(
                 ActionListener<GetSegmentFilesResponse> listener
             ) {
                 try (
-                    final ReplicationCollection.ReplicationRef<SegmentReplicationTarget> replicationRef = targetService.get(replicationId)
+                    final ReplicationCollection.ReplicationRef<SegmentReplicationTarget> replicationRef = getTargetFunc.apply(replicationId)
                 ) {
                     writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {}));
                 } catch (IOException e) {
                     listener.onFailure(e);
                 }
+                postGetFilesRunnable.accept(indexShard);
                 listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
             }
         };
-        when(sourceFactory.get(any())).thenReturn(replicationSource);
-        when(indicesService.getShardOrNull(any())).thenReturn(target);
-        return targetService;
     }
 
     /**
@@ -1397,46 +1496,10 @@ public final List<SegmentReplicationTarget> replicateSegments(IndexShard primary
         }
         List<SegmentReplicationTarget> ids = new ArrayList<>();
         for (IndexShard replica : replicaShards) {
-            final SegmentReplicationTargetService targetService = prepareForReplication(
-                primaryShard,
-                replica,
-                mock(TransportService.class),
-                mock(IndicesService.class),
-                mock(ClusterService.class)
-            );
+            final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica);
             final SegmentReplicationTarget target = targetService.startReplication(
                 replica,
-                new SegmentReplicationTargetService.SegmentReplicationListener() {
-                    @Override
-                    public void onReplicationDone(SegmentReplicationState state) {
-                        try (final GatedCloseable<SegmentInfos> snapshot = replica.getSegmentInfosSnapshot()) {
-                            final SegmentInfos replicaInfos = snapshot.get();
-                            final Map<String, StoreFileMetadata> replicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos);
-                            final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata);
-                            assertTrue(recoveryDiff.missing.isEmpty());
-                            assertTrue(recoveryDiff.different.isEmpty());
-                            assertEquals(recoveryDiff.identical.size(), primaryMetadata.size());
-                            primaryShard.updateVisibleCheckpointForShard(
-                                replica.routingEntry().allocationId().getId(),
-                                primaryShard.getLatestReplicationCheckpoint()
-                            );
-                        } catch (Exception e) {
-                            throw ExceptionsHelper.convertToRuntime(e);
-                        } finally {
-                            countDownLatch.countDown();
-                        }
-                    }
-
-                    @Override
-                    public void onReplicationFailure(
-                        SegmentReplicationState state,
-                        ReplicationFailedException e,
-                        boolean sendShardFailure
-                    ) {
-                        logger.error("Unexpected replication failure in test", e);
-                        Assert.fail("test replication should not fail: " + e);
-                    }
-                }
+                getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)
             );
             ids.add(target);
         }

From 8d0bf2335064d603048b2ad62fd397d2959e2255 Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Sat, 8 Jul 2023 15:05:39 -0700
Subject: [PATCH 08/10] Compilation error fix

Signed-off-by: Suraj Singh
---
 .../index/shard/SegmentReplicationIndexShardTests.java           | 1 -
 .../main/java/org/opensearch/index/shard/IndexShardTestCase.java | 1 -
 2 files changed, 2 deletions(-)

diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 2e6f057b60375..b3876a8ea8fd0 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -1083,7 +1083,6 @@ public void testReplicaClosesWhileReplicating_AfterGetCheckpoint() throws Except
 
         final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
         final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
-
         SegmentReplicationSource source = new TestReplicationSource() {
             @Override
             public void getCheckpointMetadata(
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 6d9d279cebbf6..612779a5048c1 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1320,7 +1320,6 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
      * @param transportService {@link TransportService} - Transport service to be used on target
      * @param indicesService {@link IndicesService} - The indices service to be used on target
      * @param clusterService {@link ClusterService} - The cluster service to be used on target
-     * @param indicesService {@link IndicesService} - The indices service to be used on target
      * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations
      * which are desired right after files are copied. e.g. To work with temp files
      */

From 55497b63d15c1316034560537c68104d97e23d95 Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Sat, 8 Jul 2023 15:22:49 -0700
Subject: [PATCH 09/10] Javadoc

Signed-off-by: Suraj Singh
---
 .../java/org/opensearch/index/shard/IndexShardTestCase.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 612779a5048c1..6d4eb278c080c 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -1322,6 +1322,7 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
      * @param clusterService {@link ClusterService} - The cluster service to be used on target
      * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations
      * which are desired right after files are copied. e.g. To work with temp files
+     * @return Returns SegmentReplicationTargetService
      */
     public final SegmentReplicationTargetService prepareForReplication(
         IndexShard primaryShard,
@@ -1356,6 +1357,7 @@ public final SegmentReplicationTargetService prepareForReplication(
      *
      * @param primaryShard {@link IndexShard} - The primary shard to replicate from.
      * @param target {@link IndexShard} - The target replica shard.
+     * @return Returns SegmentReplicationTargetService
      */
     public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) {
         return prepareForReplication(
@@ -1384,7 +1386,7 @@ public final SegmentReplicationTargetService prepareForReplication(
      * @param replicaShard - target of segment replication
      * @param primaryMetadata - primary shard metadata before start of segment replication
      * @param latch - Latch which allows consumers of this utility to ensure segment replication completed successfully
-     * @return
+     * @return Returns SegmentReplicationTargetService.SegmentReplicationListener
      */
     public SegmentReplicationTargetService.SegmentReplicationListener getTargetListener(
         IndexShard primaryShard,
@@ -1427,7 +1429,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
      * @param getTargetFunc - provides replication target from target service using replication id
      * @param postGetFilesRunnable - Consumer which is executed after file copy operation. This can be used to stub operations
      * which are desired right after files are copied. e.g. To work with temp files
-     * @return
+     * @return Returns SegmentReplicationSource
      */
     public SegmentReplicationSource getSegmentReplicationSource(
         IndexShard primaryShard,

From ae456c01d017c5b8d63b98687c2522fe79e5ae3b Mon Sep 17 00:00:00 2001
From: Suraj Singh
Date: Sat, 8 Jul 2023 16:00:25 -0700
Subject: [PATCH 10/10] Skip testScrollWithConcurrentIndexAndSearch with
 remote store

Signed-off-by: Suraj Singh
---
 .../org/opensearch/indices/replication/SegmentReplicationIT.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index e4489e9b82640..151ffb3416799 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -275,6 +275,7 @@ public void testIndexReopenClose() throws Exception {
     }
 
     public void testScrollWithConcurrentIndexAndSearch() throws Exception {
+        assumeFalse("Skipping the test with Remote store as it's flaky.", segmentReplicationWithRemoteEnabled());
         final String primary = internalCluster().startDataOnlyNode();
         final String replica = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);