From bfb4d34df97e31cd8585da49066898036fcc9168 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 4 Oct 2022 10:29:16 -0700 Subject: [PATCH 1/2] alreadyClosedExceptionFix Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 42 +++++++++++++++++++ .../index/engine/NRTReplicationEngine.java | 25 ++++++----- .../replication/SegmentReplicationTarget.java | 1 + 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9b2ab753832d3..c3661c0354e41 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -235,6 +235,48 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } } + public void testIndexReopenClose() throws Exception { + final String primary = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + refresh(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(10000, 200000); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + + flushAndRefresh(INDEX_NAME); + waitForReplicaUpdate(); + + logger.info("--> Closing the index "); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + // Add another node to kick off TransportNodesListGatewayStartedShards which fetches latestReplicationCheckpoint for SegRep enabled + // indices + final String replica2 = internalCluster().startNode(); + + logger.info("--> Opening the index"); + client().admin().indices().prepareOpen(INDEX_NAME).get(); + } + public void testMultipleShards() throws Exception { Settings indexSettings = Settings.builder() .put(super.indexSettings()) diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index b7bb4c0b40011..bdf00c0033038 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,6 +18,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; @@ -122,18 +123,20 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. - final long incomingGeneration = infos.getGeneration(); - readerManager.updateSegments(infos); - - // Commit and roll the xlog when we receive a different generation than what was last received. - // lower/higher gens are possible from a new primary that was just elected. - if (incomingGeneration != lastReceivedGen) { - commitSegmentInfos(); - translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); - translogManager.rollTranslogGeneration(); + ensureOpen(); + try (ReleasableLock lock = writeLock.acquire()) { + final long incomingGeneration = infos.getGeneration(); + readerManager.updateSegments(infos); + + // Commit and roll the translog when we receive a different generation than what was last received. + // lower/higher gens are possible from a new primary that was just elected. + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); + translogManager.rollTranslogGeneration(); + } + lastReceivedGen = incomingGeneration; + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } - lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index c12d9b7165ae0..aadcb577f6174 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -213,6 +213,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); + cancellableThreads.checkForCancel(); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { From b1b064d3b478502d98374fe41551bd4bed611565 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 11 Oct 2022 21:27:25 -0700 Subject: [PATCH 2/2] adding changelog entry Signed-off-by: Poojita Raj --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 22 ++++++++----------- .../index/engine/NRTReplicationEngine.java | 1 + 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f67931fdbc9c..c3ee877cbd53a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -210,6 +210,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix version check for 2.x release for awareness attribute decommission([#5034](https://github.com/opensearch-project/OpenSearch/pull/5034)) - Fix flaky test ResourceAwareTasksTests on Windows ([#5077](https://github.com/opensearch-project/OpenSearch/pull/5077)) - Length calculation for block based fetching ([#5055](https://github.com/opensearch-project/OpenSearch/pull/5055)) +- [Segment Replication] Fix for AlreadyClosedException for engine ([#4743](https://github.com/opensearch-project/OpenSearch/pull/4743)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index c3661c0354e41..2ceb4e0908df3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -237,15 +237,11 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { public void testIndexReopenClose() throws Exception { final String primary = internalCluster().startNode(); - createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); final String replica = internalCluster().startNode(); + createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); - client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refresh(INDEX_NAME); - - final int initialDocCount = scaledRandomIntBetween(10000, 200000); + final int initialDocCount = scaledRandomIntBetween(100, 200); try ( BackgroundIndexer indexer = new BackgroundIndexer( INDEX_NAME, @@ -259,22 +255,22 @@ public void testIndexReopenClose() throws Exception { ) { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); - refresh(INDEX_NAME); + flush(INDEX_NAME); waitForReplicaUpdate(); } - flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); logger.info("--> Closing the index "); client().admin().indices().prepareClose(INDEX_NAME).get(); - // Add another node to kick off TransportNodesListGatewayStartedShards which fetches latestReplicationCheckpoint for SegRep enabled - // indices - final String replica2 = internalCluster().startNode(); - logger.info("--> Opening the index"); client().admin().indices().prepareOpen(INDEX_NAME).get(); + + ensureGreen(INDEX_NAME); + assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); } public void testMultipleShards() throws Exception { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index bdf00c0033038..41abbce91c48c 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -132,6 +132,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th // lower/higher gens are possible from a new primary that was just elected. if (incomingGeneration != lastReceivedGen) { commitSegmentInfos(); + translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo); translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration;