diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index e9375a7556c36..83c99ac13bfb2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -231,6 +231,44 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } } + public void testIndexReopenClose() throws Exception { + final String primary = internalCluster().startNode(); + final String replica = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + flush(INDEX_NAME); + waitForReplicaUpdate(); + } + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> Closing the index "); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + logger.info("--> Opening the index"); + client().admin().indices().prepareOpen(INDEX_NAME).get(); + + ensureGreen(INDEX_NAME); + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 2f4fb87670389..f083706df39a0 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -122,18 +123,21 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. - final long incomingGeneration = infos.getGeneration(); - readerManager.updateSegments(infos); - - // Commit and roll the xlog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { - commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); - translogManager.rollTranslogGeneration(); + ensureOpen(); + try (ReleasableLock lock = writeLock.acquire()) { + final long incomingGeneration = infos.getGeneration(); + readerManager.updateSegments(infos); + + // Commit and roll the translog when we receive a different generation than what was last received. + // lower/higher gens are possible from a new primary that was just elected. + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.rollTranslogGeneration(); + } + lastReceivedGen = incomingGeneration; + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } - lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 26bec2203c599..babb4baffea50 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -216,6 +216,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); + cancellableThreads.checkForCancel(); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {