diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f67931fdbc9c..c3ee877cbd53a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -210,6 +210,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix version check for 2.x release for awareness attribute decommission([#5034](https://github.com/opensearch-project/OpenSearch/pull/5034)) - Fix flaky test ResourceAwareTasksTests on Windows ([#5077](https://github.com/opensearch-project/OpenSearch/pull/5077)) - Length calculation for block based fetching ([#5055](https://github.com/opensearch-project/OpenSearch/pull/5055)) +- [Segment Replication] Fix for AlreadyClosedException for engine ([#4743](https://github.com/opensearch-project/OpenSearch/pull/4743)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9b2ab753832d3..2ceb4e0908df3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -235,6 +235,44 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } } + public void testIndexReopenClose() throws Exception { + final String primary = internalCluster().startNode(); + final String replica = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(100, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + flush(INDEX_NAME); + waitForReplicaUpdate(); + } + + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> Closing the index "); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + logger.info("--> Opening the index"); + client().admin().indices().prepareOpen(INDEX_NAME).get(); + + ensureGreen(INDEX_NAME); + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index b7bb4c0b40011..41abbce91c48c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -122,18 +123,21 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. - final long incomingGeneration = infos.getGeneration(); - readerManager.updateSegments(infos); - - // Commit and roll the xlog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { - commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); - translogManager.rollTranslogGeneration(); + ensureOpen(); + try (ReleasableLock lock = writeLock.acquire()) { + final long incomingGeneration = infos.getGeneration(); + readerManager.updateSegments(infos); + + // Commit and roll the translog when we receive a different generation than what was last received. + // lower/higher gens are possible from a new primary that was just elected. + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); + translogManager.rollTranslogGeneration(); + } + lastReceivedGen = incomingGeneration; + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } - lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index c12d9b7165ae0..aadcb577f6174 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -213,6 +213,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); + cancellableThreads.checkForCancel(); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {