Skip to content

Commit

Permalink
Test compilation errors fix up
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed May 31, 2022
1 parent f869f1a commit 72dd53d
Show file tree
Hide file tree
Showing 20 changed files with 603 additions and 282 deletions.
2 changes: 2 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1918,13 +1918,15 @@ public interface Warmer {
* Reverses a previous {@link #activateThrottling} call.
*/
public abstract void deactivateThrottling();

/**
* Fills up the local checkpoints history with no-ops until the local checkpoint
* and the max seen sequence ID are identical.
* @param primaryTerm the shards primary term this engine was created for
* @return the number of no-ops added
*/
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;

/**
* Tries to prune buffered deletes from the version map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,17 @@ public TranslogManager translogManager() {
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManager = new InternalTranslogManager(engineConfig, shardId, readLock, getLocalCheckpointTracker(), translogUUID,
this::revisitIndexDeletionPolicyOnTranslogSynced, () -> ensureOpen(null), this::failEngine, this::failOnTragicEvent);
translogManager = new InternalTranslogManager(
engineConfig,
shardId,
readLock,
getLocalCheckpointTracker(),
translogUUID,
this::revisitIndexDeletionPolicyOnTranslogSynced,
() -> ensureOpen(null),
this::failEngine,
this::failOnTragicEvent
);
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
logger,
Expand Down Expand Up @@ -280,7 +289,9 @@ public TranslogManager translogManager() {
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo()));
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog(false).getMaxSeqNo())
);
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
Expand All @@ -297,7 +308,13 @@ public TranslogManager translogManager() {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translogManager.getTranslog(false), internalReaderManager, externalReaderManager, scheduler);
IOUtils.closeWhileHandlingException(
writer,
translogManager.getTranslog(false),
internalReaderManager,
externalReaderManager,
scheduler
);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand Down Expand Up @@ -458,7 +475,7 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

private void revisitIndexDeletionPolicyOnTranslogSynced() {
private void revisitIndexDeletionPolicyOnTranslogSynced() {
try {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
Expand Down Expand Up @@ -1643,7 +1660,8 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.getTranslog(false)
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
}
Expand Down Expand Up @@ -1744,9 +1762,8 @@ public boolean shouldPeriodicallyFlush() {
final long localCheckpointOfLastCommit = Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
);
final long translogGenerationOfLastCommit = translogManager.getTranslog(false).getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
final long translogGenerationOfLastCommit = translogManager.getTranslog(false)
.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translogManager.getTranslog(false).sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
Expand All @@ -1766,9 +1783,8 @@ public boolean shouldPeriodicallyFlush() {
*
* This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered.
*/
final long translogGenerationOfNewCommit = translogManager.getTranslog(false).getMinGenerationForSeqNo(
localCheckpointTracker.getProcessedCheckpoint() + 1
).translogFileGeneration;
final long translogGenerationOfNewCommit = translogManager.getTranslog(false)
.getMinGenerationForSeqNo(localCheckpointTracker.getProcessedCheckpoint() + 1).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
|| localCheckpointTracker.getProcessedCheckpoint() == localCheckpointTracker.getMaxSeqNo();
}
Expand Down Expand Up @@ -1807,7 +1823,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
translogManager.ensureCanFlush(shardId);
translogManager.ensureCanFlush();
try {
translogManager.getTranslog(false).rollGeneration();
logger.trace("starting commit for flush; commitTranslog=true");
Expand Down Expand Up @@ -2041,16 +2057,17 @@ private boolean failOnTragicEvent(AlreadyClosedException ex) {
}
failEngine("already closed by tragic event on the index writer", tragicException);
engineFailed = true;
} else if (translogManager.getTranslog(false).isOpen() == false && translogManager.getTranslog(false).getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException());
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
} else if (translogManager.getTranslog(false).isOpen() == false
&& translogManager.getTranslog(false).getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translogManager.getTranslog(false).getTragicException());
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
return engineFailed;
}

Expand All @@ -2067,12 +2084,13 @@ protected boolean maybeFailEngine(String source, Exception e) {
return failOnTragicEvent((AlreadyClosedException) e);
} else if (e != null
&& ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translogManager.getTranslog(false).isOpen() == false && translogManager.getTranslog(false).getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
|| (translogManager.getTranslog(false).isOpen() == false
&& translogManager.getTranslog(false).getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
return false;
}

Expand Down Expand Up @@ -2422,7 +2440,7 @@ protected void doRun() throws Exception {
* @param translog the translog
*/
protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
translogManager.ensureCanFlush(shardId);
translogManager.ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
writer.setLiveCommitData(() -> {
Expand Down Expand Up @@ -2498,7 +2516,7 @@ public MergeStats getMergeStats() {
return mergeScheduler.stats();
}

LocalCheckpointTracker getLocalCheckpointTracker() {
public LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.*;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -33,8 +36,6 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
Expand Down Expand Up @@ -67,8 +68,17 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.readerManager.addListener(completionStatsCache);
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManager = new WriteOnlyTranslogManager(engineConfig, shardId, readLock, getLocalCheckpointTracker(), translogUUID,
() -> {}, () -> ensureOpen(null), this::failEngine, (ex) -> null);
translogManager = new WriteOnlyTranslogManager(
engineConfig,
shardId,
readLock,
getLocalCheckpointTracker(),
translogUUID,
() -> {},
() -> ensureOpen(null),
this::failEngine,
(ex) -> null
);

} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager);
Expand Down Expand Up @@ -147,7 +157,8 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translogManager.getTranslog(false).add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.getTranslog(false)
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand Down
11 changes: 8 additions & 3 deletions server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.*;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogManager;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -151,8 +157,7 @@ public TranslogManager translogManager() {
try {
return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() {
@Override
public void close() {
}
public void close() {}

@Override
public int totalOperations() {
Expand Down
Loading

0 comments on commit 72dd53d

Please sign in to comment.