From 087f182481a381a368704714227d949017f986a1 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 8 Jun 2017 09:21:28 +0200 Subject: [PATCH] Translog file recovery should not rely on lucene commits (#25005) When we open a translog, we rely on the `translog.ckp` file to tell us what the maximum generation file should be and on the information stored in the last lucene commit to know the first file we need to recover. This requires coordination and is currently subject to a race condition: if a node dies after a lucene commit is made but before we remove the translog generations that were unneeded by it, the next time we open the translog we will ignore those files and never delete them (I have added tests for this). This PR changes the approach to have the translog store both of those numbers in the `translog.ckp`. This means it's more self-contained and easier to control. This change also decouples the translog recovery logic from the specific commit we're opening. This prepares the ground to fully utilize the deletion policy introduced in #24950: store more translog data than is needed for Lucene, keep multiple lucene commits around, and be free to recover from any of them. --- .../index/engine/CombinedDeletionPolicy.java | 4 + .../index/engine/InternalEngine.java | 7 +- .../index/translog/Checkpoint.java | 22 ++- .../index/translog/Translog.java | 131 +++++++++++++---- .../index/translog/TranslogWriter.java | 27 +++- .../translog/TruncateTranslogCommand.java | 4 +- .../index/engine/InternalEngineTests.java | 123 ++++++++++------ .../index/translog/TranslogTests.java | 133 ++++++++++++++++-- .../index/translog/TranslogVersionTests.java | 2 +- .../PeerRecoveryTargetServiceTests.java | 2 +- 10 files changed, 356 insertions(+), 99 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 68e2865e28452..69173cc4216cd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -83,4 +83,8 @@ private void setLastCommittedTranslogGeneration(List comm public SnapshotDeletionPolicy getIndexDeletionPolicy() { return indexDeletionPolicy; } + + public TranslogDeletionPolicy getTranslogDeletionPolicy() { + return translogDeletionPolicy; + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8c0481d686f41..f84f76b537e0d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -305,7 +305,8 @@ private void recoverFromTranslogInternal() throws IOException { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; try { - Translog.Snapshot snapshot = translog.newSnapshot(); + final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + Translog.Snapshot snapshot = translog.newSnapshot(translogGen); opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); } @@ -321,6 +322,8 @@ private void recoverFromTranslogInternal() throws IOException { } else if (translog.isCurrent(translogGeneration) == false) { commitIndexWriter(indexWriter,
translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); } + // clean up what's not needed + translog.trimUnreferencedReaders(); } private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException { @@ -1772,7 +1775,7 @@ protected void doRun() throws Exception { * @param syncId the sync flush ID ({@code null} if not committing a synced flush) * @throws IOException if an I/O exception occurs committing the specified writer */ - private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { + protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { ensureCanFlush(); try { final long localCheckpoint = seqNoService().getLocalCheckpoint(); diff --git a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index ce5cc8e76010b..547d5aa499fb3 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -44,6 +44,7 @@ final class Checkpoint { final long minSeqNo; final long maxSeqNo; final long globalCheckpoint; + final long minTranslogGeneration; private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before private static final int CURRENT_VERSION = 2; // introduction of global checkpoints @@ -58,6 +59,7 @@ final class Checkpoint { + Long.BYTES // minimum sequence number, introduced in 6.0.0 + Long.BYTES // maximum sequence number, introduced in 6.0.0 + Long.BYTES // global checkpoint, introduced in 6.0.0 + + Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0 + CodecUtil.footerLength(); // size of 5.0.0 checkpoint @@ -76,15 +78,19 @@ final class Checkpoint { * @param minSeqNo the current minimum sequence number of all operations in the translog * @param maxSeqNo the current maximum sequence number of all operations in the translog * @param globalCheckpoint the last-known global checkpoint + * @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
*/ - Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) { - assert minSeqNo <= maxSeqNo; + Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) { + assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]"; + assert minTranslogGeneration <= generation : + "minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]"; this.offset = offset; this.numOps = numOps; this.generation = generation; this.minSeqNo = minSeqNo; this.maxSeqNo = maxSeqNo; this.globalCheckpoint = globalCheckpoint; + this.minTranslogGeneration = minTranslogGeneration; } private void write(DataOutput out) throws IOException { @@ -94,16 +100,18 @@ private void write(DataOutput out) throws IOException { out.writeLong(minSeqNo); out.writeLong(maxSeqNo); out.writeLong(globalCheckpoint); + out.writeLong(minTranslogGeneration); } - static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) { + static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint, + long minTranslogGeneration) { final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; - return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint); + return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration); } static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException { - return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong()); + return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong()); } // reads a checksummed checkpoint introduced in ES 5.0.0 @@ -111,7 +119,8 @@ static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException { final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; - return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint); + final long minTranslogGeneration = -1L; + return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration); } @Override @@ -123,6 +132,7 @@ public String toString() { ", minSeqNo=" + minSeqNo + ", maxSeqNo=" + maxSeqNo + ", globalCheckpoint=" + globalCheckpoint + + ", minTranslogGeneration=" + minTranslogGeneration + '}'; } diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index c351f0346236e..032d26d890a12 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -55,6 +56,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Collections; 
+import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,7 +65,6 @@ import java.util.function.LongSupplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -170,11 +171,12 @@ public Translog( && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName()); } - this.readers.addAll(recoverFromFiles(deletionPolicy.getMinTranslogGenerationForRecovery(), checkpoint)); + this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); } boolean success = false; + current = null; try { current = createWriter(checkpoint.generation + 1); success = true; @@ -192,14 +194,13 @@ public Translog( final long generation = deletionPolicy.getMinTranslogGenerationForRecovery(); logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation); Files.createDirectories(location); - final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong()); + final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong(), generation); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); IOUtils.fsync(checkpointFile, false); - current = createWriter(generation); - + current = createWriter(generation, generation); + readers.clear(); } - // now that we know which files are there, create a new current one. } catch (Exception e) { // close the opened translog files if we fail to create a new translog... IOUtils.closeWhileHandlingException(current); @@ -209,29 +210,46 @@ public Translog( } /** recover all translog files found on disk */ - private ArrayList recoverFromFiles(long translogFileGeneration, Checkpoint checkpoint) throws IOException { + private ArrayList recoverFromFiles(Checkpoint checkpoint) throws IOException { boolean success = false; ArrayList foundTranslogs = new ArrayList<>(); final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be on the same FS otherwise atomic move won't work boolean tempFileRenamed = false; try (ReleasableLock lock = writeLock.acquire()) { logger.debug("open uncommitted translog checkpoint {}", checkpoint); + + final long minGenerationToRecoverFrom; + if (checkpoint.minTranslogGeneration < 0) { + final Version indexVersionCreated = indexSettings().getIndexVersionCreated(); + assert indexVersionCreated.before(Version.V_6_0_0_alpha3) : + "no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]"; + minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery(); + } else { + minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; + } + final String checkpointTranslogFile = getFilename(checkpoint.generation); // we open files in reverse order in order to validate translog uuid before we start traversing the translog based on // the generation id we found in the lucene commit.
This gives better error messages if the wrong // translog was found. foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint)); - for (long i = checkpoint.generation - 1; i >= translogFileGeneration; i--) { + for (long i = checkpoint.generation - 1; i >= minGenerationToRecoverFrom; i--) { Path committedTranslogFile = location.resolve(getFilename(i)); if (Files.exists(committedTranslogFile) == false) { throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " + - translogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); + minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive"); } final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i)))); foundTranslogs.add(reader); logger.debug("recovered local translog from checkpoint {}", checkpoint); } Collections.reverse(foundTranslogs); + + // when we clean up files, we first update the checkpoint with a new minimum referenced translog generation and then delete them; + // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind, so we delete it if it's there + IOUtils.deleteFilesIgnoringExceptions(location.resolve(getFilename(minGenerationToRecoverFrom - 1)), + location.resolve(getCommitCheckpointFileName(minGenerationToRecoverFrom - 1))); + Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); if (Files.exists(commitCheckpoint)) { Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint); @@ -332,6 +350,20 @@ public long currentFileGeneration() { } } + /** + * Returns the minimum file generation referenced by the translog + */ + long getMinFileGeneration() { + try (ReleasableLock ignored = readLock.acquire()) { + if (readers.isEmpty()) { + return current.getGeneration(); + } else { + return readers.get(0).getGeneration(); + } + } + } + + /** * Returns the number of operations in the transaction files that aren't committed to lucene. */ @@ -372,7 +404,6 @@ private long sizeInBytes(long minGeneration) { } } - /** * Creates a new translog for the specified generation. * * @throws IOException if creating the translog failed */ TranslogWriter createWriter(long fileGeneration) throws IOException { + return createWriter(fileGeneration, getMinFileGeneration()); + } + + /** + * creates a new writer + * + * @param fileGeneration the generation of the file to be written + * @param initialMinTranslogGen the minimum translog generation to be written in the first checkpoint. This is + * needed to solve an initialization problem while constructing an empty translog. + * With no readers and no current, a call to {@link #getMinFileGeneration()} would not work.
+ */ + private TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen) throws IOException { final TranslogWriter newFile; try { newFile = TranslogWriter.create( @@ -390,7 +433,9 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { location.resolve(getFilename(fileGeneration)), getChannelFactory(), config.getBufferSize(), - globalCheckpointSupplier); + globalCheckpointSupplier, + initialMinTranslogGen, + this::getMinFileGeneration); } catch (final IOException e) { throw new TranslogException(shardId, "failed to create new translog file", e); } @@ -494,12 +539,18 @@ public long getLastSyncedGlobalCheckpoint() { * Snapshots are fixed in time and will not be updated with future operations. */ public Snapshot newSnapshot() { - return createSnapshot(Long.MIN_VALUE); + try (ReleasableLock ignored = readLock.acquire()) { + return newSnapshot(getMinFileGeneration()); + } } - private Snapshot createSnapshot(long minGeneration) { + public Snapshot newSnapshot(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); + if (minGeneration < getMinFileGeneration()) { + throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " + + "Min referenced generation is [" + getMinFileGeneration() + "]"); + } Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) .filter(reader -> reader.getGeneration() >= minGeneration) .map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new); @@ -673,7 +724,7 @@ public long sizeInBytes() { /** create a snapshot from this view */ public Snapshot snapshot() { ensureOpen(); - return Translog.this.createSnapshot(minGeneration); + return Translog.this.newSnapshot(minGeneration); } void ensureOpen() { @@ -1442,30 +1493,58 @@ public void rollGeneration() throws IOException { * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum * required generation */ - public void trimUnreferencedReaders() { + public void trimUnreferencedReaders() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { if (closed.get()) { // we're shutdown potentially on some tragic event, don't delete anything return; } long minReferencedGen = deletionPolicy.minTranslogGenRequired(); - final long minExistingGen = readers.isEmpty() ? 
current.getGeneration() : readers.get(0).getGeneration(); - assert minReferencedGen >= minExistingGen : + assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" - + minExistingGen + "]"; - final List unreferenced = - readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList()); - for (final TranslogReader unreferencedReader : unreferenced) { - final Path translogPath = unreferencedReader.path(); + + getMinFileGeneration() + "]"; + assert minReferencedGen <= currentFileGeneration() : + "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation [" + + currentFileGeneration() + "]"; + + + for (Iterator iterator = readers.iterator(); iterator.hasNext(); ) { + TranslogReader reader = iterator.next(); + if (reader.getGeneration() >= minReferencedGen) { + break; + } + iterator.remove(); + IOUtils.closeWhileHandlingException(reader); + final Path translogPath = reader.path(); logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath); - IOUtils.closeWhileHandlingException(unreferencedReader); - IOUtils.deleteFilesIgnoringExceptions(translogPath, - translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration()))); + // The checkpoint is used when opening the translog to know which files should be recovered from. + // We now update the checkpoint to ignore the file we are going to remove. + // Note that there is a provision in recoverFromFiles to allow for the case where we synced the checkpoint + // but crashed before we could delete the file. + current.sync(); + deleteReaderFiles(reader); + } + assert readers.isEmpty() == false || current.generation == minReferencedGen : + "all readers were cleaned but the minReferenceGen [" + minReferencedGen + "] is not the current writer's gen [" + + current.generation + "]"; + } catch (Exception ex) { + try { + closeOnTragicEvent(ex); + } catch (final Exception inner) { + ex.addSuppressed(inner); } - readers.removeAll(unreferenced); + throw ex; } } + /** + * deletes all files associated with a reader. 
package-private to be able to simulate node failures at this point + */ + void deleteReaderFiles(TranslogReader reader) { + IOUtils.deleteFilesIgnoringExceptions(reader.path(), + reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration()))); + } + void closeFilesIfNoPendingViews() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { if (closed.get() && deletionPolicy.pendingViewsCount() == 0) { diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 4a98365e02fba..d637c9da79f65 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -71,6 +71,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private volatile long maxSeqNo; private final LongSupplier globalCheckpointSupplier; + private final LongSupplier minTranslogGenerationSupplier; protected final AtomicBoolean closed = new AtomicBoolean(false); // lock order synchronized(syncLock) -> synchronized(this) @@ -85,10 +86,11 @@ private TranslogWriter( final FileChannel channel, final Path path, final ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier) throws IOException { + final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException { super(initialCheckpoint.generation, channel, path, channel.position()); this.shardId = shardId; this.channelFactory = channelFactory; + this.minTranslogGenerationSupplier = minTranslogGenerationSupplier; this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); this.lastSyncedCheckpoint = initialCheckpoint; this.totalOffset = initialCheckpoint.offset; @@ -121,7 +123,9 @@ public static TranslogWriter create( Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, - final LongSupplier globalCheckpointSupplier) throws IOException { + final LongSupplier globalCheckpointSupplier, + final long initialMinTranslogGen, + final LongSupplier minTranslogGenerationSupplier) throws IOException { final BytesRef ref = new BytesRef(translogUUID); final int headerLength = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); @@ -132,9 +136,11 @@ public static TranslogWriter create( writeHeader(out, ref); channel.force(true); final Checkpoint checkpoint = - Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong()); + Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(), + initialMinTranslogGen); writeCheckpoint(channelFactory, file.getParent(), checkpoint); - return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier); + return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier, + minTranslogGenerationSupplier); } catch (Exception exception) { // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // file exists we remove it. 
We only apply this logic to the checkpoint.generation+1; any other file with a higher generation is an error condition @@ -242,7 +248,9 @@ public void sync() throws IOException { * checkpoint has not yet been fsynced */ public boolean syncNeeded() { - return totalOffset != lastSyncedCheckpoint.offset || globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint; + return totalOffset != lastSyncedCheckpoint.offset || + globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint || + minTranslogGenerationSupplier.getAsLong() != lastSyncedCheckpoint.minTranslogGeneration; } @Override @@ -330,6 +338,7 @@ public boolean syncUpTo(long offset) throws IOException { final long currentMinSeqNo; final long currentMaxSeqNo; final long currentGlobalCheckpoint; + final long currentMinTranslogGeneration; synchronized (this) { ensureOpen(); try { @@ -339,6 +348,7 @@ public boolean syncUpTo(long offset) throws IOException { currentMinSeqNo = minSeqNo; currentMaxSeqNo = maxSeqNo; currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong(); + currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong(); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -354,7 +364,8 @@ public boolean syncUpTo(long offset) throws IOException { try { channel.force(false); checkpoint = - writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, currentGlobalCheckpoint, path.getParent(), generation); + writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, + currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation); } catch (Exception ex) { try { closeWithTragicEvent(ex); @@ -398,9 +409,11 @@ private static Checkpoint writeCheckpoint( long minSeqNo, long maxSeqNo, long globalCheckpoint, + long minTranslogGeneration, Path translogFile, long generation) throws IOException { - final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint); + final Checkpoint checkpoint = + new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration); writeCheckpoint(channelFactory, translogFile, checkpoint); return checkpoint; } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java index ea1f4c13dfd6a..408691692cacf 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java @@ -168,8 +168,8 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th /** Write a checkpoint file to the given location with the given generation */ public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { - Checkpoint emptyCheckpoint = - Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO); + Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, + SequenceNumbersService.UNASSIGNED_SEQ_NO, translogGeneration); Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); // fsync with metadata here to make sure.
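For illustration only (not part of the patch): a minimal, self-contained Java sketch of the idea the Checkpoint/TranslogWriter changes above implement — the checkpoint records both its own generation and the minimum generation it still references, so opening the translog no longer depends on the last lucene commit. Class and field names mirror the patch, but the serialization is simplified to plain data streams; the real Checkpoint is fixed-size and checksummed via CodecUtil.

// Sketch, not the real ES classes: a checkpoint carrying minTranslogGeneration
// makes translog recovery self-contained.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CheckpointSketch {
    final long offset;
    final int numOps;
    final long generation;
    final long minTranslogGeneration; // the field this change adds

    CheckpointSketch(long offset, int numOps, long generation, long minTranslogGeneration) {
        assert minTranslogGeneration <= generation : "min gen must not exceed current gen";
        this.offset = offset;
        this.numOps = numOps;
        this.generation = generation;
        this.minTranslogGeneration = minTranslogGeneration;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeLong(offset);
        out.writeInt(numOps);
        out.writeLong(generation);
        out.writeLong(minTranslogGeneration); // appended at the end of the record, as in the patch
    }

    static CheckpointSketch read(DataInputStream in) throws IOException {
        return new CheckpointSketch(in.readLong(), in.readInt(), in.readLong(), in.readLong());
    }

    // recovery starts from the checkpoint itself; only old-format checkpoints
    // (which materialize the missing field as -1) fall back to the deletion policy
    static long minGenerationToRecoverFrom(CheckpointSketch ckp, long fallbackFromDeletionPolicy) {
        return ckp.minTranslogGeneration < 0 ? fallbackFromDeletionPolicy : ckp.minTranslogGeneration;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new CheckpointSketch(3123L, 55, 7L, 5L).write(new DataOutputStream(bytes));
        CheckpointSketch ckp = CheckpointSketch.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println("recover from generation " + minGenerationToRecoverFrom(ckp, 1L)); // prints 5
    }
}

The fallback branch mirrors readCheckpointV5_0_0 above, which reads the missing field as -1 so that indices created before 6.0.0-alpha3 keep recovering from the deletion policy's generation.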
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bb9ec29f1ada9..31a99063fb643 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -224,12 +224,12 @@ public void setUp() throws Exception { codecName = "default"; } defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us - .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), - between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) - .build()); // TODO randomize more settings + .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(), + between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY))) + .build()); // TODO randomize more settings threadPool = new TestThreadPool(getClass().getName()); store = createStore(); storeReplica = createStore(); @@ -272,14 +272,14 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An public void tearDown() throws Exception { super.tearDown(); IOUtils.close( - replicaEngine, storeReplica, - engine, store); + replicaEngine, storeReplica, + engine, store); terminate(threadPool); } private static Document testDocumentWithTextField() { - return testDocumentWithTextField("test"); + return testDocumentWithTextField("test"); } private static Document testDocumentWithTextField(String value) { @@ -319,6 +319,7 @@ protected Store createStore() throws IOException { protected Store createStore(final Directory directory) throws IOException { return createStore(INDEX_SETTINGS, directory); } + protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException { final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) { @Override @@ -351,6 +352,7 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store, return createEngine(indexSettings, store, translogPath, mergePolicy, null); } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null); @@ -392,12 +394,12 @@ public static InternalEngine createInternalEngine(@Nullable final IndexWriterFac @Nullable final Function sequenceNumbersServiceSupplier, final EngineConfig config) { return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? 
+ indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } @Override public SequenceNumbersService seqNoService() { @@ -436,9 +438,9 @@ public void onFailedEngine(String reason, @Nullable Exception e) { final List refreshListenerList = refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); + mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); return config; } @@ -454,7 +456,7 @@ private static BytesArray bytesArray(String string) { public void testSegments() throws Exception { try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { List segments = engine.segments(false); assertThat(segments.isEmpty(), equalTo(true)); assertThat(engine.segmentsStats(false).getCount(), equalTo(0L)); @@ -603,7 +605,7 @@ public void testVerboseSegments() throws Exception { public void testSegmentsWithMergeFlag() throws Exception { try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index index = indexForDoc(doc); engine.index(index); @@ -686,7 +688,7 @@ public void testSegmentsWithIndexSort() throws Exception { public void testSegmentsStatsIncludingFileSizes() throws Exception { try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { + Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { assertThat(engine.segmentsStats(true).getFileSizes().size(), equalTo(0)); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); @@ -1162,7 +1164,7 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { public void testSyncedFlush() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); @@ -1172,13 +1174,13 @@ public void testSyncedFlush() throws IOException { wrongBytes[0] = (byte) ~wrongBytes[0]; Engine.CommitId wrongId = new Engine.CommitId(wrongBytes); assertEquals("should fail to sync flush with wrong id (but no docs)", engine.syncFlush(syncId + "1", wrongId), - Engine.SyncedFlushResult.COMMIT_MISMATCH); + 
Engine.SyncedFlushResult.COMMIT_MISMATCH); engine.index(indexForDoc(doc)); assertEquals("should fail to sync flush with right id but pending doc", engine.syncFlush(syncId + "2", commitID), - Engine.SyncedFlushResult.PENDING_OPERATIONS); + Engine.SyncedFlushResult.PENDING_OPERATIONS); commitID = engine.flush(); assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS); + Engine.SyncedFlushResult.SUCCESS); assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1189,7 +1191,7 @@ public void testRenewSyncFlush() throws Exception { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), null))) { + new LogDocMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); Engine.Index doc1 = indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null)); engine.index(doc1); @@ -1208,7 +1210,7 @@ public void testRenewSyncFlush() throws Exception { } Engine.CommitId commitID = engine.flush(); assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS); + Engine.SyncedFlushResult.SUCCESS); assertEquals(3, engine.segments(false).size()); engine.forceMerge(forceMergeFlushes, 1, false, false, false); @@ -1248,7 +1250,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { engine.index(indexForDoc(doc)); final Engine.CommitId commitID = engine.flush(); assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS); + Engine.SyncedFlushResult.SUCCESS); assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); EngineConfig config = engine.config(); @@ -1271,7 +1273,7 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { engine.index(indexForDoc(doc)); final Engine.CommitId commitID = engine.flush(); assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS); + Engine.SyncedFlushResult.SUCCESS); assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); doc = testParsedDocument("2", null, testDocumentWithTextField(), new BytesArray("{}"), null); @@ -1307,8 +1309,8 @@ public void testVersioningNewIndex() throws IOException { public void testForceMerge() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), + new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, 
testDocument(), B_1, null); @@ -1422,7 +1424,7 @@ protected List generateSingleDocHistory(boolean forReplica, Ve final Term id = newUid("1"); final int startWithSeqNo; if (partialOldPrimary) { - startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); + startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); } else { startWithSeqNo = 0; } @@ -1541,7 +1543,8 @@ private void assertOpsOnReplica(List ops, InternalEngine repli } if (randomBoolean()) { engine.refresh("test"); - } if (randomBoolean()) { + } + if (randomBoolean()) { engine.flush(); } firstOp = false; @@ -1598,9 +1601,9 @@ private void concurrentlyApplyOps(List ops, InternalEngine eng try { final Engine.Operation op = ops.get(docOffset); if (op instanceof Engine.Index) { - engine.index((Engine.Index)op); + engine.index((Engine.Index) op); } else { - engine.delete((Engine.Delete)op); + engine.delete((Engine.Delete) op); } if ((docOffset + 1) % 4 == 0) { engine.refresh("test"); @@ -1641,7 +1644,7 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion; logger.info("performing [{}]{}{}", op.operationType().name().charAt(0), - versionConflict ? " (conflict " + conflictingVersion +")" : "", + versionConflict ? " (conflict " + conflictingVersion + ")" : "", versionedOp ? " (versioned " + correctVersion + ")" : ""); if (op instanceof Engine.Index) { final Engine.Index index = (Engine.Index) op; @@ -1811,7 +1814,7 @@ public void testVersioningPromotedReplica() throws IOException { assertOpsOnReplica(replicaOps, replicaEngine, true); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, - new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); + new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new MatchAllDocsQuery(), collector); @@ -2169,11 +2172,11 @@ public void testConcurrentWritesAndCommits() throws Exception { final IndexCommit commit = commitRef.getIndexCommit(); Map userData = commit.getUserData(); long localCheckpoint = userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ? - Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) : - SequenceNumbersService.NO_OPS_PERFORMED; + Long.parseLong(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) : + SequenceNumbersService.NO_OPS_PERFORMED; long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? 
- Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) : - SequenceNumbersService.UNASSIGNED_SEQ_NO; + Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) : + SequenceNumbersService.UNASSIGNED_SEQ_NO; // local checkpoint and max seq no shouldn't go backwards assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)); assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo)); @@ -2192,7 +2195,7 @@ public void testConcurrentWritesAndCommits() throws Exception { FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo); for (int i = 0; i <= localCheckpoint; i++) { assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed", - seqNosBitSet.get(i)); + seqNosBitSet.get(i)); } } prevLocalCheckpoint = localCheckpoint; @@ -2268,7 +2271,7 @@ public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOExce public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); final Function searcherFactory = engine::acquireSearcher; @@ -2341,7 +2344,7 @@ private Engine.Index indexForDoc(ParsedDocument doc) { private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo, boolean isRetry) { - return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, + return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } @@ -2484,6 +2487,38 @@ private static void assertVisibleCount(InternalEngine engine, int numDocs, boole } } + public void testTranslogCleanUpPostCommitCrash() throws Exception { + try (Store store = createStore()) { + AtomicBoolean throwErrorOnCommit = new AtomicBoolean(); + final Path translogPath = createTempDir(); + try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) { + @Override + protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { + super.commitIndexWriter(writer, translog, syncId); + if (throwErrorOnCommit.get()) { + throw new RuntimeException("power's out"); + } + } + }) { + final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); + engine.index(indexForDoc(doc1)); + throwErrorOnCommit.set(true); + FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush); + assertThat(e.getCause().getMessage(), equalTo("power's out")); + } + try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null))) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, 1); + final long committedGen = Long.valueOf( + engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + for (int gen = 1; gen < committedGen; gen++) { + final Path genFile = translogPath.resolve(Translog.getFilename(gen)); + assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); + } + } + } + } + public void testSkipTranslogReplay() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { diff --git 
a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index d52adf37d6e56..b911e9a5a48ac 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -104,12 +104,14 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween; import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @LuceneTestCase.SuppressFileSystems("ExtrasFS") public class TranslogTests extends ESTestCase { @@ -141,7 +143,7 @@ protected Translog createTranslog(TranslogConfig config, String translogUUID) th return new Translog(config, translogUUID, new TranslogDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); } - private void markCurrentGenAsCommitted(Translog translog) { + private void markCurrentGenAsCommitted(Translog translog) throws IOException { commit(translog, translog.currentFileGeneration()); } @@ -150,9 +152,14 @@ private void rollAndCommit(Translog translog) throws IOException { commit(translog, translog.currentFileGeneration()); } - private void commit(Translog translog, long genToCommit) { - translog.getDeletionPolicy().setMinTranslogGenerationForRecovery(genToCommit); + private void commit(Translog translog, long genToCommit) throws IOException { + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); translog.trimUnreferencedReaders(); + if (deletionPolicy.pendingViewsCount() == 0) { + assertThat(deletionPolicy.minTranslogGenRequired(), equalTo(genToCommit)); + } + assertThat(translog.getMinFileGeneration(), equalTo(deletionPolicy.minTranslogGenRequired())); } @Override @@ -484,7 +491,7 @@ public void testSnapshotOnClosedTranslog() throws IOException { } public void assertFileIsPresent(Translog translog, long id) { - if (Files.exists(translogDir.resolve(Translog.getFilename(id)))) { + if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) { return; } fail(Translog.getFilename(id) + " is not present in any location: " + translog.location()); @@ -494,6 +501,15 @@ public void assertFileDeleted(Translog translog, long id) { assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id)))); } + private void assertFilePresences(Translog translog) { + for (long gen = translog.getMinFileGeneration(); gen < translog.currentFileGeneration(); gen++) { + assertFileIsPresent(translog, gen); + } + for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) { + assertFileDeleted(translog, gen); + } + } + static class LocationOperation implements Comparable { final Translog.Operation operation; final Translog.Location location; @@ -1015,7 +1031,7 @@ public void testBasicCheckpoint() throws IOException { } public void testTranslogWriter() throws IOException { - final TranslogWriter writer = translog.createWriter(0); + final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1); final int numOps = 
randomIntBetween(8, 128); byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); @@ -1075,7 +1091,7 @@ public void testTranslogWriter() throws IOException { } public void testCloseIntoReader() throws IOException { - try (TranslogWriter writer = translog.createWriter(0)) { + try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) { final int numOps = randomIntBetween(8, 128); final byte[] bytes = new byte[4]; final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); @@ -1270,7 +1286,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { TranslogConfig config = translog.getConfig(); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); - Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO); + Checkpoint corrupted = Checkpoint.emptyTranslogCheckpoint(0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); @@ -1278,8 +1294,8 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { fail("corrupted"); } catch (IllegalStateException ex) { assertEquals("Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, " + - "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, " + - "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2}", ex.getMessage()); + "numOps=55, generation=2, minSeqNo=45, maxSeqNo=99, globalCheckpoint=-2, minTranslogGeneration=1} but got: Checkpoint{offset=0, numOps=0, " + + "generation=0, minSeqNo=-1, maxSeqNo=-1, globalCheckpoint=-2, minTranslogGeneration=0}", ex.getMessage()); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { @@ -1699,6 +1715,91 @@ protected void afterAdd() throws IOException { } } + /** + * Tests the situation where the node crashes after a translog gen was committed to lucene, but before the translog had the chance + * to clean up its files. 
+ */ + public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { + int translogOperations = randomIntBetween(10, 100); + for (int op = 0; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + if (rarely()) { + translog.rollGeneration(); + } + } + translog.rollGeneration(); + long committedGeneration = randomLongBetween(2, translog.currentFileGeneration()); + for (int op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + if (rarely()) { + translog.rollGeneration(); + } + } + // engine blows up after committing the above generation + translog.close(); + TranslogConfig config = translog.getConfig(); + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGeneration); + translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO); + assertThat(translog.getMinFileGeneration(), equalTo(1L)); + // no trimming done yet, just recovered + for (long gen = 1; gen < translog.currentFileGeneration(); gen++) { + assertFileIsPresent(translog, gen); + } + translog.trimUnreferencedReaders(); + for (long gen = 1; gen < committedGeneration; gen++) { + assertFileDeleted(translog, gen); + } + } + + /** + * Tests the situation where the node crashes while trimming unreferenced translog files - the deletion policy has been + * advanced but some of the files it no longer references may not have been deleted yet. + */ + public void testRecoveryFromFailureOnTrimming() throws IOException { + Path tempDir = createTempDir(); + final FailSwitch fail = new FailSwitch(); + fail.failNever(); + final TranslogConfig config = getTranslogConfig(tempDir); + final long committedGeneration; + final String translogUUID; + try (Translog translog = getFailableTranslog(fail, config)) { + translogUUID = translog.getTranslogUUID(); + int translogOperations = randomIntBetween(10, 100); + for (int op = 0; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + if (rarely()) { + translog.rollGeneration(); + } + } + translog.rollGeneration(); + committedGeneration = randomLongBetween(2, translog.currentFileGeneration()); + for (int op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", "" + op, op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))); + if (rarely()) { + translog.rollGeneration(); + } + } + fail.failRandomly(); + try { + commit(translog, committedGeneration); + } catch (Exception e) { + // expected...
+ } + } + final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(); + deletionPolicy.setMinTranslogGenerationForRecovery(committedGeneration); + try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { + // we don't know when things broke exactly + assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L)); + assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(committedGeneration)); + assertFilePresences(translog); + translog.trimUnreferencedReaders(); + assertThat(translog.getMinFileGeneration(), equalTo(committedGeneration)); + assertFilePresences(translog); + } + } + private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException { return getFailableTranslog(fail, config, randomBoolean(), false, null, new TranslogDeletionPolicy()); } @@ -1756,6 +1857,16 @@ ChannelFactory getChannelFactory() { } }; } + + @Override + void deleteReaderFiles(TranslogReader reader) { + if (fail.fail()) { + // simulate going OOM and dying just at the wrong moment. + throw new RuntimeException("simulated"); + } else { + super.deleteReaderFiles(reader); + } + } }; } @@ -2054,7 +2165,9 @@ private Checkpoint randomCheckpoint() { minSeqNo = b; maxSeqNo = a; } - return new Checkpoint(randomLong(), randomInt(), randomLong(), minSeqNo, maxSeqNo, randomNonNegativeLong()); + final long generation = randomNonNegativeLong(); + return new Checkpoint(randomLong(), randomInt(), generation, minSeqNo, maxSeqNo, randomNonNegativeLong(), + randomLongBetween(1, generation)); } public void testCheckpointOnDiskFull() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java index d008749506161..f6aafe765f56f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogVersionTests.java @@ -89,7 +89,7 @@ public TranslogReader openReader(final Path path, final long id) throws IOExcept final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; final Checkpoint checkpoint = - new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO); + new Checkpoint(Files.size(path), 1, id, minSeqNo, maxSeqNo, SequenceNumbersService.UNASSIGNED_SEQ_NO, id); return TranslogReader.open(channel, path, checkpoint, null); } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 4f0fec4c85e52..a2e678585844f 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -116,7 +116,7 @@ private Path writeTranslog( generation, resolve, FileChannel::open, - TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint)) {} + TranslogConfig.DEFAULT_BUFFER_SIZE, () -> globalCheckpoint, generation, () -> generation)) {} return tempDir; }
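Again for illustration only (not part of the patch): a sketch of the trim ordering that trimUnreferencedReaders() and the recovery-time cleanup rely on. The helper names and the translog-N.tlog/.ckp layout are assumptions mirroring the patch; the point is that the checkpoint carrying the new minimum generation is fsynced before any file is deleted, so a crash in between leaves at most generation min-1 behind, which recoverFromFiles() deletes on the next open.

// Sketch of the crash-tolerant trim ordering: drop the reader, persist the
// new minimum via the checkpoint, and only then delete the files.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class TrimSketch {
    interface CheckpointSync { void sync() throws IOException; }

    // readerGens is sorted ascending, mirroring Translog#readers
    static void trim(List<Long> readerGens, long minReferencedGen, Path dir, CheckpointSync checkpoint)
            throws IOException {
        for (Iterator<Long> it = readerGens.iterator(); it.hasNext(); ) {
            long gen = it.next();
            if (gen >= minReferencedGen) {
                break; // everything from here on is still referenced
            }
            it.remove();               // stop handing the reader out first
            checkpoint.sync();         // persist the new minimum generation...
            Files.deleteIfExists(dir.resolve("translog-" + gen + ".tlog")); // ...then delete
            Files.deleteIfExists(dir.resolve("translog-" + gen + ".ckp"));
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("translog-sketch");
        List<Long> readers = new ArrayList<>(List.of(2L, 3L, 4L, 5L));
        // a no-op sync stands in for TranslogWriter#sync rewriting translog.ckp
        trim(readers, 4L, dir, () -> {});
        System.out.println(readers); // [4, 5] - generations 2 and 3 were removed
    }
}

Because each generation is synced away before its files go, a failure injected in deleteReaderFiles (as in the test above) can leave at most one already-unreferenced generation on disk, which is exactly the single file recoverFromFiles() probes for and removes.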