Skip to content

Commit

Permalink
Decoupling Translog changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed May 30, 2022
1 parent e9b19a0 commit f869f1a
Show file tree
Hide file tree
Showing 11 changed files with 759 additions and 636 deletions.
85 changes: 5 additions & 80 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -107,7 +107,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -167,6 +166,8 @@ public final EngineConfig config() {
return engineConfig;
}

public abstract TranslogManager translogManager();

protected abstract SegmentInfos getLastCommittedSegmentInfos();

/**
Expand Down Expand Up @@ -330,12 +331,6 @@ boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and te
*/
public abstract boolean isThrottled();

/**
* Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
* @see Translog#trimOperations(long, long)
*/
public abstract void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException;

/**
* A Lock implementation that always allows the lock to be acquired
*
Expand Down Expand Up @@ -768,18 +763,6 @@ public enum SearcherScope {
INTERNAL
}

/**
* Checks if the underlying storage sync is required.
*/
public abstract boolean isTranslogSyncNeeded();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
*/
public abstract boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException;

public abstract void syncTranslog() throws IOException;

/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
Expand Down Expand Up @@ -815,13 +798,6 @@ public abstract Translog.Snapshot newChangesSnapshot(
*/
public abstract long getMinRetainedSeqNo();

public abstract TranslogStats getTranslogStats();

/**
* Returns the last location that the translog of this engine has written into.
*/
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
Expand Down Expand Up @@ -856,6 +832,8 @@ public final CommitStats commitStats() {
*/
public abstract long getLastSyncedGlobalCheckpoint();

public abstract long getProcessedLocalCheckpoint();

/**
* Global stats on segments.
*/
Expand Down Expand Up @@ -1130,25 +1108,6 @@ public final void flush() throws EngineException {
flush(false, false);
}

/**
* checks and removes translog files that no longer need to be retained. See
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public abstract void trimUnreferencedTranslogFiles() throws EngineException;

/**
* Tests whether or not the translog generation should be rolled to a new generation.
* This test is based on the size of the current generation compared to the configured generation threshold size.
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public abstract boolean shouldRollTranslogGeneration();

/**
* Rolls the translog generation and cleans unneeded.
*/
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Triggers a forced merge on this engine
*/
Expand Down Expand Up @@ -1959,37 +1918,13 @@ public interface Warmer {
* Reverses a previous {@link #activateThrottling} call.
*/
public abstract void deactivateThrottling();

/**
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
*
* @return the number of translog operations have been recovered
*/
public abstract int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException;

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
* @param primaryTerm the shards primary term this engine was created for
* @return the number of no-ops added
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param translogRecoveryRunner the translog recovery runner
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;

/**
* Do not replay translog operations, but make the engine be ready.
*/
public abstract void skipTranslogRecovery();

/**
* Tries to prune buffered deletes from the version map.
*/
Expand All @@ -2010,16 +1945,6 @@ public long getMaxSeenAutoIdTimestamp() {
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

/**
* The runner for translog recovery
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}

/**
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
Expand Down
Loading

0 comments on commit f869f1a

Please sign in to comment.