Preserve multiple translog generations #24015

Merged
merged 22 commits on Apr 17, 2017
Changes from 16 commits
@@ -298,7 +298,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog, null);
final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
Contributor:
I so wish we could come up with a better name for this one - I find it confusing every time. How about translogGenForRecovery?

Member Author:
I don't understand this comment; this is a new local variable that was never there before. Did you maybe mean to comment on Translog#lastCommittedTranslogFileGeneration, which we have discussed the confusing nature of before?

Contributor:
I think all of them suffer from the same problem. This is just the first one I ran into while reviewing. I think I get why it's named like this now (the translog id that was committed into lucene's metadata) but there is much room for ambiguity - is this the maximum generation for which all ops are in lucene (i.e., committed)? I wonder every time and end up double checking the code. If people find this name better, I'm good with keeping it - it's a subjective matter.

Member:
One naming thing that gets to me is prepareCommit and commit in Translog. I don't think commit commits anything. It trims segments based on a commit in permanent storage. I wonder if it'd make the naming easier to stop using the word "commit" for the thing that the Translog is doing. Then any time you see the word "commit" it is about a lucene commit. If you renamed Translog#commit into Translog#updatePersistedGeneration or something then it'd be more clear that it is reacting to a commit in Lucene. Then maybe the other variable names just fall out of that?
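For illustration only, a minimal sketch of the rename suggested above, treating the translog operation as a reaction to a Lucene commit rather than a commit of its own. The method name is hypothetical and not part of this PR:

```java
// Hypothetical rename sketch; the fields and helpers used here (writeLock, ensureOpen,
// lastCommittedTranslogFileGeneration, trimUnreferencedReaders) are the ones visible in this diff.
public void updatePersistedGeneration(final long generationCommittedToLucene) throws IOException {
    try (ReleasableLock ignored = writeLock.acquire()) {
        ensureOpen();
        // every generation at or after the one recorded in the Lucene commit must be preserved for recovery
        lastCommittedTranslogFileGeneration = generationCommittedToLucene;
        trimUnreferencedReaders();
    }
}
```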

Member Author:
I would like to focus on the naming in a small follow-up PR immediately after this PR.

logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
translog.commit();
translog.commit(committedGeneration);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
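Taken together, this hunk threads the generation recorded in the Lucene commit through to the translog trim. A simplified paraphrase of the flush path above (not an additional diff hunk; the identifiers are the ones in the change):

```java
translog.prepareCommit();                                   // roll to a new translog generation
final long committedGeneration =
        commitIndexWriter(indexWriter, translog, null);     // generation baked into the Lucene commit
refresh("version_table_flush");                             // clear older version values
translog.commit(committedGeneration);                       // keep generations >= committedGeneration
```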
@@ -1680,55 +1680,62 @@ protected void doRun() throws Exception {
}
}

private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
/**
* Commits the specified index writer.
*
* @param writer the index writer to commit
* @param translog the translog
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @return the minimum translog generation for the local checkpoint committed with the specified index writer
* @throws IOException if an I/O exception occurs committing the specified writer
*/
private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
Contributor:
can we bring these local vars back? it's easier to reason about the code when it's not inlined

Member Author:
I pushed 5f3d2d8.


final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* of the commit data iterator (which occurs after all documents have been flushed to Lucene).
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
final Map<String, String> commitData = new HashMap<>(5);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});

writer.commit();
} catch (Exception ex) {
return translogGeneration.translogFileGeneration;
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (AssertionError e) {
// IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
// randomly throw FNFE/NSFE can also hit this:
} catch (final AssertionError e) {
/*
* If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
* throw FileNotFoundException or NoSuchFileException can also hit this.
*/
if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
try {
failEngine("lucene commit failed", engineException);
} catch (Exception inner) {
} catch (final Exception inner) {
engineException.addSuppressed(inner);
}
throw engineException;
@@ -1812,7 +1819,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
Map<String, String> commitData = new HashMap<>(5);
Contributor:
good eye.

for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
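To summarize the commit metadata this file now writes, here is a sketch of the resulting Lucene commit user data; the keys are the ones used above, while the values shown are placeholders:

```java
// Keys written by commitIndexWriter in this diff; the values here are illustrative only.
final Map<String, String> commitData = new HashMap<>(5);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, "3");           // min generation needed for recovery
commitData.put(Translog.TRANSLOG_UUID_KEY, "translog-uuid");     // guards against mixing translogs
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "41");
commitData.put(SequenceNumbers.MAX_SEQ_NO, "42");                // computed when the iterator is consumed
commitData.put(Engine.SYNC_COMMIT_ID, "sync-id");                // only present for synced flushes
```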
105 changes: 77 additions & 28 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
* When a translog is opened the checkpoint is used to retrieve the latest translog file generation and subsequently to open the last written file to recover operations.
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
* the latest generation and all consecutive translog files since the given generation and the last generation in the checkpoint will be recovered and preserved until the next
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
* generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
* the currently being committed translog file will not be deleted since its commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
* one translog file present. Such an uncommitted translog file always has a <tt>translog-${gen}.ckp</tt> associated with it which is an fsynced copy of its last <tt>translog.ckp</tt> such that in
* disaster recovery the last fsynced offsets, number of operations etc. are still preserved.
* </p>
*/
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {

/*
* TODO
@@ -1347,6 +1347,31 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}

/**
* Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if
* there is no generation that could contain any such sequence number.
*
* @param seqNo the sequence number
* @return the minimum generation for the sequence number
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
/*
* When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
* local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will
* be the current translog generation as we do not need any prior generations to have a complete history up to the current local
* checkpoint.
*/
long minTranslogFileGeneration = this.currentFileGeneration();
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxSeqNo) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
}
}
return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
}
}
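As a usage note, a sketch of how the engine side of this PR consumes the new method during flush (see the commitIndexWriter hunk above); this is a paraphrase, not an extra diff hunk:

```java
// The engine asks for the smallest generation that may still contain an operation
// above the local checkpoint; everything from that generation onward must be kept.
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration generation =
        translog.getMinGenerationForSeqNo(localCheckpoint + 1);
```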

/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
@@ -1375,54 +1400,85 @@ public void rollGeneration() throws IOException {
}
}

@Override
public long prepareCommit() throws IOException {
/**
* Prepares a translog commit by setting the current committing generation and rolling the translog generation.
*
* @throws IOException if an I/O exception occurred while rolling the translog generation
*/
public void prepareCommit() throws IOException {
Contributor:
now that we don't use the 2phase commit interface anymore we can rename these methods IMO

Member Author:
I agree, let's do this in a small PR that immediately follows this PR.

try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
final String message = String.format(
Locale.ROOT,
"already committing a translog with generation [%d]",
currentCommittingGeneration);
final String message =
String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
return 0;
}

@Override
public long commit() throws IOException {
/**
* Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
* will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
*
* If {@link Translog#prepareCommit()} was not called before calling commit, it will be invoked as part of this method, causing the
* translog generation to be rolled.
*
* @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
* @throws IOException if an I/O exception occurred preparing the translog commit
*/
public void commit(final long committedGeneration) throws IOException {
Contributor:
I think this parameter name is confusing given that we have currentCommittingGeneration. See previous suggestion for a name.

Member Author:
Let's focus on naming in a follow-up PR.

try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (committedGeneration > current.generation) {
Contributor:
when would this happen? I am not sure we should do this. If the translog is so fragile that we can fail a commit based on user input, I think the design is broken?

Contributor:
I think we should model this very similar to the IndexWriter where we have a deletion policy which can be a simple long -> long function of some sort. The InternalEngine can control this policy and by default we just use a policy that returns the identity.
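A rough sketch of the suggested policy, purely illustrative; no such interface exists in this PR, and the names are made up:

```java
/** Hypothetical deletion policy: maps the generation the engine wants to keep
 *  to the minimum generation the translog will actually preserve. */
@FunctionalInterface
interface TranslogDeletionPolicy {
    long minGenerationToKeep(long requestedGeneration);

    /** Default policy: keep exactly what the caller asked for. */
    TranslogDeletionPolicy IDENTITY = requestedGeneration -> requestedGeneration;
}
```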

Member Author (@jasontedor, Apr 12, 2017):
@s1monw It should never happen yet @bleskes asked for hard failures here instead of assertions.

Contributor:
> @bleskes asked for hard failures here instead of assertions.

My rationale was that if we have a bug and we get this wrong, I rather not commit. That said - this is all very murky and I'm good with an assertion if either of you prefer it that way.

Contributor:
I still don't like this really. Let's compromise on Translog#commit(Translog.TranslogGeneration generation) and you can add an overloaded method for tests that doesn't take any argument.
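A sketch of the test-only overload suggested here; hypothetical, and not part of the shown diff:

```java
// Hypothetical convenience overload for tests: commit up to the current generation.
void commit() throws IOException {
    commit(currentFileGeneration());
}
```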

Member Author:
I pushed e6abaf9.

Contributor:
can we not pass in a long but rather Translog.TranslogGeneration? this would make it less trappy?

Member Author (@jasontedor, Apr 14, 2017):
I looked into this suggestion. I'm not sure it makes it less trappy (I'm also not sure it's trappy to begin with). I do think that adding the extra indirection makes the API awkward, though. A commit from the engine here is really about telling the translog to keep all generations on or after the provided generation number, so it seems clearer to keep the API driven off the generation number. Do you feel strongly that this would be an improvement @s1monw?

final String message = String.format(
Locale.ROOT,
"tried to commit generation [%d] later than the current generation [%d]",
committedGeneration,
current.generation);
throw new IllegalStateException(message);
}
final Long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
if (committedGeneration < min) {
final String message = String.format(
Locale.ROOT,
"tried to commit generation [%d] before minimum generation [%d]",
committedGeneration,
min);
throw new IllegalStateException(message);
}
if (currentCommittingGeneration == NOT_SET_GENERATION) {
Contributor:
I'm not sure this makes sense anymore. Should we just forbid it with an exception? Or better yet, require that the generation given to us is <= current and be done with it?

Member Author:
Are you saying that we should require preparing a commit (invoking Translog#prepareCommit) before committing (invoking Translog#commit)?

prepareCommit();
}
assert currentCommittingGeneration != NOT_SET_GENERATION;
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
lastCommittedTranslogFileGeneration = committedGeneration;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
return 0;
}

/**
* Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
* and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
*/
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
// we're shutdown potentially on some tragic event - don't delete anything
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
long minReferencedGen = Math.min(
Contributor:
did something change here that is related? It's hard to tell; maybe we can revert this unnecessary modification?

Member Author:
I see a small improvement here which is removing the need for the extra local variable finalMinReferencedGen and so avoiding overwriting minReferencedGen. I find it easier to read this way.

Contributor:
fair enough

lastCommittedTranslogFileGeneration,
outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
final List<TranslogReader> unreferenced =
readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
final Path translogPath = unreferencedReader.path();
logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
@@ -1442,13 +1498,6 @@ void closeFilesIfNoPendingViews() throws IOException {
}
}


@Override
public void rollback() throws IOException {
ensureOpen();
close();
}

/**
* References a transaction log generation
*/