From 5f1944190d764bf8de16c3fa405122b75bda3921 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 5 Apr 2023 12:36:42 -0700 Subject: [PATCH] [Segment Replication] Add PIT/Scroll compatibility with Segment Replication (#7010) * [Segment Replication] Add PIT/Scroll compatibility with Segment Replication #6644 (#6765) * Segment Replication - PIT/Scroll compatibility. This change makes updates to make PIT/Scroll queries compatibile with Segment Replication. It does this by refcounting files when a new reader is created, and discarding those files after a reader is closed. Signed-off-by: Marc Handalian * Fix broken test. Signed-off-by: Marc Handalian * Fix test bug with PIT where snapshotted segments are queried instead of current store state. Signed-off-by: Marc Handalian * Address review comments and prevent temp file deletion during reader close Signed-off-by: Suraj Singh * Fix precommit failure Signed-off-by: Suraj Singh * Use last committed segment infos reference from replication engine Signed-off-by: Suraj Singh * Clean up and prevent incref on segment info file copied from primary Signed-off-by: Suraj Singh * Fix failing test Signed-off-by: Suraj Singh --------- Signed-off-by: Marc Handalian Signed-off-by: Suraj Singh Co-authored-by: Marc Handalian * Add param definition causing precommit failure Signed-off-by: Suraj Singh * Remove unnecessary override annotation Signed-off-by: Suraj Singh --------- Signed-off-by: Marc Handalian Signed-off-by: Suraj Singh Co-authored-by: Marc Handalian --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 369 ++++++++++++++++++ .../allocator/BalancedShardsAllocator.java | 2 +- .../index/engine/NRTReplicationEngine.java | 24 +- .../engine/NRTReplicationReaderManager.java | 27 +- .../index/store/ReplicaFileTracker.java | 51 +++ .../org/opensearch/index/store/Store.java | 64 ++- .../replication/SegmentReplicationTarget.java | 4 +- 8 files changed, 530 insertions(+), 12 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 0041b6df93708..4fbb5fc1bac2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) - Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) - Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) +- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 59c1c45ccd3ea..59713cf0642f6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -17,8 +17,21 @@ import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.PitTestsUtil; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchType; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -29,15 +42,24 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexModule; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.SegmentReplicationShardStats; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.search.SearchService; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.search.internal.PitReaderContext; +import org.opensearch.search.sort.SortOrder; import org.opensearch.node.NodeClosedException; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; @@ -46,14 +68,23 @@ import org.opensearch.transport.TransportService; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static org.opensearch.action.search.PitTestsUtil.assertSegments; +import static org.opensearch.action.search.SearchContextId.decode; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.query.QueryBuilders.matchQuery; +import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits; @@ -836,4 +867,342 @@ public void testPressureServiceStats() throws Exception { }); } } + + /** + * Tests a scroll query on the replica + * @throws Exception + */ + public void testScrollCreatedOnReplica() throws Exception { + // create the cluster with one primary node containing primary shard and replica node containing replica shard + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + // index 100 docs + for (int i = 0; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + refresh(INDEX_NAME); + } + assertBusy( + () -> assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get(); + final Collection snapshottedSegments = segmentInfos.files(false); + // opens a scrolled query before a flush is called. + // this is for testing scroll segment consistency between refresh and flush + SearchResponse searchResponse = client(replica).prepareSearch() + .setQuery(matchAllQuery()) + .setIndices(INDEX_NAME) + .setRequestCache(false) + .setPreference("_only_local") + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + .addSort("field", SortOrder.ASC) + .setSize(10) + .setScroll(TimeValue.timeValueDays(1)) + .get(); + + // force call flush + flush(INDEX_NAME); + + for (int i = 3; i < 50; i++) { + client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); + flush(INDEX_NAME); + } + } + assertBusy(() -> { + assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); + assertBusy(() -> { + assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + // Test stats + logger.info("--> Collect all scroll query hits"); + long scrollHits = 0; + do { + scrollHits += searchResponse.getHits().getHits().length; + searchResponse = client(replica).prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueDays(1)).get(); + assertAllSuccessful(searchResponse); + } while (searchResponse.getHits().getHits().length > 0); + + List currentFiles = List.of(replicaShard.store().directory().listAll()); + assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments)); + + client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + currentFiles = List.of(replicaShard.store().directory().listAll()); + assertFalse("Files should be cleaned up post scroll clear request", currentFiles.containsAll(snapshottedSegments)); + assertEquals(100, scrollHits); + } + + /** + * Tests that when scroll query is cleared, it does not delete the temporary replication files, which are part of + * ongoing round of segment replication + * + * @throws Exception + */ + public void testScrollWithOngoingSegmentReplication() throws Exception { + // create the cluster with one primary node containing primary shard and replica node containing replica shard + final String primary = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder() + // we want to control refreshes + .put("index.refresh_interval", -1) + ).get(); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + final int initialDocCount = 10; + final int finalDocCount = 20; + for (int i = 0; i < initialDocCount; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + // catch up replica with primary + refresh(INDEX_NAME); + assertBusy( + () -> assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + logger.info("--> Create scroll query"); + // opens a scrolled query before a flush is called. + SearchResponse searchResponse = client(replica).prepareSearch() + .setQuery(matchAllQuery()) + .setIndices(INDEX_NAME) + .setRequestCache(false) + .setPreference("_only_local") + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + .addSort("field", SortOrder.ASC) + .setSize(10) + .setScroll(TimeValue.timeValueDays(1)) + .get(); + + // force call flush + flush(INDEX_NAME); + + // Index more documents + for (int i = initialDocCount; i < finalDocCount; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + // Block file copy operation to ensure replica has few temporary replication files + CountDownLatch blockFileCopy = new CountDownLatch(1); + CountDownLatch waitForFileCopy = new CountDownLatch(1); + MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primary + )); + primaryTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replica), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + FileChunkRequest req = (FileChunkRequest) request; + logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk()); + if (req.name().endsWith("cfs") && req.lastChunk()) { + try { + waitForFileCopy.countDown(); + logger.info("--> Waiting for file copy"); + blockFileCopy.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + connection.sendRequest(requestId, action, request, options); + } + ); + + // perform refresh to start round of segment replication + refresh(INDEX_NAME); + + // wait for segrep to start and copy temporary files + waitForFileCopy.await(); + + // verify replica contains temporary files + IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + List temporaryFiles = Arrays.stream(replicaShard.store().directory().listAll()) + .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX)) + .collect(Collectors.toList()); + logger.info("--> temporaryFiles {}", temporaryFiles); + assertTrue(temporaryFiles.size() > 0); + + // Clear scroll query, this should clean up files on replica + client(replica).prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + // verify temporary files still exist + replicaShard = getIndexShard(replica, INDEX_NAME); + List temporaryFilesPostClear = Arrays.stream(replicaShard.store().directory().listAll()) + .filter(fileName -> fileName.startsWith(REPLICATION_PREFIX)) + .collect(Collectors.toList()); + logger.info("--> temporaryFilesPostClear {}", temporaryFilesPostClear); + + // Unblock segment replication + blockFileCopy.countDown(); + + assertEquals(temporaryFiles.size(), temporaryFilesPostClear.size()); + assertTrue(temporaryFilesPostClear.containsAll(temporaryFiles)); + + // wait for replica to catch up and verify doc count + assertBusy(() -> { + assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + verifyStoreContent(); + waitForSearchableDocs(finalDocCount, primary, replica); + } + + public void testPitCreatedOnReplica() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + client().prepareIndex(INDEX_NAME) + .setId("1") + .setSource("foo", randomInt()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + refresh(INDEX_NAME); + + client().prepareIndex(INDEX_NAME) + .setId("2") + .setSource("foo", randomInt()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + for (int i = 3; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource("foo", randomInt()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + refresh(INDEX_NAME); + } + // wait until replication finishes, then make the pit request. + assertBusy( + () -> assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ) + ); + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false); + request.setPreference("_only_local"); + request.setIndices(new String[] { INDEX_NAME }); + ActionFuture execute = client(replica).execute(CreatePitAction.INSTANCE, request); + CreatePitResponse pitResponse = execute.get(); + SearchResponse searchResponse = client(replica).prepareSearch(INDEX_NAME) + .setSize(10) + .setPreference("_only_local") + .setRequestCache(false) + .addSort("foo", SortOrder.ASC) + .searchAfter(new Object[] { 30 }) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) + .get(); + assertEquals(1, searchResponse.getSuccessfulShards()); + assertEquals(1, searchResponse.getTotalShards()); + FlushRequest flushRequest = Requests.flushRequest(INDEX_NAME); + client().admin().indices().flush(flushRequest).get(); + final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); + + // fetch the segments snapshotted when the reader context was created. + Collection snapshottedSegments; + SearchService searchService = internalCluster().getInstance(SearchService.class, replica); + NamedWriteableRegistry registry = internalCluster().getInstance(NamedWriteableRegistry.class, replica); + final PitReaderContext pitReaderContext = searchService.getPitReaderContext( + decode(registry, pitResponse.getId()).shards().get(replicaShard.routingEntry().shardId()).getSearchContextId() + ); + try (final Engine.Searcher searcher = pitReaderContext.acquireSearcher("test")) { + final StandardDirectoryReader standardDirectoryReader = NRTReplicationReaderManager.unwrapStandardReader( + (OpenSearchDirectoryReader) searcher.getDirectoryReader() + ); + final SegmentInfos infos = standardDirectoryReader.getSegmentInfos(); + snapshottedSegments = infos.files(false); + } + + flush(INDEX_NAME); + for (int i = 101; i < 200; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource("foo", randomInt()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + refresh(INDEX_NAME); + if (randomBoolean()) { + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); + flush(INDEX_NAME); + } + } + assertBusy(() -> { + assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(true).get(); + assertBusy(() -> { + assertEquals( + getIndexShard(primary, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion(), + getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + // Test stats + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.indices(INDEX_NAME); + indicesStatsRequest.all(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get(); + long pitCurrent = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getTotal().getPitCurrent(); + long openContexts = indicesStatsResponse.getIndex(INDEX_NAME).getTotal().search.getOpenContexts(); + assertEquals(1, pitCurrent); + assertEquals(1, openContexts); + SearchResponse resp = client(replica).prepareSearch(INDEX_NAME) + .setSize(10) + .setPreference("_only_local") + .addSort("foo", SortOrder.ASC) + .searchAfter(new Object[] { 30 }) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) + .setRequestCache(false) + .get(); + PitTestsUtil.assertUsingGetAllPits(client(replica), pitResponse.getId(), pitResponse.getCreationTime()); + assertSegments(false, INDEX_NAME, 1, client(replica), pitResponse.getId()); + + List currentFiles = List.of(replicaShard.store().directory().listAll()); + assertTrue("Files should be preserved", currentFiles.containsAll(snapshottedSegments)); + + // delete the PIT + DeletePitRequest deletePITRequest = new DeletePitRequest(pitResponse.getId()); + client().execute(DeletePitAction.INSTANCE, deletePITRequest).actionGet(); + + currentFiles = List.of(replicaShard.store().directory().listAll()); + assertFalse("Files should be cleaned up", currentFiles.containsAll(snapshottedSegments)); + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0ff0eeba7d394..6ba8e5d893bc0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -162,7 +162,7 @@ private void setWeightFunction(float indexBalance, float shardBalanceFactor) { /** * When primary shards balance is desired, enable primary shard balancing constraints - * @param preferPrimaryShardBalance + * @param preferPrimaryShardBalance boolean to prefer balancing by primary shard */ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.preferPrimaryShardBalance = preferPrimaryShardBalance; diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 8071e94d1426d..da3f914d8bd7e 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -70,7 +70,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) { WriteOnlyTranslogManager translogManagerRef = null; try { lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId)); + readerManager = buildReaderManager(); final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( this.lastCommittedSegmentInfos.getUserData().entrySet() ); @@ -121,6 +121,28 @@ public void onAfterTranslogSync() { } } + private NRTReplicationReaderManager buildReaderManager() throws IOException { + return new NRTReplicationReaderManager( + OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId), + store::incRefFileDeleter, + (files) -> { + store.decRefFileDeleter(files); + try { + store.cleanupAndPreserveLatestCommitPoint( + "On reader closed", + getLatestSegmentInfos(), + getLastCommittedSegmentInfos(), + false + ); + } catch (IOException e) { + // Log but do not rethrow - we can try cleaning up again after next replication cycle. + // If that were to fail, the shard will as well. + logger.error("Unable to clean store after reader closed", e); + } + } + ); + } + public TranslogManager translogManager() { return translogManager; } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 00748acb1d76d..9ec484ebfd383 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; /** * This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}. @@ -35,17 +37,27 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager { private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class); private volatile SegmentInfos currentInfos; + private Consumer> onReaderClosed; + private Consumer> onNewReader; /** * Creates and returns a new SegmentReplicationReaderManager from the given * already-opened {@link OpenSearchDirectoryReader}, stealing * the incoming reference. * - * @param reader the SegmentReplicationReaderManager to use for future reopens + * @param reader - The SegmentReplicationReaderManager to use for future reopens. + * @param onNewReader - Called when a new reader is created. + * @param onReaderClosed - Called when a reader is closed. */ - NRTReplicationReaderManager(OpenSearchDirectoryReader reader) { + NRTReplicationReaderManager( + OpenSearchDirectoryReader reader, + Consumer> onNewReader, + Consumer> onReaderClosed + ) { super(reader); currentInfos = unwrapStandardReader(reader).getSegmentInfos(); + this.onNewReader = onNewReader; + this.onReaderClosed = onReaderClosed; } @Override @@ -60,6 +72,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { subs.add(ctx.reader()); } + final Collection files = currentInfos.files(false); DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( innerReader, @@ -68,7 +81,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re logger.trace( () -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader) ); - return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); + final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap( + softDeletesDirectoryReaderWrapper, + referenceToRefresh.shardId() + ); + onNewReader.accept(files); + OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files)); + return reader; } /** @@ -89,7 +108,7 @@ public SegmentInfos getSegmentInfos() { return currentInfos; } - private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) { + public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) { final DirectoryReader delegate = reader.getDelegate(); if (delegate instanceof SoftDeletesDirectoryReaderWrapper) { return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate(); diff --git a/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java new file mode 100644 index 0000000000000..0ec282619337c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/ReplicaFileTracker.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * This class is a version of Lucene's ReplicaFileDeleter class used to keep track of + * segment files that should be preserved on replicas between replication events. + * The difference is this component does not actually perform any deletions, it only handles refcounts. + * Our deletions are made through Store.java. + * + * https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java + * + * @opensearch.internal + */ +final class ReplicaFileTracker { + + private final Map refCounts = new HashMap<>(); + + public synchronized void incRef(Collection fileNames) { + for (String fileName : fileNames) { + refCounts.merge(fileName, 1, Integer::sum); + } + } + + public synchronized void decRef(Collection fileNames) { + for (String fileName : fileNames) { + Integer curCount = refCounts.get(fileName); + assert curCount != null : "fileName=" + fileName; + assert curCount > 0; + if (curCount == 1) { + refCounts.remove(fileName); + } else { + refCounts.put(fileName, curCount - 1); + } + } + } + + public synchronized boolean canDelete(String fileName) { + return refCounts.containsKey(fileName) == false; + } +} diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 69de85cd23820..f923532b3d9ad 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -124,6 +124,7 @@ import static java.util.Collections.unmodifiableMap; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata; +import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX; /** * A Store provides plain access to files written by an opensearch index shard. Each shard @@ -182,6 +183,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref private final ShardLock shardLock; private final OnClose onClose; + // used to ref count files when a new Reader is opened for PIT/Scroll queries + // prevents segment files deletion until the PIT/Scroll expires or is discarded + private final ReplicaFileTracker replicaFileTracker; + private final AbstractRefCounted refCounter = new AbstractRefCounted("store") { @Override protected void closeInternal() { @@ -202,6 +207,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; + this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null; assert onClose != null; assert shardLock != null; @@ -782,9 +788,10 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } /** - * Segment Replication method - + * Segment Replication method * This method deletes every file in this store that is not referenced by the passed in SegmentInfos or * part of the latest on-disk commit point. + * * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file @@ -792,24 +799,59 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException { + this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true); + } + + /** + * Segment Replication method + * + * Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup + * + * This method deletes every file in this store. Except + * 1. Files referenced by the passed in SegmentInfos, usually in-memory segment infos copied from primary + * 2. Files part of the passed in segment infos, typically the last committed segment info + * 3. Files incremented by active reader for pit/scroll queries + * 4. Temporary replication file if passed in deleteTempFiles is true. + * + * @param reason the reason for this cleanup operation logged for each deleted file + * @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present. + * @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos + * @param deleteTempFiles Does this clean up delete temporary replication files + * + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndPreserveLatestCommitPoint( + String reason, + SegmentInfos infos, + SegmentInfos lastCommittedSegmentInfos, + boolean deleteTempFiles + ) throws IOException { assert indexSettings.isSegRepEnabled(); // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true)); + cleanupFiles(reason, getMetadata(lastCommittedSegmentInfos), infos.files(true), deleteTempFiles); } finally { metadataLock.writeLock().unlock(); } } - private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection additionalFiles) - throws IOException { + private void cleanupFiles( + String reason, + MetadataSnapshot localSnapshot, + @Nullable Collection additionalFiles, + boolean deleteTempFiles + ) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { if (Store.isAutogenerated(existingFile) || localSnapshot.contains(existingFile) - || (additionalFiles != null && additionalFiles.contains(existingFile))) { + || (additionalFiles != null && additionalFiles.contains(existingFile)) + // also ensure we are not deleting a file referenced by an active reader. + || replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false + // prevent temporary file deletion during reader cleanup + || deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) continue; @@ -1909,4 +1951,16 @@ private static IndexWriterConfig newIndexWriterConfig() { // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE); } + + public void incRefFileDeleter(Collection files) { + if (this.indexSettings.isSegRepEnabled()) { + this.replicaFileTracker.incRef(files); + } + } + + public void decRefFileDeleter(Collection files) { + if (this.indexSettings.isSegRepEnabled()) { + this.replicaFileTracker.decRef(files); + } + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 03d67f4aa2313..995ec58d8768f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -51,6 +51,8 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final SegmentReplicationState state; protected final MultiFileWriter multiFileWriter; + public final static String REPLICATION_PREFIX = "replication."; + public ReplicationCheckpoint getCheckpoint() { return this.checkpoint; } @@ -85,7 +87,7 @@ protected void closeInternal() { @Override protected String getPrefix() { - return "replication." + UUIDs.randomBase64UUID() + "."; + return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + "."; } @Override