From 0ef1de03ef1a103c1bd12f0cee25ede09cfaf54f Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 5 Jul 2023 17:43:36 -0700 Subject: [PATCH] [Segment Replication] Self review Signed-off-by: Suraj Singh --- .../replication/SegmentReplicationIT.java | 54 ++----------------- .../org/opensearch/index/store/Store.java | 21 +++----- .../engine/NRTReplicationEngineTests.java | 1 + .../SegmentReplicationIndexShardTests.java | 21 -------- 4 files changed, 12 insertions(+), 85 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 432c74e4152ff..e4489e9b82640 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -274,63 +274,14 @@ public void testIndexReopenClose() throws Exception { verifyStoreContent(); } - public void testConcurrentIndexAndSearch() throws Exception { + public void testScrollWithConcurrentIndexAndSearch() throws Exception { final String primary = internalCluster().startDataOnlyNode(); final String replica = internalCluster().startDataOnlyNode(); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); - - final List> pendingIndexResponses = new ArrayList<>(); - final List> pendingSearchResponse = new ArrayList<>(); - final int searchCount = randomIntBetween(100, 200); - final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); - - for (int i = 0; i < searchCount; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(refreshPolicy) - .setSource("field", "value" + i) - .execute() - ); - flush(INDEX_NAME); - forceMerge(); - } - - final SearchResponse searchResponse = client().prepareSearch() - .setQuery(matchAllQuery()) - .setIndices(INDEX_NAME) - .setRequestCache(false) - .setScroll(TimeValue.timeValueDays(1)) - .setSize(10) - .get(); - - for (int i = searchCount; i < searchCount * 2; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(refreshPolicy) - .setSource("field", "value" + i) - .execute() - ); - } - flush(INDEX_NAME); - forceMerge(); - client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); - - assertBusy(() -> { - client().admin().indices().prepareRefresh().execute().actionGet(); - assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); - logger.info("--> Cluster state {}", client().admin().cluster().prepareState().execute().actionGet().getState()); - verifyStoreContent(); - } - - public void testScrollWithConcurrentIndexAndSearch() throws Exception { final List> pendingIndexResponses = new ArrayList<>(); final List> pendingSearchResponse = new ArrayList<>(); - final int searchCount = randomIntBetween(100, 200); + final int searchCount = randomIntBetween(10, 20); final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); for (int i = 0; i < searchCount; i++) { @@ -372,6 +323,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception { assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone)); }, 1, TimeUnit.MINUTES); verifyStoreContent(); + waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica)); } public void testMultipleShards() throws Exception { diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 928e897b44e14..62bb6050900df 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -801,7 +801,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { - this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true); + this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo()); } /** @@ -818,22 +818,20 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos info * @param reason the reason for this cleanup operation logged for each deleted file * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos - * @param deleteTempFiles Does this clean up delete temporary replication files * * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint( String reason, SegmentInfos infos, - SegmentInfos lastCommittedSegmentInfos, - boolean deleteTempFiles + SegmentInfos lastCommittedSegmentInfos ) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true), deleteTempFiles); + cleanupFiles(reason, lastCommittedSegmentInfos.files(true), infos.files(true)); } finally { metadataLock.writeLock().unlock(); } @@ -842,7 +840,7 @@ public void cleanupAndPreserveLatestCommitPoint( /** * Segment Replication method * - * Performs cleanup of un-referenced files intended to be used reader release action + * Performs cleanup of un-referenced files intended to be used after reader close action * * @param reason Reason for cleanup * @param filesToConsider Files to consider for clean up @@ -852,7 +850,7 @@ public void cleanupUnReferencedFiles(String reason, Collection filesToCo assert indexSettings.isSegRepEnabled(); metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, null, filesToConsider, false); + cleanupFiles(reason, null, filesToConsider); } finally { metadataLock.writeLock().unlock(); } @@ -861,8 +859,7 @@ public void cleanupUnReferencedFiles(String reason, Collection filesToCo private void cleanupFiles( String reason, Collection localSnapshot, - @Nullable Collection additionalFiles, - boolean deleteTempFiles + @Nullable Collection additionalFiles ) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { @@ -870,9 +867,7 @@ private void cleanupFiles( || localSnapshot != null && localSnapshot.contains(existingFile) || (additionalFiles != null && additionalFiles.contains(existingFile)) // also ensure we are not deleting a file referenced by an active reader. - || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false - // prevent temporary file deletion during reader cleanup - || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) { + || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -893,7 +888,7 @@ private void cleanupFiles( } /** - * Used for segment replication method + * Segment replication method * * This method takes the segment info bytes to build SegmentInfos. It inc'refs files pointed by passed in SegmentInfos * bytes to ensure they are not deleted. diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 0847d183ea5de..e25e6ea206d84 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index dcc23858fde48..17e37d63d8b0d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -1080,27 +1080,6 @@ public void getSegmentFiles( } } - public void testReplicaClosesWhile_NotReplicating() throws Exception { - try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - final int numDocs = shards.indexDocs(randomInt(10)); - primary.refresh("Test"); - replicateSegments(primary, shards.getReplicas()); - - logger.info("--> PrimaryStore {}", Arrays.toString(primary.store().directory().listAll())); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - - shards.removeReplica(replica); - closeShards(replica); - } - } - public void testPrimaryCancelsExecution() throws Exception { try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { shards.startAll();