Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Integrate Engine with decoupled Translog interfaces #3822

Merged
merged 19 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand Down Expand Up @@ -899,6 +902,22 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return stats;
}

protected TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}

protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
stats.add(1);
if (includeSegmentFileSizes) {
Expand Down
216 changes: 117 additions & 99 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.search.suggest.completion.CompletionStats;

import java.io.Closeable;
Expand All @@ -49,18 +52,19 @@
*
* @opensearch.internal
*/
public class NRTReplicationEngine extends Engine {
public class NRTReplicationEngine extends Engine implements LifecycleAware {

private volatile SegmentInfos lastCommittedSegmentInfos;
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final Translog translog;
private final TranslogManager translogManager;

public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
TranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
Expand All @@ -71,18 +75,49 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
this.translog = openTranslog(
engineConfig,
getTranslogDeletionPolicy(engineConfig),
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
engineConfig.getTranslogConfig(),
engineConfig.getPrimaryTermSupplier(),
engineConfig.getGlobalCheckpointSupplier(),
localCheckpointTracker::markSeqNoAsPersisted
getTranslogDeletionPolicy(engineConfig),
shardId,
readLock,
this::getLocalCheckpointTracker,
translogUUID,
new TranslogEventListener() {
@Override
public void onFailure(String reason, Exception ex) {
failEngine(reason, ex);
}

@Override
public void onAfterTranslogSync() {
try {
translogManager.getTranslog().trimUnreferencedReaders();
} catch (IOException ex) {
throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex);
}
}
},
this
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager);
Translog translog = null;
if (translogManagerRef != null) {
translog = translogManagerRef.getTranslog();
}
IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}

public TranslogManager translogManager() {
return translogManager;
}

public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
readerManager.updateSegments(infos);
Expand All @@ -91,7 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
rollTranslogGeneration();
translogManager.rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
Expand Down Expand Up @@ -125,7 +160,7 @@ public boolean isThrottled() {
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
translogManager.getTranslog().trimOperations(belowTerm, aboveSeqNo);
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
Expand All @@ -140,7 +175,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws E
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translog.add(new Translog.Index(index, indexResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
Expand All @@ -152,7 +187,7 @@ public IndexResult index(Index index) throws IOException {
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
Expand All @@ -164,7 +199,8 @@ public DeleteResult delete(Delete delete) throws IOException {
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
final Translog.Location location = translogManager.getTranslog()
.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand All @@ -184,22 +220,22 @@ protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(Search

@Override
public boolean isTranslogSyncNeeded() {
return translog.syncNeeded();
return translogManager.getTranslog().syncNeeded();
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
boolean synced = translog.ensureSynced(locations);
boolean synced = translogManager.getTranslog().ensureSynced(locations);
if (synced) {
translog.trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
}
return synced;
}

@Override
public void syncTranslog() throws IOException {
translog.sync();
translog.trimUnreferencedReaders();
translogManager.getTranslog().sync();
translogManager.getTranslog().trimUnreferencedReaders();
}

@Override
Expand Down Expand Up @@ -242,12 +278,12 @@ public long getMinRetainedSeqNo() {

@Override
public TranslogStats getTranslogStats() {
return translog.stats();
return translogManager.getTranslog().stats();
}

@Override
public Translog.Location getTranslogLastWriteLocation() {
return translog.getLastWriteLocation();
return translogManager.getTranslog().getLastWriteLocation();
}

@Override
Expand All @@ -266,7 +302,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {

@Override
public long getLastSyncedGlobalCheckpoint() {
return translog.getLastSyncedGlobalCheckpoint();
return translogManager.getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
Expand Down Expand Up @@ -302,7 +338,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
Expand All @@ -315,15 +351,15 @@ public void trimUnreferencedTranslogFiles() throws EngineException {

@Override
public boolean shouldRollTranslogGeneration() {
return translog.shouldRollGeneration();
return translogManager.getTranslog().shouldRollGeneration();
}

@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
translogManager.getTranslog().rollGeneration();
translogManager.getTranslog().trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
Expand Down Expand Up @@ -370,7 +406,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must be currently be failing itself";
try {
IOUtils.close(readerManager, translog, store::decRef);
IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
Expand Down Expand Up @@ -421,12 +457,12 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}

public Translog getTranslog() {
return translog;
return translogManager.getTranslog();
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
Expand Down Expand Up @@ -469,21 +505,4 @@ private Translog openTranslog(
persistedSequenceNumberConsumer
);
}

private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogException;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;

Expand Down Expand Up @@ -149,6 +152,85 @@ public DocsStats docStats() {
return docsStats;
}

/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
*/
public TranslogManager translogManager() {
try {
return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, this.translogStats, new Translog.Snapshot() {
@Override
public void close() {}

@Override
public int totalOperations() {
return 0;
}

@Override
public Translog.Operation next() {
return null;
}

}) {
/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
*/
@Override
public void trimUnreferencedTranslogFiles() throws TranslogException {
final Store store = engineConfig.getStore();
store.incRef();
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
translogConfig,
translogUuid,
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
)
) {
translog.trimUnreferencedReaders();
// refresh the translog stats
translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen "
+ translog.currentFileGeneration()
+ " != min gen "
+ translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
} finally {
store.decRef();
}
}
};
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
Expand Down
Loading