diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9b2ab753832d3..c3661c0354e41 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -235,6 +235,48 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } } + public void testIndexReopenClose() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(10000, 200000); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + + logger.info("--> Closing the index "); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + // Add another node to kick off TransportNodesListGatewayStartedShards which fetches latestReplicationCheckpoint for SegRep enabled + // indices + final String replica2 = internalCluster().startNode(); + + logger.info("--> Opening the index"); + client().admin().indices().prepareOpen(INDEX_NAME).get(); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 12d420aa245fa..b4e2ce38f97e6 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -122,17 +123,20 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. - final long incomingGeneration = infos.getGeneration(); - readerManager.updateSegments(infos); - - // Commit and roll the xlog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { - commitSegmentInfos(); - translogManager.rollTranslogGeneration(); + ensureOpen(); + try (ReleasableLock lock = writeLock.acquire()) { + final long incomingGeneration = infos.getGeneration(); + readerManager.updateSegments(infos); + + // Commit and roll the xlog when we receive a different generation than what was last received. + // lower/higher gens are possible from a new primary that was just elected. + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); + translogManager.rollTranslogGeneration(); + } + lastReceivedGen = incomingGeneration; + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } - lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 26bec2203c599..babb4baffea50 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -216,6 +216,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); + cancellableThreads.checkForCancel(); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {