From 72dd53d766e96bf7ac017cb4387b66fcb4dc8738 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 31 May 2022 19:39:04 +0530 Subject: [PATCH] Test compilation errors fix up Signed-off-by: Bukhtawar Khan --- .../org/opensearch/index/engine/Engine.java | 2 + .../index/engine/InternalEngine.java | 80 ++-- .../index/engine/NRTReplicationEngine.java | 23 +- .../opensearch/index/engine/NoOpEngine.java | 11 +- .../opensearch/index/shard/IndexShard.java | 62 +-- .../translog/InternalTranslogManager.java | 71 +++- .../index/translog/NoOpTranslogManager.java | 26 +- .../index/translog/TranslogManager.java | 23 +- .../translog/TranslogRecoveryRunner.java | 5 +- .../translog/WriteOnlyTranslogManager.java | 25 +- .../opensearch/index/IndexServiceTests.java | 2 +- .../index/engine/InternalEngineTests.java | 365 +++++++++++++----- .../engine/LuceneChangesSnapshotTests.java | 4 +- .../engine/NRTReplicationEngineTests.java | 18 +- .../index/engine/NoOpEngineTests.java | 21 +- .../index/engine/ReadOnlyEngineTests.java | 37 +- .../index/shard/IndexShardTests.java | 87 ++++- .../index/shard/RefreshListenersTests.java | 5 +- .../index/engine/EngineTestCase.java | 9 +- .../index/engine/TranslogHandler.java | 9 +- 20 files changed, 603 insertions(+), 282 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 36b7c8715ddec..850ae332ef668 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1918,6 +1918,7 @@ public interface Warmer { * Reverses a previous {@link #activateThrottling} call. */ public abstract void deactivateThrottling(); + /** * Fills up the local checkpoints history with no-ops until the local checkpoint * and the max seen sequence ID are identical. @@ -1925,6 +1926,7 @@ public interface Warmer { * @return the number of no-ops added */ public abstract int fillSeqNoGaps(long primaryTerm) throws IOException; + /** * Tries to prune buffered deletes from the version map. */ diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index ea3a3c1def795..24359989889b6 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -240,8 +240,17 @@ public TranslogManager translogManager() { store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath()); final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); - translogManager = new InternalTranslogManager(engineConfig, shardId, readLock, getLocalCheckpointTracker(), translogUUID, - this::revisitIndexDeletionPolicyOnTranslogSynced, () -> ensureOpen(null), this::failEngine, this::failOnTragicEvent); + translogManager = new InternalTranslogManager( + engineConfig, + shardId, + readLock, + getLocalCheckpointTracker(), + translogUUID, + this::revisitIndexDeletionPolicyOnTranslogSynced, + () -> ensureOpen(null), + this::failEngine, + this::failOnTragicEvent + ); this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy( logger, @@ -280,7 +289,9 @@ public TranslogManager translogManager() { } this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); - maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo())); + maxSeqNoOfUpdatesOrDeletes = new AtomicLong( + SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo()) + ); if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); @@ -297,7 +308,13 @@ public TranslogManager translogManager() { success = true; } finally { if (success == false) { - IOUtils.closeWhileHandlingException(writer, translogManager.getTranslog(false), internalReaderManager, externalReaderManager, scheduler); + IOUtils.closeWhileHandlingException( + writer, + translogManager.getTranslog(false), + internalReaderManager, + externalReaderManager, + scheduler + ); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -458,7 +475,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { } } - private void revisitIndexDeletionPolicyOnTranslogSynced() { + private void revisitIndexDeletionPolicyOnTranslogSynced() { try { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); @@ -1643,7 +1660,8 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translogManager.getTranslog(false).add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.getTranslog(false) + .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); } } @@ -1744,9 +1762,8 @@ public boolean shouldPeriodicallyFlush() { final long localCheckpointOfLastCommit = Long.parseLong( lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); - final long translogGenerationOfLastCommit = translogManager.getTranslog(false).getMinGenerationForSeqNo( - localCheckpointOfLastCommit + 1 - ).translogFileGeneration; + final long translogGenerationOfLastCommit = translogManager.getTranslog(false) + .getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); if (translogManager.getTranslog(false).sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; @@ -1766,9 +1783,8 @@ public boolean shouldPeriodicallyFlush() { * * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long translogGenerationOfNewCommit = translogManager.getTranslog(false).getMinGenerationForSeqNo( - localCheckpointTracker.getProcessedCheckpoint() + 1 - ).translogFileGeneration; + final long translogGenerationOfNewCommit = translogManager.getTranslog(false) + .getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration; return translogGenerationOfLastCommit < translogGenerationOfNewCommit || localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @@ -1807,7 +1823,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { || getProcessedLocalCheckpoint() > Long.parseLong( lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) )) { - translogManager.ensureCanFlush(shardId); + translogManager.ensureCanFlush(); try { translogManager.getTranslog(false).rollGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); @@ -2041,16 +2057,17 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) { } failEngine("already closed by tragic event on the index writer", tragicException); engineFailed = true; - } else if (translogManager.getTranslog(false).isOpen() == false && translogManager.getTranslog(false).getTragicException() != null) { - failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException()); - engineFailed = true; - } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? - // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by - // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error - throw new AssertionError("Unexpected AlreadyClosedException", ex); - } else { - engineFailed = false; - } + } else if (translogManager.getTranslog(false).isOpen() == false + && translogManager.getTranslog(false).getTragicException() != null) { + failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException()); + engineFailed = true; + } else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet? + // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by + // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error + throw new AssertionError("Unexpected AlreadyClosedException", ex); + } else { + engineFailed = false; + } return engineFailed; } @@ -2067,12 +2084,13 @@ protected boolean maybeFailEngine(String source, Exception e) { return failOnTragicEvent((AlreadyClosedException) e); } else if (e != null && ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e) - || (translogManager.getTranslog(false).isOpen() == false && translogManager.getTranslog(false).getTragicException() == e))) { - // this spot on - we are handling the tragic event exception here so we have to fail the engine - // right away - failEngine(source, e); - return true; - } + || (translogManager.getTranslog(false).isOpen() == false + && translogManager.getTranslog(false).getTragicException() == e))) { + // this spot on - we are handling the tragic event exception here so we have to fail the engine + // right away + failEngine(source, e); + return true; + } return false; } @@ -2422,7 +2440,7 @@ protected void doRun() throws Exception { * @param translog the translog */ protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException { - translogManager.ensureCanFlush(shardId); + translogManager.ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); writer.setLiveCommitData(() -> { @@ -2498,7 +2516,7 @@ public MergeStats getMergeStats() { return mergeScheduler.stats(); } - LocalCheckpointTracker getLocalCheckpointTracker() { + public LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 7cd1c4edd2535..19975f32ebc60 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -22,7 +22,10 @@ import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.translog.*; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.TranslogManager; +import org.opensearch.index.translog.WriteOnlyTranslogManager; import org.opensearch.search.suggest.completion.CompletionStats; import java.io.Closeable; @@ -33,8 +36,6 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; /** * This is an {@link Engine} implementation intended for replica shards when Segment Replication @@ -67,8 +68,17 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.readerManager.addListener(completionStatsCache); final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); - translogManager = new WriteOnlyTranslogManager(engineConfig, shardId, readLock, getLocalCheckpointTracker(), translogUUID, - () -> {}, () -> ensureOpen(null), this::failEngine, (ex) -> null); + translogManager = new WriteOnlyTranslogManager( + engineConfig, + shardId, + readLock, + getLocalCheckpointTracker(), + translogUUID, + () -> {}, + () -> ensureOpen(null), + this::failEngine, + (ex) -> null + ); } catch (IOException e) { IOUtils.closeWhileHandlingException(store::decRef, readerManager); @@ -147,7 +157,8 @@ public DeleteResult delete(Delete delete) throws IOException { public NoOpResult noOp(NoOp noOp) throws IOException { ensureOpen(); NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); - final Translog.Location location = translogManager.getTranslog(false).add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.getTranslog(false) + .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index b81a4a3214ba9..440595d0cc679 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -44,7 +44,13 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.DocsStats; import org.opensearch.index.store.Store; -import org.opensearch.index.translog.*; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; +import org.opensearch.index.translog.NoOpTranslogManager; +import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.TranslogException; +import org.opensearch.index.translog.TranslogManager; import java.io.IOException; import java.io.UncheckedIOException; @@ -151,8 +157,7 @@ public TranslogManager translogManager() { try { return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() { @Override - public void close() { - } + public void close() {} @Override public int totalOperations() { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index faeda62b4c98b..2f952c43f7968 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -146,7 +146,10 @@ import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreStats; -import org.opensearch.index.translog.*; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.index.warmer.ShardIndexWarmerService; import org.opensearch.index.warmer.WarmerStats; import org.opensearch.indices.IndexingMemoryController; @@ -621,16 +624,11 @@ public void updateShardState( * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); - engine.translogManager().restoreLocalHistoryFromTranslog( - engine, - engine.getProcessedLocalCheckpoint(), - (resettingEngine, snapshot) -> runTranslogRecovery( - resettingEngine, - snapshot, - Engine.Operation.Origin.LOCAL_RESET, - () -> {} - ) - ); + engine.translogManager() + .restoreLocalHistoryFromTranslog( + engine.getProcessedLocalCheckpoint(), + (snapshot) -> runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}) + ); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between * sequence numbers in a translog generation in a new primary as it takes the last known sequence number * as a starting point), but it simplifies reasoning about the relationship between primary terms and @@ -1678,10 +1676,10 @@ public long recoverLocallyUpToGlobalCheckpoint() { return safeCommit.get().localCheckpoint + 1; } try { - final TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> { recoveryState.getTranslog().totalLocal(snapshot.totalOperations()); final int recoveredOps = runTranslogRecovery( - engine, + getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, recoveryState.getTranslog()::incrementRecoveredOperations @@ -1690,8 +1688,13 @@ public long recoverLocallyUpToGlobalCheckpoint() { return recoveredOps; }; innerOpenEngineAndTranslog(() -> globalCheckpoint); - getEngine().translogManager().recoverFromTranslog(getEngine(), translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), - globalCheckpoint, () -> getEngine().flush(false, true)); + getEngine().translogManager() + .recoverFromTranslog( + translogRecoveryRunner, + getEngine().getProcessedLocalCheckpoint(), + globalCheckpoint, + () -> getEngine().flush(false, true) + ); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { synchronized (engineMutex) { @@ -1720,7 +1723,7 @@ public long recoverLocallyUpToGlobalCheckpoint() { } public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { - getEngine().translogManager().trimOperationsFromTranslog(shardId, getOperationPrimaryTerm(), aboveSeqNo); + getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } /** @@ -1860,11 +1863,11 @@ public void openEngineAndRecoverFromTranslog() throws IOException { maybeCheckIndex(); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); - final TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); return runTranslogRecovery( - engine, + getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, translogRecoveryStats::incrementRecoveredOperations @@ -1872,8 +1875,13 @@ public void openEngineAndRecoverFromTranslog() throws IOException { }; loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); - getEngine().translogManager().recoverFromTranslog(getEngine(), translogRecoveryRunner, - getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> getEngine().flush(false, true)); + getEngine().translogManager() + .recoverFromTranslog( + translogRecoveryRunner, + getEngine().getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> getEngine().flush(false, true) + ); } /** @@ -3941,16 +3949,22 @@ public void close() throws IOException { newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } - final TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( - engine, + final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery( + newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog } ); - newEngineReference.get().translogManager().recoverFromTranslog(newEngineReference.get(), translogRunner, - newEngineReference.get().getProcessedLocalCheckpoint(), globalCheckpoint, () -> newEngineReference.get().flush(false, true)); + newEngineReference.get() + .translogManager() + .recoverFromTranslog( + translogRunner, + newEngineReference.get().getProcessedLocalCheckpoint(), + globalCheckpoint, + () -> newEngineReference.get().flush(false, true) + ); newEngineReference.get().refresh("reset_engine"); synchronized (engineMutex) { verifyNotClosed(); diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 2cb1b9ce5917b..48150922a821f 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; @@ -26,6 +25,10 @@ import java.util.function.LongSupplier; import java.util.stream.Stream; +/** + * The {@link TranslogManager} implementation capable of orchestrating all {@link Translog} operations while + * interfacing with the {@link org.opensearch.index.engine.InternalEngine} + */ public class InternalTranslogManager extends TranslogManager { private final ReleasableLock readLock; @@ -36,11 +39,19 @@ public class InternalTranslogManager extends TranslogManager { private final Function failOnTragicEvent; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final TranslogManager.TranslogEventListener translogEventListener; - private static final Logger logger = LogManager.getLogger(TranslogManager.class); + private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); - public InternalTranslogManager(EngineConfig engineConfig, ShardId shardId, ReleasableLock readLock, LocalCheckpointTracker tracker, - String translogUUID, TranslogManager.TranslogEventListener translogEventListener, Runnable ensureOpen, - BiConsumer failEngine, Function failOnTragicEvent) throws IOException { + public InternalTranslogManager( + EngineConfig engineConfig, + ShardId shardId, + ReleasableLock readLock, + LocalCheckpointTracker tracker, + String translogUUID, + TranslogManager.TranslogEventListener translogEventListener, + Runnable ensureOpen, + BiConsumer failEngine, + Function failOnTragicEvent + ) throws IOException { this.shardId = shardId; this.readLock = readLock; this.ensureOpen = ensureOpen; @@ -78,6 +89,7 @@ public InternalTranslogManager(EngineConfig engineConfig, ShardId shardId, Relea /** * Rolls the translog generation and cleans unneeded. */ + @Override public void rollTranslogGeneration() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); @@ -103,15 +115,20 @@ public void rollTranslogGeneration() throws TranslogException { * @param translogRecoveryRunner the translog recovery runner * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered */ - public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner translogRecoveryRunner, - long localCheckpoint, long recoverUpToSeqNo, Runnable flush) throws IOException { - try (ReleasableLock lock = readLock.acquire()) { + @Override + public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } try { - recoverFromTranslogInternal(engine, translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo, flush); + recoverFromTranslogInternal(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo, flush); } catch (Exception e) { try { pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush @@ -124,12 +141,16 @@ public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner translogRe } } - private void recoverFromTranslogInternal(Engine engine, TranslogRecoveryRunner translogRecoveryRunner, - long localCheckpoint, long recoverUpToSeqNo, Runnable flush) throws IOException { + private void recoverFromTranslogInternal( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { final int opsRecovered; if (localCheckpoint < recoverUpToSeqNo) { try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { - opsRecovered = translogRecoveryRunner.run(engine, snapshot); + opsRecovered = translogRecoveryRunner.run(snapshot); } catch (Exception e) { throw new TranslogException(shardId, "failed to recover from translog", e); } @@ -151,10 +172,10 @@ private void recoverFromTranslogInternal(Engine engine, TranslogRecoveryRunner t translog.trimUnreferencedReaders(); } - /** * Checks if the underlying storage sync is required. */ + @Override public boolean isTranslogSyncNeeded() { return getTranslog(true).syncNeeded(); } @@ -162,6 +183,7 @@ public boolean isTranslogSyncNeeded() { /** * Ensures that all locations in the given stream have been written to the underlying storage. */ + @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { final boolean synced = translog.ensureSynced(locations); if (synced) { @@ -170,11 +192,13 @@ public boolean ensureTranslogSynced(Stream locations) throws return synced; } + @Override public void syncTranslog() throws IOException { translog.sync(); translogEventListener.onTranslogSync(); } + @Override public TranslogStats getTranslogStats() { return getTranslog(true).stats(); } @@ -182,6 +206,7 @@ public TranslogStats getTranslogStats() { /** * Returns the last location that the translog of this engine has written into. */ + @Override public Translog.Location getTranslogLastWriteLocation() { return getTranslog(true).getLastWriteLocation(); } @@ -190,8 +215,9 @@ public Translog.Location getTranslogLastWriteLocation() { * checks and removes translog files that no longer need to be retained. See * {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details */ + @Override public void trimUnreferencedTranslogFiles() throws TranslogException { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { @@ -213,6 +239,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { * * @return {@code true} if the current generation should be rolled to a new generation */ + @Override public boolean shouldRollTranslogGeneration() { return getTranslog(true).shouldRollGeneration(); } @@ -221,8 +248,9 @@ public boolean shouldRollTranslogGeneration() { * Trims translog for terms below belowTerm and seq# above aboveSeqNo * @see Translog#trimOperations(long, long) */ - public void trimOperationsFromTranslog(ShardId shardId, long belowTerm, long aboveSeqNo) throws TranslogException { - try (ReleasableLock lock = readLock.acquire()) { + @Override + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); translog.trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { @@ -238,23 +266,24 @@ public void trimOperationsFromTranslog(ShardId shardId, long belowTerm, long abo } } - /** * This method replays translog to restore the Lucene index which might be reverted previously. * This ensures that all acknowledged writes are restored correctly when this engine is promoted. * * @return the number of translog operations have been recovered */ - public int restoreLocalHistoryFromTranslog(Engine engine, long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + @Override + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { - return translogRecoveryRunner.run(engine, snapshot); + return translogRecoveryRunner.run(snapshot); } } } - public void ensureCanFlush(ShardId shardId) { + @Override + public void ensureCanFlush() { // translog recovery happens after the engine is fully constructed. // If we are in this stage we have to prevent flushes from this // engine otherwise we might loose documents if the flush succeeds @@ -267,6 +296,7 @@ public void ensureCanFlush(ShardId shardId) { /** * Do not replay translog operations, but make the engine be ready. */ + @Override public void skipTranslogRecovery() { assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit @@ -292,6 +322,7 @@ private Translog openTranslog( ); } + @Override public Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { this.ensureOpen.run(); diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index d5e7fb285dea0..51bd5a303b572 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -9,7 +9,6 @@ package org.opensearch.index.translog; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.ShardId; import java.io.IOException; @@ -23,8 +22,13 @@ public class NoOpTranslogManager extends TranslogManager { private final ShardId shardId; private final TranslogStats translogStats; - public NoOpTranslogManager(ShardId shardId, ReleasableLock readLock, Runnable ensureOpen, TranslogStats translogStats, - Translog.Snapshot emptyTranslogSnapshot) throws IOException { + public NoOpTranslogManager( + ShardId shardId, + ReleasableLock readLock, + Runnable ensureOpen, + TranslogStats translogStats, + Translog.Snapshot emptyTranslogSnapshot + ) throws IOException { this.emptyTranslogSnapshot = emptyTranslogSnapshot; this.readLock = readLock; this.shardId = shardId; @@ -36,12 +40,16 @@ public NoOpTranslogManager(ShardId shardId, ReleasableLock readLock, Runnable en public void rollTranslogGeneration() throws TranslogException {} @Override - public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, - long recoverUpToSeqNo, Runnable flush) throws IOException { + public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen.run(); try (Translog.Snapshot snapshot = emptyTranslogSnapshot) { - translogRecoveryRunner.run(engine, snapshot); + translogRecoveryRunner.run(snapshot); } catch (final Exception e) { throw new TranslogException(shardId, "failed to recover from empty translog snapshot", e); } @@ -80,7 +88,7 @@ public boolean shouldRollTranslogGeneration() { } @Override - public void trimOperationsFromTranslog(ShardId shardId, long belowTerm, long aboveSeqNo) throws TranslogException {} + public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} @Override public Translog getTranslog(boolean ensureOpen) { @@ -88,10 +96,10 @@ public Translog getTranslog(boolean ensureOpen) { } @Override - public void ensureCanFlush(ShardId shardId) {} + public void ensureCanFlush() {} @Override - public int restoreLocalHistoryFromTranslog(Engine engine, long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { return 0; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index c5ffee10e1e39..d40c09cca8b75 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -8,12 +8,13 @@ package org.opensearch.index.translog; -import org.opensearch.index.engine.Engine; -import org.opensearch.index.shard.ShardId; - import java.io.IOException; import java.util.stream.Stream; +/** + * + * @opensearch.internal + */ public abstract class TranslogManager { /** @@ -28,8 +29,12 @@ public abstract class TranslogManager { * @param translogRecoveryRunner the translog recovery runner * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered */ - abstract public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner translogRecoveryRunner, - long localCheckpoint, long recoverUpToSeqNo, Runnable flush) throws IOException; + abstract public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException; /** * Checks if the underlying storage sync is required. @@ -72,7 +77,7 @@ abstract public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner t * Trims translog for terms below belowTerm and seq# above aboveSeqNo * @see Translog#trimOperations(long, long) */ - abstract public void trimOperationsFromTranslog(ShardId shardId, long belowTerm, long aboveSeqNo) throws TranslogException; + abstract public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException; /** * This method replays translog to restore the Lucene index which might be reverted previously. @@ -80,7 +85,8 @@ abstract public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner t * * @return the number of translog operations have been recovered */ - abstract public int restoreLocalHistoryFromTranslog(Engine engine, long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException; + abstract public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) + throws IOException; /** * Do not replay translog operations, but make the engine be ready. @@ -89,10 +95,9 @@ abstract public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner t abstract public Translog getTranslog(boolean ensureOpen); - public abstract void ensureCanFlush(ShardId shardId); + public abstract void ensureCanFlush(); public interface TranslogEventListener { void onTranslogSync(); } } - diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java index 2178f491b2df9..d523d2210c177 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogRecoveryRunner.java @@ -7,13 +7,10 @@ */ package org.opensearch.index.translog; -import org.opensearch.index.engine.Engine; import java.io.IOException; @FunctionalInterface public interface TranslogRecoveryRunner { - int run(Engine engine, Translog.Snapshot snapshot) throws IOException; + int run(Translog.Snapshot snapshot) throws IOException; } - - diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index d75b6c91be7ea..042d523db26ba 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -10,7 +10,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; -import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; @@ -19,19 +18,37 @@ import java.util.function.BiConsumer; import java.util.function.Function; +/*** + * + */ public class WriteOnlyTranslogManager extends InternalTranslogManager { - public WriteOnlyTranslogManager(EngineConfig engineConfig, ShardId shardId, ReleasableLock readLock, LocalCheckpointTracker tracker, String translogUUID, TranslogEventListener translogEventListener, Runnable ensureOpen, BiConsumer failEngine, Function failOnTragicEvent) throws IOException { + public WriteOnlyTranslogManager( + EngineConfig engineConfig, + ShardId shardId, + ReleasableLock readLock, + LocalCheckpointTracker tracker, + String translogUUID, + TranslogEventListener translogEventListener, + Runnable ensureOpen, + BiConsumer failEngine, + Function failOnTragicEvent + ) throws IOException { super(engineConfig, shardId, readLock, tracker, translogUUID, translogEventListener, ensureOpen, failEngine, failOnTragicEvent); } @Override - public int restoreLocalHistoryFromTranslog(Engine engine, long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { + public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { return 0; } @Override - public void recoverFromTranslog(Engine engine, TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo, Runnable flush) throws IOException { + public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog."); } diff --git a/server/src/test/java/org/opensearch/index/IndexServiceTests.java b/server/src/test/java/org/opensearch/index/IndexServiceTests.java index be38b707b77b4..2d6907ba8348f 100644 --- a/server/src/test/java/org/opensearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexServiceTests.java @@ -448,7 +448,7 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { final Engine readOnlyEngine = getEngine(indexService.getShard(0)); assertBusy( () -> assertThat( - readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), + readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES) ) ); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cbae55a047a1e..c8c6672c4e02e 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -757,20 +757,26 @@ public long getProcessedCheckpoint() { } public void testFlushIsDisabledDuringTranslogRecovery() throws IOException { - engine.ensureCanFlush(); // recovered already + engine.translogManager().ensureCanFlush(); // recovered already ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); engine.close(); engine = new InternalEngine(engine.config()); - expectThrows(IllegalStateException.class, engine::ensureCanFlush); + expectThrows(IllegalStateException.class, () -> engine.translogManager().ensureCanFlush()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); if (randomBoolean()) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); } else { - engine.skipTranslogRecovery(); + engine.translogManager().skipTranslogRecovery(); } - engine.ensureCanFlush(); // ready + engine.translogManager().ensureCanFlush(); // ready doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); engine.flush(); @@ -822,7 +828,13 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { IOUtils.close(engine); } try (Engine recoveringEngine = new InternalEngine(engine.config())) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + recoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + recoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> recoveringEngine.flush(false, true) + ); recoveringEngine.refresh("test"); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -858,7 +870,14 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + Engine finalRecoveringEngine = recoveringEngine; + finalRecoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + finalRecoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> finalRecoveringEngine.flush(false, true) + ); assertTrue(committed.get()); } finally { IOUtils.close(recoveringEngine); @@ -892,7 +911,14 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { } initialEngine.close(); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + Engine finalRecoveringEngine = recoveringEngine; + finalRecoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + finalRecoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> finalRecoveringEngine.flush(false, true) + ); recoveringEngine.refresh("test"); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), docs); @@ -915,23 +941,30 @@ public void testRecoveryFromTranslogUpToSeqNo() throws IOException { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); if (rarely()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } else if (rarely()) { engine.flush(randomBoolean(), true); } } maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getProcessedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); } try (InternalEngine engine = new InternalEngine(config)) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); assertThat(engine.getProcessedLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); } try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.recoverFromTranslog(translogHandler, upToSeqNo); + engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), upToSeqNo, () -> engine.flush(false, true)); assertThat(engine.getProcessedLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); } @@ -1246,26 +1279,26 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); if (inSync) { - engine.syncTranslog(); // to advance persisted local checkpoint + engine.translogManager().syncTranslog(); // to advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); } engine.flush(); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); - assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog(true).currentFileGeneration(), equalTo(3L)); + assertThat(engine.translogManager().getTranslog(true).getMinFileGeneration(), equalTo(3L)); } public void testSyncTranslogConcurrently() throws Exception { @@ -1278,7 +1311,7 @@ public void testSyncTranslogConcurrently() throws Exception { applyOperations(engine, ops); engine.flush(true, true); final CheckedRunnable checker = () -> { - assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); assertThat(engine.getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint.get())); try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( @@ -1294,7 +1327,7 @@ public void testSyncTranslogConcurrently() throws Exception { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); try { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); checker.run(); } catch (IOException e) { throw new AssertionError(e); @@ -1349,7 +1382,8 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> engine.flush(false, true)); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1383,7 +1417,8 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { EngineConfig config = engine.config(); engine.close(); engine = new InternalEngine(config); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> engine.flush(false, true)); assertNull( "Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID) @@ -1695,7 +1730,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { long localCheckpoint = engine.getProcessedLocalCheckpoint(); globalCheckpoint.set(randomLongBetween(0, localCheckpoint)); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final long safeCommitCheckpoint; try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { safeCommitCheckpoint = Long.parseLong(wrappedSafeCommit.get().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); @@ -1726,7 +1761,7 @@ public void testForceMergeWithSoftDeletesRetention() throws Exception { indexSettings.getSoftDeleteRetentionOperations() ); globalCheckpoint.set(localCheckpoint); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); @@ -1784,7 +1819,7 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc } engine.flush(); globalCheckpoint.set(randomLongBetween(0, engine.getPersistedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final long minSeqNoToRetain; try (GatedCloseable wrappedSafeCommit = engine.acquireSafeIndexCommit()) { long safeCommitLocalCheckpoint = Long.parseLong( @@ -1832,12 +1867,12 @@ public void testForceMergeWithSoftDeletesRetentionAndRecoverySource() throws Exc if (useRecoverySource == false) { liveDocsWithSource.add(doc.id()); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(randomBoolean(), true); } else { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); } engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); @@ -2195,7 +2230,7 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm; if (rarely()) { currentTerm.set(currentTerm.get() + 1L); - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } final long correctVersion = docDeleted ? Versions.MATCH_DELETED : lastOpVersion; logger.info( @@ -2741,7 +2776,7 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { } } - initialEngine.syncTranslog(); // to advance persisted local checkpoint + initialEngine.translogManager().syncTranslog(); // to advance persisted local checkpoint if (randomInt(10) < 3) { // only update rarely as we do it every doc @@ -2768,8 +2803,9 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint) ); - initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint - assertThat(initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + initialEngine.translogManager().getTranslog(true).sync(); // to guarantee the global checkpoint is written to the translog + // checkpoint + assertThat(initialEngine.translogManager().getTranslog(true).getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat(Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2777,14 +2813,20 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { } try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + recoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + recoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> recoveringEngine.flush(false, true) + ); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo) ); - assertThat(recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + assertThat(recoveringEngine.translogManager().getTranslog(true).getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert @@ -3188,24 +3230,45 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { try (InternalEngine engine = createEngine(config)) { engine.index(firstIndexRequest); - engine.syncTranslog(); // to advance persisted local checkpoint + engine.translogManager().syncTranslog(); // to advance persisted local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); + expectThrows( + IllegalStateException.class, + () -> engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ) + ); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals(engine.translogManager().getTranslog(true).getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } // open and recover tlog { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { - expectThrows(IllegalStateException.class, engine::ensureCanFlush); + expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertEquals( + engine.translogManager().getTranslog(true).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + engine.translogManager().getTranslog(true).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); } } } @@ -3220,10 +3283,16 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); - assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); + assertEquals(engine.translogManager().getTranslog(true).getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); + assertEquals(2, engine.translogManager().getTranslog(true).currentFileGeneration()); + assertEquals(0L, engine.translogManager().getTranslog(true).stats().getUncommittedOperations()); } } @@ -3232,10 +3301,22 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertEquals( + engine.translogManager().getTranslog(true).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + engine.translogManager().getTranslog(true).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); } } } @@ -3356,10 +3437,16 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I } } ) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); - engine.syncTranslog(); // to advance local checkpoint + engine.translogManager().syncTranslog(); // to advance local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); throwErrorOnCommit.set(true); @@ -3371,12 +3458,20 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier) ) ) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); assertVisibleCount(engine, 1); final long localCheckpoint = Long.parseLong( engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); - final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; + final long committedGen = engine.translogManager() + .getTranslog(true) + .getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); @@ -3410,7 +3505,7 @@ public void testSkipTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs); engine.close(); try (InternalEngine engine = new InternalEngine(config)) { - engine.skipTranslogRecovery(); + engine.translogManager().skipTranslogRecovery(); try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); assertThat(topDocs.totalHits.value, equalTo(0L)); @@ -3457,7 +3552,8 @@ public void testTranslogReplay() throws IOException { engine.close(); // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> engine.flush(false, true)); engine.refresh("warm_up"); assertVisibleCount(engine, numDocs, false); @@ -3560,7 +3656,7 @@ public void testRecoverFromForeignTranslog() throws IOException { assertThat(index.getVersion(), equalTo(1L)); } assertVisibleCount(engine, numDocs); - Translog.TranslogGeneration generation = engine.getTranslog().getGeneration(); + Translog.TranslogGeneration generation = engine.translogManager().getTranslog(true).getGeneration(); engine.close(); final Path badTranslogLog = createTempDir(); @@ -3656,7 +3752,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier engine.flush(false, true) + ); assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument( "1", @@ -4925,7 +5027,13 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro IOUtils.close(initialEngine); } try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + recoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + recoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> recoveringEngine.flush(false, true) + ); recoveringEngine.fillSeqNoGaps(2); assertEquals(recoveringEngine.getProcessedLocalCheckpoint(), recoveringEngine.getPersistedLocalCheckpoint()); assertThat(recoveringEngine.getProcessedLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -5105,21 +5213,28 @@ protected long doGenerateSeqNoForOperation(Operation operation) { throw new UnsupportedOperationException(); } }; - noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + InternalEngine finalNoOpEngine = noOpEngine; + noOpEngine.translogManager() + .recoverFromTranslog( + translogHandler, + noOpEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> finalNoOpEngine.flush(false, true) + ); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason)); assertThat(noOpEngine.getProcessedLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled)); + assertThat(noOpEngine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(gapsFilled)); noOpEngine.noOp( new Engine.NoOp(maxSeqNo + 2, primaryTerm.get(), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY), System.nanoTime(), reason) ); assertThat(noOpEngine.getProcessedLocalCheckpoint(), equalTo((long) (maxSeqNo + 2))); - assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled + 1)); + assertThat(noOpEngine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(gapsFilled + 1)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; - try (Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot()) { + try (Translog.Snapshot snapshot = noOpEngine.translogManager().getTranslog(true).newSnapshot()) { while ((op = snapshot.next()) != null) { last = op; } @@ -5233,8 +5348,8 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint) ); final InternalEngine finalActualEngine = actualEngine; - final Translog translog = finalActualEngine.getTranslog(); - final long generation = finalActualEngine.getTranslog().currentFileGeneration(); + final Translog translog = finalActualEngine.translogManager().getTranslog(true); + final long generation = finalActualEngine.translogManager().getTranslog(true).currentFileGeneration(); for (int i = 0; i < numberOfTriplets; i++) { /* * Index three documents with the first and last landing in the same generation and the middle document being stalled until @@ -5339,28 +5454,34 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1, seqNo, false)); if (rarely()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } if (rarely()) { engine.flush(); } } globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getPersistedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); prevDocs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(engineConfig)) { - final long currentTranslogGeneration = engine.getTranslog().currentFileGeneration(); - engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); - engine.restoreLocalHistoryFromTranslog(translogHandler); + final long currentTranslogGeneration = engine.translogManager().getTranslog(true).currentFileGeneration(); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + globalCheckpoint.get(), + () -> engine.flush(false, true) + ); + engine.translogManager().restoreLocalHistoryFromTranslog(engine.getProcessedLocalCheckpoint(), translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); assertThat( "restore from local translog must not add operations to translog", - engine.getTranslog().totalOperationsByMinGen(currentTranslogGeneration), + engine.translogManager().getTranslog(true).totalOperationsByMinGen(currentTranslogGeneration), equalTo(0) ); } @@ -5385,8 +5506,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); } } - engine.syncTranslog(); // to advance local checkpoint - replicaEngine.syncTranslog(); // to advance local checkpoint + engine.translogManager().syncTranslog(); // to advance local checkpoint + replicaEngine.translogManager().syncTranslog(); // to advance local checkpoint checkpointOnReplica = replicaEngine.getProcessedLocalCheckpoint(); } finally { IOUtils.close(replicaEngine); @@ -5402,7 +5523,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(checkpointOnReplica, replicaEngine.getProcessedLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + InternalEngine finalRecoveringEngine = recoveringEngine; + finalRecoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + finalRecoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> finalRecoveringEngine.flush(false, true) + ); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getProcessedLocalCheckpoint()); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); @@ -5435,9 +5563,16 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { try { recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - } - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertThat(recoveringEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); + } + InternalEngine finalRecoveringEngine = recoveringEngine; + finalRecoveringEngine.translogManager() + .recoverFromTranslog( + translogHandler, + finalRecoveringEngine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> finalRecoveringEngine.flush(false, true) + ); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getProcessedLocalCheckpoint()); assertEquals(0, recoveringEngine.fillSeqNoGaps(3)); @@ -5667,7 +5802,13 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I super.commitIndexWriter(writer, translog); } }) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); @@ -5675,7 +5816,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null))); if (frequently()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); } if (frequently()) { engine.flush(randomBoolean(), true); @@ -5842,7 +5983,7 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } engine.flush(false, randomBoolean()); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); List commits = DirectoryReader.listCommits(store.directory()); assertThat( Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), @@ -5856,9 +5997,9 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception { } // Global checkpoint advanced enough - only the last commit is kept. globalCheckpoint.set(randomLongBetween(engine.getPersistedLocalCheckpoint(), Long.MAX_VALUE)); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); - assertThat(engine.getTranslog().totalOperations(), equalTo(0)); + assertThat(engine.translogManager().getTranslog(true).totalOperations(), equalTo(0)); } } @@ -5881,7 +6022,7 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit. } globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.syncTranslog(); + engine.translogManager().syncTranslog(); final List commits = DirectoryReader.listCommits(store.directory()); for (int i = 0; i < numSnapshots - 1; i++) { snapshots.get(i).close(); @@ -5896,12 +6037,12 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. - final Translog translog = engine.getTranslog(); + final Translog translog = engine.translogManager().getTranslog(true); final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> { long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); }; - final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() + final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog(true).stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { @@ -5912,7 +6053,7 @@ public void testShouldPeriodicallyFlush() throws Exception { long flushThreshold = RandomNumbers.randomLongBetween( random(), 120, - engine.getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine + engine.translogManager().getTranslog(true).stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine ); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) @@ -5928,7 +6069,7 @@ public void testShouldPeriodicallyFlush() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); @@ -5984,11 +6125,11 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { indexSettings.getTranslogRetentionSize(), indexSettings.getSoftDeleteRetentionOperations() ); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(1)); + assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(2)); + assertThat(engine.translogManager().getTranslog(true).stats().getUncommittedOperations(), equalTo(2)); engine.refresh("test"); engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()); assertBusy(() -> { @@ -6023,8 +6164,8 @@ public void testStressShouldPeriodicallyFlush() throws Exception { final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - if (rarely() && engine.getTranslog().shouldRollGeneration()) { - engine.rollTranslogGeneration(); + if (rarely() && engine.translogManager().getTranslog(true).shouldRollGeneration()) { + engine.translogManager().rollTranslogGeneration(); } if (rarely() || engine.shouldPeriodicallyFlush()) { engine.flush(); @@ -6245,8 +6386,8 @@ public void testTrimUnsafeCommits() throws Exception { } } globalCheckpoint.set(randomInt(maxSeqNo)); - engine.syncTranslog(); - minTranslogGen = engine.getTranslog().getMinFileGeneration(); + engine.translogManager().syncTranslog(); + minTranslogGen = engine.translogManager().getTranslog(true).getMinFileGeneration(); } store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath()); @@ -6427,7 +6568,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { } existingSeqNos.add(result.getSeqNo()); if (randomBoolean()) { - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint assertEquals(engine.getProcessedLocalCheckpoint(), engine.getPersistedLocalCheckpoint()); globalCheckpoint.set( randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getPersistedCheckpoint()) @@ -6663,7 +6804,7 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { flushedOperations.add(op); applyOperation(engine, op); if (randomBoolean()) { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { @@ -6723,7 +6864,13 @@ public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { equalTo(seqNosInSafeCommit.contains(op.seqNo())) ); } - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); assertThat(getDocIds(engine, true), equalTo(docs)); } } @@ -6777,8 +6924,8 @@ public void testStoreHonorsLuceneVersion() throws IOException { public void testMaxSeqNoInCommitUserData() throws Exception { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> { - while (running.get() && engine.getTranslog().currentFileGeneration() < 500) { - engine.rollTranslogGeneration(); // make adding operations to translog slower + while (running.get() && engine.translogManager().getTranslog(true).currentFileGeneration() < 500) { + engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower } }); rollTranslog.start(); @@ -6917,7 +7064,7 @@ public void testRecoverFromLocalTranslog() throws Exception { for (Engine.Operation op : operations) { applyOperation(engine, op); if (randomBoolean()) { - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } if (randomInt(100) < 10) { @@ -6932,7 +7079,7 @@ public void testRecoverFromLocalTranslog() throws Exception { } if (randomBoolean()) { // engine is flushed properly before shutting down. - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(); } @@ -6940,13 +7087,19 @@ public void testRecoverFromLocalTranslog() throws Exception { } try (InternalEngine engine = new InternalEngine(config)) { engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, 0); - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog( + translogHandler, + engine.getProcessedLocalCheckpoint(), + Long.MAX_VALUE, + () -> engine.flush(false, true) + ); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { assertThat( "engine should trim all unreferenced translog after recovery", - engine.getTranslog().getMinFileGeneration(), - equalTo(engine.getTranslog().currentFileGeneration()) + engine.translogManager().getTranslog(true).getMinFileGeneration(), + equalTo(engine.translogManager().getTranslog(true).currentFileGeneration()) ); } } @@ -7018,8 +7171,8 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E ); } primaryTerm.set(randomLongBetween(primaryTerm.get(), Long.MAX_VALUE)); - engine.rollTranslogGeneration(); - engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog + engine.translogManager().rollTranslogGeneration(); + engine.translogManager().trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); diff --git a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java index e3117e179e7fa..24f018e710f43 100644 --- a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java @@ -226,7 +226,7 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { engine.refresh("test"); } if (rarely()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } if (rarely()) { engine.flush(); @@ -315,7 +315,7 @@ void pullOperations(InternalEngine follower) throws IOException { long batchSize = randomLongBetween(0, 100); long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) { - translogHandler.run(follower, snapshot); + translogHandler.run(snapshot); } } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 6aa00bb9312dd..41e34e20ccd82 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -75,16 +75,20 @@ public void testEngineWritesOpsToTranslog() throws Exception { applyOperation(nrtEngine, op); } - assertEquals(nrtEngine.getTranslogLastWriteLocation(), engine.getTranslogLastWriteLocation()); + assertEquals( + nrtEngine.translogManager().getTranslogLastWriteLocation(), + engine.translogManager().getTranslogLastWriteLocation() + ); assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint()); // we don't index into nrtEngine, so get the doc ids from the regular engine. final List docs = getDocIds(engine, true); // recover a new engine from the nrtEngine's xlog. - nrtEngine.syncTranslog(); + nrtEngine.translogManager().syncTranslog(); try (InternalEngine engine = new InternalEngine(nrtEngine.config())) { - engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + engine.translogManager() + .recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> {}); assertEquals(getDocIds(engine, true), docs); } assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); @@ -127,7 +131,7 @@ public void testUpdateSegments() throws Exception { // Flush the primary and update the NRTEngine with the latest committed infos. engine.flush(); - nrtEngine.syncTranslog(); // to advance persisted checkpoint + nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); @@ -144,7 +148,7 @@ public void testUpdateSegments() throws Exception { assertEquals( nrtEngine.getTranslog().getGeneration().translogFileGeneration, - engine.getTranslog().getGeneration().translogFileGeneration + engine.translogManager().getTranslog(true).getGeneration().translogFileGeneration ); try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { @@ -187,8 +191,8 @@ public void testTrimTranslogOps() throws Exception { equalTo(seqNos) ); } - nrtEngine.rollTranslogGeneration(); - nrtEngine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); + nrtEngine.translogManager().rollTranslogGeneration(); + nrtEngine.translogManager().trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index a015443979527..9f0f376ab3b0f 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -145,7 +145,7 @@ public void testNoOpEngineStats() throws Exception { if (rarely()) { engine.flush(); } - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); } @@ -154,7 +154,7 @@ public void testNoOpEngineStats() throws Exception { String delId = Integer.toString(i); Engine.DeleteResult result = engine.delete(new Engine.Delete(delId, newUid(delId), primaryTerm.get())); assertTrue(result.isFound()); - engine.syncTranslog(); // advance persisted local checkpoint + engine.translogManager().syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); deletions += 1; } @@ -217,20 +217,23 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { engine.flush(); } if (randomBoolean()) { - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } } // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. - final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot(); + final Translog.Snapshot snapshot = engine.translogManager().getTranslog(true).newSnapshot(); engine.flush(true, true); engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); - noOpEngine.trimUnreferencedTranslogFiles(); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); - assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); + assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); + noOpEngine.translogManager().trimUnreferencedTranslogFiles(); + assertThat(noOpEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); + assertThat(noOpEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat( + noOpEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), + equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES) + ); snapshot.close(); noOpEngine.close(); } diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2106c5e1067fb..d23e87fb1b67d 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -93,13 +93,13 @@ public void testReadOnlyEngine() throws Exception { } globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getPersistedLocalCheckpoint())); engine.flush(); readOnlyEngine = new ReadOnlyEngine( engine.engineConfig, engine.getSeqNoStats(globalCheckpoint.get()), - engine.getTranslogStats(), + engine.translogManager().getTranslogStats(), false, Function.identity(), true @@ -139,7 +139,8 @@ public void testReadOnlyEngine() throws Exception { } // Close and reopen the main engine try (InternalEngine recoveringEngine = new InternalEngine(config)) { - recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + recoveringEngine.translogManager() + .recoverFromTranslog(translogHandler, recoveringEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> {}); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getPersistedLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); @@ -178,7 +179,7 @@ public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { ); maxSeqNo = engine.getProcessedLocalCheckpoint(); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); globalCheckpoint.set(engine.getPersistedLocalCheckpoint() - 1); engine.flushAndClose(); @@ -274,12 +275,13 @@ public void testRecoverFromTranslogAppliesNoOperations() throws IOException { } globalCheckpoint.set(i); } - engine.syncTranslog(); + engine.translogManager().syncTranslog(); engine.flushAndClose(); } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); + readOnlyEngine.translogManager() + .recoverFromTranslog(translogHandler, readOnlyEngine.getProcessedLocalCheckpoint(), randomNonNegativeLong(), () -> {}); assertThat(translogHandler.appliedOperations(), equalTo(0L)); } @@ -324,23 +326,26 @@ public void testTranslogStats() throws IOException { } assertThat( - engine.getTranslogStats().estimatedNumberOfOperations(), + engine.translogManager().getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? uncommittedDocs : numDocs) ); - assertThat(engine.getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs)); - assertThat(engine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); - assertThat(engine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); - assertThat(engine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(uncommittedDocs)); + assertThat(engine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); + assertThat(engine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); engine.flush(true, true); } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { - assertThat(readOnlyEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeletesEnabled ? 0 : numDocs)); - assertThat(readOnlyEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); - assertThat(readOnlyEngine.getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); - assertThat(readOnlyEngine.getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); + assertThat( + readOnlyEngine.translogManager().getTranslogStats().estimatedNumberOfOperations(), + equalTo(softDeletesEnabled ? 0 : numDocs) + ); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(0)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), greaterThan(0L)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getUncommittedSizeInBytes(), greaterThan(0L)); + assertThat(readOnlyEngine.translogManager().getTranslogStats().getEarliestLastModifiedAge(), greaterThan(0L)); } } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 49d0c089f072b..ce58e09c8358c 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -86,6 +86,7 @@ import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; @@ -128,9 +129,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreStats; import org.opensearch.index.store.StoreUtils; -import org.opensearch.index.translog.TestTranslog; -import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.*; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -174,6 +173,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongFunction; @@ -2192,7 +2192,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { long primaryTerm = shard.getOperationPrimaryTerm(); shard.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id"); - shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation + shard.getEngine().translogManager().rollTranslogGeneration(); // isolate the delete in it's own generation shard.applyIndexOperationOnReplica( 0, primaryTerm, @@ -2240,7 +2240,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { replayedOps = 3; } else { if (randomBoolean()) { - shard.getEngine().rollTranslogGeneration(); + shard.getEngine().translogManager().rollTranslogGeneration(); } translogOps = 5; replayedOps = 5; @@ -2513,7 +2513,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception { ); flushShard(shard); assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1")); - shard.getEngine().rollTranslogGeneration(); + shard.getEngine().translogManager().rollTranslogGeneration(); shard.markSeqNoAsNoop(1, primaryTerm, "test"); shard.applyIndexOperationOnReplica( 2, @@ -4109,15 +4109,38 @@ public void testCloseShardWhileResettingEngine() throws Exception { CountDownLatch closeDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) - throws IOException { - readyToCloseLatch.countDown(); + public TranslogManager translogManager() { try { - closeDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + return new InternalTranslogManager( + config, + null, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + null, + null, + null, + () -> {}, + (str, ex) -> {}, + null + ) { + @Override + public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { + readyToCloseLatch.countDown(); + try { + closeDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.recoverFromTranslog(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo, flush); + } + }; + } catch (IOException ex) { + throw new RuntimeException(ex); } - return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); } }); @@ -4168,16 +4191,38 @@ public void testSnapshotWhileResettingEngine() throws Exception { CountDownLatch snapshotDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @Override - public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) - throws IOException { - InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); - readyToSnapshotLatch.countDown(); + public TranslogManager translogManager() { try { - snapshotDoneLatch.await(); - } catch (InterruptedException e) { - throw new AssertionError(e); + return new InternalTranslogManager( + config, + null, + new ReleasableLock(new ReentrantReadWriteLock().readLock()), + null, + null, + null, + () -> {}, + (str, ex) -> {}, + null + ) { + @Override + public void recoverFromTranslog( + TranslogRecoveryRunner translogRecoveryRunner, + long localCheckpoint, + long recoverUpToSeqNo, + Runnable flush + ) throws IOException { + super.recoverFromTranslog(translogRecoveryRunner, localCheckpoint, recoverUpToSeqNo, flush); + readyToSnapshotLatch.countDown(); + try { + snapshotDoneLatch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + }; + } catch (IOException ex) { + throw new RuntimeException(ex); } - return engine; } }); diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index eea316d9a9370..cc8cf1e48b03d 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -173,8 +173,9 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineTestCase.tombstoneDocSupplier() ); engine = new InternalEngine(config); - engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); - listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); + engine.translogManager() + .recoverFromTranslog((s) -> 0, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, () -> engine.flush(false, true)); + listeners.setCurrentRefreshLocationSupplier(engine.translogManager()::getTranslogLastWriteLocation); } @After diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 66c697d83510b..4384065925658 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -328,10 +328,10 @@ public void tearDown() throws Exception { super.tearDown(); try { if (engine != null && engine.isClosed.get() == false) { - assertEngineCleanedUp(engine, engine.getTranslog()); + assertEngineCleanedUp(engine, engine.translogManager().getTranslog(true)); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog()); + assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog(true)); } } finally { IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); @@ -662,7 +662,8 @@ protected InternalEngine createEngine( } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + internalEngine.translogManager().recoverFromTranslog(translogHandler, internalEngine.getProcessedLocalCheckpoint(), Long.MAX_VALUE, + () -> engine.flush(false, true)); return internalEngine; } @@ -1479,7 +1480,7 @@ public static MapperService createMapperService() throws IOException { public static Translog getTranslog(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.getTranslog(); + return internalEngine.translogManager().getTranslog(true); } /** diff --git a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java index e1f2357aa2400..e4895746dcc6b 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/TranslogHandler.java @@ -50,6 +50,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.mapper.MapperRegistry; @@ -61,7 +62,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -public class TranslogHandler implements Engine.TranslogRecoveryRunner { +public class TranslogHandler implements TranslogRecoveryRunner { private final MapperService mapperService; @@ -112,15 +113,15 @@ private void applyOperation(Engine engine, Engine.Operation operation) throws IO } @Override - public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { + public int run(Translog.Snapshot snapshot) throws IOException { int opsRecovered = 0; Translog.Operation operation; while ((operation = snapshot.next()) != null) { - applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); + //applyOperation(convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); opsRecovered++; appliedOperations.incrementAndGet(); } - engine.syncTranslog(); + //engine.syncTranslog(); return opsRecovered; }