Skip to content

Commit

Permalink
[Segment Replication] Fix for AlreadyClosedException for engine (#4743)
Browse files Browse the repository at this point in the history
* alreadyClosedExceptionFix

Signed-off-by: Poojita Raj <[email protected]>

* adding changelog entry

Signed-off-by: Poojita Raj <[email protected]>

Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj authored Nov 7, 2022
1 parent 75d5ed5 commit 37d1eba
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix version check for 2.x release for awareness attribute decommission([#5034](https://github.com/opensearch-project/OpenSearch/pull/5034))
- Fix flaky test ResourceAwareTasksTests on Windows ([#5077](https://github.com/opensearch-project/OpenSearch/pull/5077))
- Length calculation for block based fetching ([#5055](https://github.com/opensearch-project/OpenSearch/pull/5055))
- [Segment Replication] Fix for AlreadyClosedException for engine ([#4743](https://github.com/opensearch-project/OpenSearch/pull/4743))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,44 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(100, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
flush(INDEX_NAME);
waitForReplicaUpdate();
}

assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

logger.info("--> Closing the index ");
client().admin().indices().prepareClose(INDEX_NAME).get();

logger.info("--> Opening the index");
client().admin().indices().prepareOpen(INDEX_NAME).get();

ensureGreen(INDEX_NAME);
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
}

public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -122,18 +123,21 @@ public TranslogManager translogManager() {

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the xlog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo);
translogManager.rollTranslogGeneration();
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
final long incomingGeneration = infos.getGeneration();
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
// lower/higher gens are possible from a new primary that was just elected.
if (incomingGeneration != lastReceivedGen) {
commitSegmentInfos();
translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(seqNo);
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse,
toIndexInput(checkpointInfoResponse.getInfosBytes()),
responseCheckpoint.getSegmentsGen()
);
cancellableThreads.checkForCancel();
indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo());
store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
Expand Down

0 comments on commit 37d1eba

Please sign in to comment.