Skip to content

Commit

Permalink
Translog file recovery should not rely on lucene commits (#25005)
Browse files Browse the repository at this point in the history
When we open a translog, we rely on the `translog.ckp` file to tell us what the maximum generation file should be and on the information stored in the last lucene commit to know the first file we need to recover. This requires coordination and is currently subject to a race condition: if a node dies after a lucene commit is made but before we remove the translog generations that were unneeded by it, the next time we open the translog we will ignore those files and never delete them (I have added tests for this).

This PR changes the approach to have the translog store both of those numbers in the `translog.ckp`. This means it's more self contained and easier to control. 

This change also decouples the translog recovery logic from the specific commit we're opening. This prepares the ground to fully utilize the deletion policy introduced in #24950 and store more translog data that's needed for Lucene, keep multiple lucene commits around and be free to recover from any of them.
  • Loading branch information
bleskes authored Jun 8, 2017
1 parent ce24331 commit 087f182
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,8 @@ private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> comm
public SnapshotDeletionPolicy getIndexDeletionPolicy() {
return indexDeletionPolicy;
}

public TranslogDeletionPolicy getTranslogDeletionPolicy() {
return translogDeletionPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
Translog.Snapshot snapshot = translog.newSnapshot();
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
Translog.Snapshot snapshot = translog.newSnapshot(translogGen);
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
Expand All @@ -321,6 +322,8 @@ private void recoverFromTranslogInternal() throws IOException {
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
// clean up what's not needed
translog.trimUnreferencedReaders();
}

private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
Expand Down Expand Up @@ -1772,7 +1775,7 @@ protected void doRun() throws Exception {
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ final class Checkpoint {
final long minSeqNo;
final long maxSeqNo;
final long globalCheckpoint;
final long minTranslogGeneration;

private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
Expand All @@ -58,6 +59,7 @@ final class Checkpoint {
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
+ CodecUtil.footerLength();

// size of 5.0.0 checkpoint
Expand All @@ -76,15 +78,19 @@ final class Checkpoint {
* @param minSeqNo the current minimum sequence number of all operations in the translog
* @param maxSeqNo the current maximum sequence number of all operations in the translog
* @param globalCheckpoint the last-known global checkpoint
* @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
*/
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) {
assert minSeqNo <= maxSeqNo;
Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
assert minTranslogGeneration <= generation :
"minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
this.offset = offset;
this.numOps = numOps;
this.generation = generation;
this.minSeqNo = minSeqNo;
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
this.minTranslogGeneration = minTranslogGeneration;
}

private void write(DataOutput out) throws IOException {
Expand All @@ -94,24 +100,27 @@ private void write(DataOutput out) throws IOException {
out.writeLong(minSeqNo);
out.writeLong(maxSeqNo);
out.writeLong(globalCheckpoint);
out.writeLong(minTranslogGeneration);
}

static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) {
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
long minTranslogGeneration) {
final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint);
return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
}

static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
}

// reads a checksummed checkpoint introduced in ES 5.0.0
static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint);
final long minTranslogGeneration = -1L;
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
}

@Override
Expand All @@ -123,6 +132,7 @@ public String toString() {
", minSeqNo=" + minSeqNo +
", maxSeqNo=" + maxSeqNo +
", globalCheckpoint=" + globalCheckpoint +
", minTranslogGeneration=" + minTranslogGeneration +
'}';
}

Expand Down
Loading

0 comments on commit 087f182

Please sign in to comment.