diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 02e9f3ed9da1f..c162274f340cb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -17,6 +17,7 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; import org.opensearch.action.ActionFuture; @@ -41,17 +42,23 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.search.SearchService; import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.search.internal.PitReaderContext; import org.opensearch.search.sort.SortOrder; import org.opensearch.node.NodeClosedException; import org.opensearch.test.BackgroundIndexer; @@ -69,6 +76,7 @@ import static java.util.Arrays.asList; import static org.opensearch.action.search.PitTestsUtil.assertSegments; +import static org.opensearch.action.search.SearchContextId.decode; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; @@ -984,8 +992,22 @@ public void testPitCreatedOnReplica() throws Exception { FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME); client().admin().indices().flush(flushRequest).get(); final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); - final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get(); - final Collection snapshottedSegments = segmentInfos.files(true); + + // fetch the segments snapshotted when the reader context was created. + Collection snapshottedSegments; + SearchService searchService = internalCluster().getInstance(SearchService.class, replica); + NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica); + final PitReaderContext pitReaderContext = searchService.getPitReaderContext( + decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId() + ); + try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) { + final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader( + (OpenSearchDirectoryReader) searcher.getDirectoryReader() + ); + final SegmentInfos infos = standardDirectoryReader.getSegmentInfos(); + snapshottedSegments = infos.files(true); + } + ; flush(INDEX_NAME); for (int i = 101; i < 200; i++) { @@ -1042,6 +1064,6 @@ public void testPitCreatedOnReplica() throws Exception { client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet(); currentFiles = List.of(replicaShard.store().directory().listAll()); - assertFalse("Files should be preserved", currentFiles.containsAll(snapshottedSegments)); + assertFalse("Files should be cleaned up", currentFiles.containsAll(snapshottedSegments)); } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 9353750a6f111..bd33e80e899fb 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -56,8 +56,8 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager { ) { super(reader); currentInfos = unwrapStandardReader(reader).getSegmentInfos(); - this.onReaderClosed = onReaderClosed; this.onNewReader = onNewReader; + this.onReaderClosed = onReaderClosed; } @Override @@ -108,7 +108,7 @@ public SegmentInfos getSegmentInfos() { return currentInfos; } - private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) { + public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) { final DirectoryReader delegate = reader.getDelegate(); if (delegate instanceof SoftDeletesDirectoryReaderWrapper) { return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate(); diff --git a/server/src/main/java/org/opensearch/index/store/ReplicaFileDeleter.java b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java similarity index 85% rename from server/src/main/java/org/opensearch/index/store/ReplicaFileDeleter.java rename to server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java index 510ba2dc1b886..0ec282619337c 100644 --- a/server/src/main/java/org/opensearch/index/store/ReplicaFileDeleter.java +++ b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java @@ -22,7 +22,7 @@ * * @opensearch.internal */ -final class ReplicaFileDeleter { +final class ReplicaFileTracker { private final Map refCounts = new HashMap<>(); @@ -45,11 +45,7 @@ public synchronized void decRef(Collection fileNames) { } } - public synchronized Integer getRefCount(String fileName) { - return refCounts.get(fileName); - } - - public synchronized boolean skipDelete(String fileName) { - return refCounts.containsKey(fileName); + public synchronized boolean canDelete(String fileName) { + return refCounts.containsKey(fileName) == false; } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index bd2d342cf27f7..6642bbcd39a64 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -182,7 +182,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final ShardLock shardLock; private final OnClose onClose; - private final ReplicaFileDeleter replicaFileDeleter; + private final ReplicaFileTracker replicaFileTracker; private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override @@ -205,9 +205,9 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.shardLock = shardLock; this.onClose = onClose; if (indexSettings.isSegRepEnabled()) { - this.replicaFileDeleter = new ReplicaFileDeleter(); + this.replicaFileTracker = new ReplicaFileTracker(); } else { - this.replicaFileDeleter = null; + this.replicaFileTracker = null; } assert onClose != null; @@ -817,7 +817,8 @@ private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullab if (Store.isAutogenerated(existingFile) || localSnapshot.contains(existingFile) || (additionalFiles != null && additionalFiles.contains(existingFile)) - || replicaFileDeleter != null && replicaFileDeleter.skipDelete(existingFile)) { + // also ensure we are not deleting a file referenced by an active reader. + || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -1920,13 +1921,13 @@ private static IndexWriterConfig newIndexWriterConfig() { public void incRefFileDeleter(Collection files) { if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileDeleter.incRef(files); + this.replicaFileTracker.incRef(files); } } public void decrefFileDeleter(Collection files) { if (this.indexSettings.isSegRepEnabled()) { - this.replicaFileDeleter.decRef(files); + this.replicaFileTracker.decRef(files); } } }