Skip to content

Commit

Permalink
Test case fix ups
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Jun 2, 2022
1 parent 14187a7 commit d33cc03
Show file tree
Hide file tree
Showing 20 changed files with 339 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

package org.opensearch.index.engine;

import java.io.IOException;

/**
* Simple Engine Factory
*
Expand All @@ -39,6 +41,6 @@
@FunctionalInterface
public interface EngineFactory {

Engine newReadWriteEngine(EngineConfig config);
Engine newReadWriteEngine(EngineConfig config) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.*;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.InternalTranslogManager;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -212,15 +218,24 @@ public class InternalEngine extends Engine {
private volatile String forceMergeUUID;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER);
}

public InternalEngine(EngineConfig engineConfig, TranslogEventListener translogEventListener) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new, translogEventListener);
}

@Override
public TranslogManager translogManager() {
return translogManager;
}

InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
InternalEngine(
EngineConfig engineConfig,
int maxDocs,
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
TranslogEventListener translogEventListener
) {
super(engineConfig);
this.maxDocs = maxDocs;
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
Expand All @@ -240,24 +255,25 @@ public TranslogManager translogManager() {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
TranslogEventListener engineEventListener = new TranslogEventListener() {
@Override
public void onTranslogSync() {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public void onTranslogRecovery() {
flush(false, true);
translogManager.trimUnreferencedTranslogFiles();
}
};
translogManager = new InternalTranslogManager(
engineConfig,
shardId,
readLock,
this::getLocalCheckpointTracker,
translogUUID,
new TranslogManager.TranslogEventListener() {
@Override
public void onTranslogSync() {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public void onTranslogRecovery() {
flush(false, true);
translogManager.trimUnreferencedTranslogFiles();
}
},
new CompositeTranslogEventListener(Arrays.asList(engineEventListener, translogEventListener)),
() -> ensureOpen(null),
this::failEngine,
this::failOnTragicEvent
Expand Down Expand Up @@ -286,6 +302,8 @@ public void onTranslogRecovery() {
} else {
throw e;
}
} catch (Exception e) {
throw e;
}
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig));
internalReaderManager = externalReaderManager.internalReaderManager;
Expand Down Expand Up @@ -319,13 +337,11 @@ public void onTranslogRecovery() {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(
writer,
translogManager.getTranslog(false),
internalReaderManager,
externalReaderManager,
scheduler
);
Translog translog = null;
if (translogManager != null) {
translog = translogManager.getTranslog(false);
}
IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand Down Expand Up @@ -486,14 +502,14 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

private void revisitIndexDeletionPolicyOnTranslogSynced() {
public void revisitIndexDeletionPolicyOnTranslogSynced() {
try {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
}
translogManager.getTranslog(false).trimUnreferencedReaders();
} catch (IOException ex) {
throw new RuntimeException();
throw new RuntimeException("Failed to trim unreferenced readers", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand Down Expand Up @@ -74,7 +75,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
readLock,
this::getLocalCheckpointTracker,
translogUUID,
TranslogManager.TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER,
TranslogEventListener.NOOP_TRANSLOG_EVENT_LISTENER,
() -> ensureOpen(null),
this::failEngine,
(ex) -> null
Expand Down
18 changes: 3 additions & 15 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1689,11 +1689,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
};
innerOpenEngineAndTranslog(() -> globalCheckpoint);
getEngine().translogManager()
.recoverFromTranslog(
translogRecoveryRunner,
getEngine().getProcessedLocalCheckpoint(),
globalCheckpoint
);
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
synchronized (engineMutex) {
Expand Down Expand Up @@ -1875,11 +1871,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().translogManager()
.recoverFromTranslog(
translogRecoveryRunner,
getEngine().getProcessedLocalCheckpoint(),
Long.MAX_VALUE
);
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -3957,11 +3949,7 @@ public void close() throws IOException {
);
newEngineReference.get()
.translogManager()
.recoverFromTranslog(
translogRunner,
newEngineReference.get().getProcessedLocalCheckpoint(),
globalCheckpoint
);
.recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (engineMutex) {
verifyNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.listener.TranslogEventListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,7 +31,7 @@
* The {@link TranslogManager} implementation capable of orchestrating all {@link Translog} operations while
* interfacing with the {@link org.opensearch.index.engine.InternalEngine}
*/
public class InternalTranslogManager extends TranslogManager {
public class InternalTranslogManager implements TranslogManager {

private final ReleasableLock readLock;
private final Runnable ensureOpen;
Expand All @@ -39,7 +40,7 @@ public class InternalTranslogManager extends TranslogManager {
private final BiConsumer<String, Exception> failEngine;
private final Function<AlreadyClosedException, Boolean> failOnTragicEvent;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final TranslogManager.TranslogEventListener translogEventListener;
private final TranslogEventListener translogEventListener;
private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class);

public InternalTranslogManager(
Expand All @@ -48,7 +49,7 @@ public InternalTranslogManager(
ReleasableLock readLock,
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogManager.TranslogEventListener translogEventListener,
TranslogEventListener translogEventListener,
Runnable ensureOpen,
BiConsumer<String, Exception> failEngine,
Function<AlreadyClosedException, Boolean> failOnTragicEvent
Expand Down Expand Up @@ -119,11 +120,9 @@ public void rollTranslogGeneration() throws TranslogException {
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
@Override
public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo
) throws IOException {
public void recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo)
throws IOException {
translogEventListener.onBeginTranslogRecovery();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen.run();
if (pendingTranslogRecovery.get() == false) {
Expand All @@ -143,11 +142,8 @@ public void recoverFromTranslog(
}
}

private void recoverFromTranslogInternal(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo
) throws IOException {
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo)
throws IOException {
final int opsRecovered;
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.io.IOException;
import java.util.stream.Stream;

public class NoOpTranslogManager extends TranslogManager {
public class NoOpTranslogManager implements TranslogManager {

private final Translog.Snapshot emptyTranslogSnapshot;
private final ReleasableLock readLock;
Expand All @@ -40,11 +40,8 @@ public NoOpTranslogManager(
public void rollTranslogGeneration() throws TranslogException {}

@Override
public void recoverFromTranslog(
TranslogRecoveryRunner translogRecoveryRunner,
long localCheckpoint,
long recoverUpToSeqNo
) throws IOException {
public void recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo)
throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen.run();
try (Translog.Snapshot snapshot = emptyTranslogSnapshot) {
Expand Down
Loading

0 comments on commit d33cc03

Please sign in to comment.