Skip to content

Commit

Permalink
Add failEngine calls to translog methods in NRTReplicationEngine.
Browse files Browse the repository at this point in the history
Roll xlog generation on replica when a new commit point is received.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed May 18, 2022
1 parent 5b380d2 commit 06b0d98
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -90,6 +91,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
Expand Down Expand Up @@ -121,10 +123,15 @@ public boolean isThrottled() {

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
ensureOpen();
try {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (IOException e) {
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog operations", e);
}
}
Expand Down Expand Up @@ -286,10 +293,15 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {

@Override
public void trimUnreferencedTranslogFiles() throws EngineException {
ensureOpen();
try {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
} catch (IOException e) {
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
}
}
Expand All @@ -301,11 +313,16 @@ public boolean shouldRollTranslogGeneration() {

@Override
public void rollTranslogGeneration() throws EngineException {
ensureOpen();
try {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (IOException e) {
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to roll translog", e);
}
}
Expand Down Expand Up @@ -350,11 +367,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
try {
logger.debug("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
logger.debug("engine closed [{}]", reason);
closedLatch.countDown();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,32 @@ public void testUpdateSegments() throws Exception {
engine.flush();
nrtEngine.syncTranslog(); // to advance persisted checkpoint

Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());

try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}

nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine);

assertEquals(
nrtEngine.getTranslog().getGeneration().translogFileGeneration,
engine.getTranslog().getGeneration().translogFileGeneration
);

try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}

// Ensure the same hit count between engines.
int expectedDocCount;
try (final Engine.Searcher test = engine.acquireSearcher("test")) {
Expand Down

0 comments on commit 06b0d98

Please sign in to comment.