From 7cbd56a5d8b101801cae5baf9e6de4abd9a01783 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 16 Mar 2018 22:34:49 -0400 Subject: [PATCH 01/18] Harden periodically check to avoid endless flush loop In #28350, we fixed an endless flushing loop which can happen on replicas by tightening the relation between the flush action and the periodic flush condition. 1. The periodic flush condition is enabled only if it will be disabled after a flush. 2. If the periodic flush condition is true then a flush will actually happen regardless of Lucene state. (1) and (2) guarantee that a flushing loop will terminate. Sadly, condition (1) can be violated in edge cases because we used two different algorithms to evaluate the current and the future uncommitted size. - We use method `uncommittedSizeInBytes` to calculate the current uncommitted size. It is the sum of the translogs whose generation is at least minGen (determined by a given seqno). We pick a contiguous range of translogs starting from minGen to evaluate the current uncommitted size. - We use method `sizeOfGensAboveSeqNoInBytes` to calculate the future uncommitted size. It is the sum of the translogs whose maxSeqNo is at least the given seqNo. Here we don't pick a range but select translogs one by one. Suppose we have 3 translogs gen1={#1,#2}, gen2={}, gen3={#3} and seqno=#1; uncommittedSizeInBytes is the sum of gen1, gen2, and gen3 while sizeOfGensAboveSeqNoInBytes is the sum of gen1 and gen3. Gen2 is excluded because its maxSeqNo is still -1. 
This commit ensures sizeOfGensAboveSeqNoInBytes uses the same algorithm as uncommittedSizeInBytes. Closes #29097 --- .../index/translog/Translog.java | 20 ++++++- .../index/engine/InternalEngineTests.java | 20 +++++++ .../index/translog/TranslogTests.java | 56 ++++++++++++++++++- 3 files changed, 92 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 6a32ae14fdd3a..ff14082f471e8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.UUIDs; @@ -39,6 +38,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -431,7 +431,7 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) { /** * Returns the size in bytes of the translog files above the given generation */ - private long sizeInBytesByMinGen(long minGeneration) { + long sizeInBytesByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) @@ -447,7 +447,21 @@ private long sizeInBytesByMinGen(long minGeneration) { public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - return 
readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum(); + final List readers = new ArrayList<>(this.readers); + readers.add(current); + int keptIndex = Integer.MAX_VALUE; + for (int i = 0; i < readers.size(); i++) { + final long maxSeqNo = readers.get(i).getCheckpoint().maxSeqNo; + if (maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo) { + keptIndex = i; + break; + } + } + long totalBytes = 0; + for (int i = keptIndex; i < readers.size(); i++) { + totalBytes += readers.get(i).sizeInBytes(); + } + return totalBytes; } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index cac74573374aa..20bf96366860e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4283,6 +4283,26 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + + // If the new index commit still points to the same translog generation as the current index commit, + // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
+ engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here + for (int id = 0; id < numDocs; id++) { + if (randomBoolean()){ + engine.getTranslog().rollGeneration(); + } + final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); + long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 2L, seqno, false)); + assertThat(result.isCreated(), equalTo(true)); + } + // A flush must change the periodically flush condition. + lastCommitInfo = engine.getLastCommittedSegmentInfos(); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + } + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index c18784873a472..98105e6fcc9eb 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -31,7 +31,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.MockDirectoryWrapper; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -54,6 +53,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -113,6 +113,7 @@ import static 
org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -510,6 +511,58 @@ public void testUncommittedOperations() throws Exception { } } + public void testSizeOfGensAboveSeqNoInBytes() throws Exception { + final long emptyTranslogSize = Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomNonNegativeLong()), equalTo(0L)); + // Gen1: seqno in 1001-2000 + int ops = between(1, 100); + final Set seqnoGen1 = new HashSet<>(); + final long gen1 = translog.currentFileGeneration(); + for (int i = 0; i < ops; i++) { + long seqno = randomValueOtherThanMany(n -> seqnoGen1.add(n) == false, () -> randomLongBetween(1001, 2000)); + translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1})); + } + final long maxSeqnoGen1 = Collections.max(seqnoGen1); + long sizeGen1 = translog.getCurrent().sizeInBytes(); + for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) { + translog.rollGeneration(); + sizeGen1 += emptyTranslogSize; + } + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L)); + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), + allOf(equalTo(sizeGen1), equalTo(translog.sizeInBytesByMinGen(gen1)))); + // Gen2: seqno in 0-1000 + translog.rollGeneration(); + ops = between(1, 100); + for (int i = 0; i < ops; i++) { + translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), i, new byte[]{1})); + } + long sizeGen2 = translog.getCurrent().sizeInBytes(); + 
assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L)); + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), + allOf(equalTo(sizeGen1 + sizeGen2), equalTo(translog.sizeInBytesByMinGen(gen1)))); + // Gen3: seqno in 2001+ + ops = between(1, 100); + translog.rollGeneration(); + final long gen3 = translog.currentFileGeneration(); + final Set seqnoGen3 = new HashSet<>(); + for (int i = 0; i < ops; i++) { + long seqno = randomValueOtherThanMany(n -> seqnoGen3.add(n) == false, () -> randomLongBetween(2001, Long.MAX_VALUE)); + translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1})); + } + final long maxSeqnoGen3 = Collections.max(seqnoGen3); + long sizeGen3 = translog.getCurrent().sizeInBytes(); + for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) { + translog.rollGeneration(); + sizeGen3 += emptyTranslogSize; // check an empty generation is included + } + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen3 + 1, Long.MAX_VALUE)), equalTo(0L)); + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, maxSeqnoGen3)), + allOf(equalTo(sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen3)))); // Since gen3 + assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), + allOf(equalTo(sizeGen1 + sizeGen2 + sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen1)))); // Since gen1 + } + public void testTotalTests() { final TranslogStats total = new TranslogStats(0, 0, 0, 0, 1); final int n = randomIntBetween(0, 16); @@ -2590,6 +2643,7 @@ public void testMinSeqNoBasedAPI() throws IOException { seenSeqNos.addAll(generationSeqNo); } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); + assertThat(translog.sizeInBytesByMinGen(generation), equalTo(translog.sizeOfGensAboveSeqNoInBytes(seqNo))); int readFromSnapshot = 0; try 
(Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); From c2d8d4d28d38a84fe82bd8af51c670710838fc8f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 17 Mar 2018 15:26:15 -0400 Subject: [PATCH 02/18] share code between sizeOfGensAboveSeqNoInBytes and getMinGenerationForSeqNo --- .../index/translog/Translog.java | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index ff14082f471e8..1bf53c61568f2 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -447,21 +447,7 @@ long sizeInBytesByMinGen(long minGeneration) { public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - final List readers = new ArrayList<>(this.readers); - readers.add(current); - int keptIndex = Integer.MAX_VALUE; - for (int i = 0; i < readers.size(); i++) { - final long maxSeqNo = readers.get(i).getCheckpoint().maxSeqNo; - if (maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo) { - keptIndex = i; - break; - } - } - long totalBytes = 0; - for (int i = keptIndex; i < readers.size(); i++) { - totalBytes += readers.get(i).sizeInBytes(); - } - return totalBytes; + return sizeInBytesByMinGen(minGenerationForSeqNo(minSeqNo)); } } @@ -1536,14 +1522,27 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { * be the current translog generation as we do not need any prior generations to have a complete history up to the current local * checkpoint. 
*/ - long minTranslogFileGeneration = this.currentFileGeneration(); - for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxSeqNo) { - minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); - } + final long minOrCurrentGeneration = Math.min(minGenerationForSeqNo(seqNo), currentFileGeneration()); + return new TranslogGeneration(translogUUID, minOrCurrentGeneration); + } + } + + /** + * Returns the minimum generation that contains the given seqno. + * If no generation contains it, returns {@link Long#MAX_VALUE}. + */ + private long minGenerationForSeqNo(final long seqNo) { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "Lock is not held the current thread"; + long minGen = Long.MAX_VALUE; + if (seqNo <= this.current.getCheckpoint().maxSeqNo) { + minGen = this.current.generation; + } + for (final TranslogReader reader : readers) { + if (seqNo <= reader.getCheckpoint().maxSeqNo) { + minGen = Math.min(minGen, reader.getGeneration()); } - return new TranslogGeneration(translogUUID, minTranslogFileGeneration); } + return minGen; } /** From fc1a9fcd00bbc56847ba7a7ac693be5d0c328088 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 17 Mar 2018 15:39:10 -0400 Subject: [PATCH 03/18] comment --- .../main/java/org/elasticsearch/index/translog/Translog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 1bf53c61568f2..665e02f6d2d53 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1532,7 +1532,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { * If no generation contains it, returns {@link Long#MAX_VALUE}. 
*/ private long minGenerationForSeqNo(final long seqNo) { - assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "Lock is not held the current thread"; + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "Translog lock is not held by the current thread"; long minGen = Long.MAX_VALUE; if (seqNo <= this.current.getCheckpoint().maxSeqNo) { minGen = this.current.generation; From 330527c9ead9f847a3f435c756a4e62d536360da Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Mar 2018 13:41:24 -0400 Subject: [PATCH 04/18] Remove uncommitted ops and operations --- .../index/engine/InternalEngine.java | 22 ++--- .../index/translog/Translog.java | 87 +++++++------------ .../translog/TranslogDeletionPolicy.java | 2 +- .../index/engine/EngineDiskUtilsTests.java | 2 +- .../index/engine/InternalEngineTests.java | 28 +++--- .../index/shard/IndexShardIT.java | 23 ++--- .../index/translog/TranslogTests.java | 66 ++------------ .../indices/recovery/RecoveryTests.java | 3 +- 8 files changed, 81 insertions(+), 152 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 49be68efcad5d..7b57388d4727c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1361,7 +1361,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { + if (syncId != null && indexWriter.hasUncommittedChanges() + && translog.totalOperationsByMinGen(translog.uncommittedGeneration()) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. 
sync id [{}].", syncId); @@ -1383,19 +1384,20 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + final long translogGenerationOfCurrentCommit = translog.uncommittedGeneration(); + final long uncommittedTranslogSize = translog.sizeInBytesByMinGen(translogGenerationOfCurrentCommit); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + if (uncommittedTranslogSize < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. - * This condition will change if the `uncommittedSize` of the new commit is smaller than - * the `uncommittedSize` of the current commit. This method is to maintain translog only, - * thus the IndexWriter#hasUncommittedChanges condition is not considered. + * We should only flush ony if the shouldPeriodicallyFlush condition can become false after flushing. + * This condition will change if the new commit points to the later translog generation than the current commit's. + * This method is to maintain translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. 
*/ - final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; + final long translogGenerationOfNewCommit = + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1, false).translogFileGeneration; + return translogGenerationOfCurrentCommit < translogGenerationOfNewCommit; } @Override @@ -2015,7 +2017,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); + final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1, true); final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); final String translogUUID = translogGeneration.translogUUID; final String localCheckpointValue = Long.toString(localCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 183733560f989..bf18764a78135 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -356,26 +356,11 @@ public long getMinFileGeneration() { } } - - /** - * Returns the number of operations in the translog files that aren't committed to lucene. - */ - public int uncommittedOperations() { - return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files that aren't committed to lucene. 
- */ - public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - /** * Returns the number of operations in the translog files */ public int totalOperations() { - return totalOperations(-1); + return totalOperationsByMinGen(-1); } /** @@ -405,19 +390,6 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable r.getGeneration() >= minGeneration) - .mapToInt(BaseTranslogReader::totalOperations) - .sum(); - } - } - /** * Returns the number of operations in the transaction files that contain operations with seq# above the given number. */ @@ -431,7 +403,7 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) { /** * Returns the size in bytes of the translog files above the given generation */ - long sizeInBytesByMinGen(long minGeneration) { + public long sizeInBytesByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) @@ -442,12 +414,15 @@ long sizeInBytesByMinGen(long minGeneration) { } /** - * Returns the size in bytes of the translog files with ops above the given seqNo + * Returns the number of operations in the transaction files above the given generation */ - public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) { + public int totalOperationsByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - return sizeInBytesByMinGen(minGenerationForSeqNo(minSeqNo)); + return Stream.concat(readers.stream(), Stream.of(current)) + .filter(r -> r.getGeneration() >= minGeneration) + .mapToInt(BaseTranslogReader::totalOperations) + .sum(); } } @@ -758,7 +733,8 @@ private void closeOnTragicEvent(Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - return new TranslogStats(totalOperations(), sizeInBytes(), 
uncommittedOperations(), uncommittedSizeInBytes(), earliestLastModifiedAge()); + final long uncommittedGen = uncommittedGeneration(); + return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } } @@ -1508,13 +1484,13 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl } /** - * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if - * there is no generation that could any such sequence number. + * Gets the minimum generation that could contain any sequence number after the specified sequence number * - * @param seqNo the sequence number + * @param seqNo the sequence number + * @param alwaysIncludeCurrent if true, the current generation is returned there is no generation that could any such seq# * @return the minimum generation for the sequence number */ - public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { + public TranslogGeneration getMinGenerationForSeqNo(final long seqNo, final boolean alwaysIncludeCurrent) { try (ReleasableLock ignored = readLock.acquire()) { /* * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the @@ -1522,27 +1498,17 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { * be the current translog generation as we do not need any prior generations to have a complete history up to the current local * checkpoint. */ - final long minOrCurrentGeneration = Math.min(minGenerationForSeqNo(seqNo), currentFileGeneration()); - return new TranslogGeneration(translogUUID, minOrCurrentGeneration); - } - } - - /** - * Returns the minimum generation that contains the given seqno. - * If no generation contains it, returns {@link Long#MAX_VALUE}. 
- */ - private long minGenerationForSeqNo(final long seqNo) { - assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "Translog lock is not held by the current thread"; - long minGen = Long.MAX_VALUE; - if (seqNo <= this.current.getCheckpoint().maxSeqNo) { - minGen = this.current.generation; - } - for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxSeqNo) { - minGen = Math.min(minGen, reader.getGeneration()); + long minGen = Long.MAX_VALUE; + if (alwaysIncludeCurrent || seqNo <= this.current.getCheckpoint().maxSeqNo) { + minGen = this.current.generation; + } + for (final TranslogReader reader : readers) { + if (seqNo <= reader.getCheckpoint().maxSeqNo) { + minGen = Math.min(minGen, reader.getGeneration()); + } } + return new TranslogGeneration(translogUUID, minGen); } - return minGen; } /** @@ -1663,6 +1629,13 @@ public TranslogGeneration getGeneration() { } } + /** + * Returns the translog generation of the last Lucene commit. + */ + public long uncommittedGeneration() { + return deletionPolicy.getTranslogGenerationOfLastCommit(); + } + /** * Returns true iff the given generation is the current generation of this translog */ diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 5eba198378a1d..cb242dd0aeac7 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -211,7 +211,7 @@ public synchronized long getMinTranslogGenerationForRecovery() { /** * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. 
- * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()} + * See {@link Translog#uncommittedGeneration()}) */ public synchronized long getTranslogGenerationOfLastCommit() { return translogGenerationOfLastCommit; diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java index c57af9b448671..8bb030ce6e91a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -156,7 +156,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(); assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().uncommittedOperations()); + assertEquals(0L, engine.getTranslog().totalOperationsByMinGen(engine.getTranslog().uncommittedGeneration())); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 20bf96366860e..94fbf67fcb462 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -726,7 +726,8 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s } }; - assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs)); + final Translog translog = recoveringEngine.getTranslog(); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(committed.get()); } finally { @@ -3614,7 +3615,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { 
System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().totalOperationsByMinGen(noOpEngine.getTranslog().uncommittedGeneration()), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -3680,7 +3681,7 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti * This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle * this sequence number. */ - assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation)); + assertThat(translog.getMinGenerationForSeqNo(3 * i + 1, true).translogFileGeneration, equalTo(i + generation)); } int i = 0; @@ -3814,7 +3815,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); - assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations()); + assertEquals(numDocsOnReplica, + recoveringEngine.getTranslog().totalOperationsByMinGen(recoveringEngine.getTranslog().uncommittedGeneration())); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -3848,7 +3850,8 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { try { recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - 
assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations()); + assertThat(recoveringEngine.getTranslog().totalOperationsByMinGen(recoveringEngine.getTranslog().uncommittedGeneration()), + equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4252,7 +4255,8 @@ public void testCleanupCommitsWhenReleaseSnapshot() throws Exception { public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. - final long extraTranslogSizeInNewEngine = engine.getTranslog().uncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + final Translog translog = engine.getTranslog(); + final long extraTranslogSizeInNewEngine = translog.sizeInBytesByMinGen(translog.uncommittedGeneration()) - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4260,17 +4264,17 @@ public void testShouldPeriodicallyFlush() throws Exception { } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, - engine.getTranslog().uncommittedSizeInBytes() - extraTranslogSizeInNewEngine); + translog.sizeInBytesByMinGen(translog.uncommittedGeneration()) - extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); 
indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4278,18 +4282,18 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(numDocs)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here for (int id = 0; id < numDocs; id++) { if (randomBoolean()){ - engine.getTranslog().rollGeneration(); + translog.rollGeneration(); } final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 72813cf26372d..6a27cdbd9a9b1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -342,29 +342,32 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.uncommittedOperations()); + assertEquals(2, translog.totalOperationsByMinGen(translog.uncommittedGeneration())); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.totalOperationsByMinGen(translog.uncommittedGeneration())); translog.sync(); - long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + long size = Math.max(translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", + translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), + translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", + translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), + translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", + translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), + 
translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.totalOperationsByMinGen(translog.uncommittedGeneration())); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 98105e6fcc9eb..a3b7e982cf0eb 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.MockDirectoryWrapper; +import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -53,7 +54,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -113,7 +113,6 @@ import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; -import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -502,67 +501,15 @@ public void 
testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } } - public void testSizeOfGensAboveSeqNoInBytes() throws Exception { - final long emptyTranslogSize = Translog.DEFAULT_HEADER_SIZE_IN_BYTES; - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomNonNegativeLong()), equalTo(0L)); - // Gen1: seqno in 1001-2000 - int ops = between(1, 100); - final Set seqnoGen1 = new HashSet<>(); - final long gen1 = translog.currentFileGeneration(); - for (int i = 0; i < ops; i++) { - long seqno = randomValueOtherThanMany(n -> seqnoGen1.add(n) == false, () -> randomLongBetween(1001, 2000)); - translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1})); - } - final long maxSeqnoGen1 = Collections.max(seqnoGen1); - long sizeGen1 = translog.getCurrent().sizeInBytes(); - for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) { - translog.rollGeneration(); - sizeGen1 += emptyTranslogSize; - } - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L)); - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), - allOf(equalTo(sizeGen1), equalTo(translog.sizeInBytesByMinGen(gen1)))); - // Gen2: seqno in 0-1000 - translog.rollGeneration(); - ops = between(1, 100); - for (int i = 0; i < ops; i++) { - translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), i, new byte[]{1})); - } - long sizeGen2 = translog.getCurrent().sizeInBytes(); - 
assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L)); - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), - allOf(equalTo(sizeGen1 + sizeGen2), equalTo(translog.sizeInBytesByMinGen(gen1)))); - // Gen3: seqno in 2001+ - ops = between(1, 100); - translog.rollGeneration(); - final long gen3 = translog.currentFileGeneration(); - final Set seqnoGen3 = new HashSet<>(); - for (int i = 0; i < ops; i++) { - long seqno = randomValueOtherThanMany(n -> seqnoGen3.add(n) == false, () -> randomLongBetween(2001, Long.MAX_VALUE)); - translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1})); - } - final long maxSeqnoGen3 = Collections.max(seqnoGen3); - long sizeGen3 = translog.getCurrent().sizeInBytes(); - for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) { - translog.rollGeneration(); - sizeGen3 += emptyTranslogSize; // check an empty generation is included - } - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen3 + 1, Long.MAX_VALUE)), equalTo(0L)); - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, maxSeqnoGen3)), - allOf(equalTo(sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen3)))); // Since gen3 - assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)), - allOf(equalTo(sizeGen1 + sizeGen2 + sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen1)))); // Since gen1 - } - public void testTotalTests() { final TranslogStats total = new TranslogStats(0, 0, 0, 0, 1); final int n = randomIntBetween(0, 16); @@ -957,7 +904,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep lastCommittedLocalCheckpoint.set(localCheckpoint); deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); deletionPolicy.setMinTranslogGenerationForRecovery( - translog.getMinGenerationForSeqNo(localCheckpoint + 
1).translogFileGeneration); + translog.getMinGenerationForSeqNo(localCheckpoint + 1, true).translogFileGeneration); translog.trimUnreferencedReaders(); } } @@ -2567,7 +2514,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.uncommittedOperations(), equalTo(0)); + assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); @@ -2616,7 +2563,7 @@ public void testMinSeqNoBasedAPI() throws IOException { translog.rollGeneration(); for (long seqNo = 0; seqNo < operations; seqNo++) { final Set> seenSeqNos = new HashSet<>(); - final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration; + final long generation = translog.getMinGenerationForSeqNo(seqNo, randomBoolean()).translogFileGeneration; int expectedSnapshotOps = 0; for (long g = generation; g < translog.currentFileGeneration(); g++) { if (!seqNoPerGeneration.containsKey(g)) { @@ -2643,7 +2590,6 @@ public void testMinSeqNoBasedAPI() throws IOException { seenSeqNos.addAll(generationSeqNo); } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); - assertThat(translog.sizeInBytesByMinGen(generation), equalTo(translog.sizeOfGensAboveSeqNoInBytes(seqNo))); int readFromSnapshot = 0; try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index a496664c0260b..c25f9cb7927d8 100644 --- 
a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.SnapshotMatchers; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import java.util.HashMap; @@ -306,7 +307,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); int numDocs = shards.indexDocs(between(10, 100)); - final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica(); From 2a740717eb0770793b7b7dbd400e93296fd1ea78 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Mar 2018 18:17:33 -0400 Subject: [PATCH 05/18] add stress test --- .../index/engine/InternalEngineTests.java | 57 ++++++++++++++++++- .../indices/recovery/RecoveryTests.java | 1 - 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 94fbf67fcb462..21080334e4a5b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -65,6 +65,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -163,6 +164,7 @@ import static 
org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; @@ -4300,7 +4302,6 @@ public void testShouldPeriodicallyFlush() throws Exception { final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 2L, seqno, false)); assertThat(result.isCreated(), equalTo(true)); } - // A flush must change the periodically flush condition. lastCommitInfo = engine.getLastCommittedSegmentInfos(); if (engine.shouldPeriodicallyFlush()) { engine.flush(); @@ -4309,6 +4310,60 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } + public void testStressShouldPeriodicallyFlush() throws Exception { + final long flushThreshold = randomLongBetween(100, 5000); + final long generationThreshold = randomLongBetween(1000, 5000); + final IndexSettings indexSettings = engine.config().getIndexSettings(); + final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), generationThreshold + "b") + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); + indexSettings.updateIndexMetaData(indexMetaData); + engine.onSettingsChanged(); + final int iterations = scaledRandomIntBetween(100, 1000); + final List pendingSeqNo = new ArrayList<>(); + for (int iteration = 0; iteration < iterations; iteration++) { + final int opsPerIter = scaledRandomIntBetween(1, 100); + for (int op = 0; op < opsPerIter; op++) { + final String id = UUIDs.randomBase64UUID(); + final ParsedDocument doc = testParsedDocument(id, null, 
testDocumentWithTextField(), SOURCE, null); + final long seqno; + if (randomBoolean() && pendingSeqNo.isEmpty() == false) { + seqno = pendingSeqNo.remove(0); + } else { + seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + } + engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); + try { + if (rarely() || engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } catch (EngineException ex) { + // This happens because the test may have open too many files (max 2048 fds on test) + assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), + greaterThan(100L)); + return; + } + } + if (randomBoolean() && pendingSeqNo.isEmpty()) { + pendingSeqNo.add(engine.getLocalCheckpointTracker().generateSeqNo()); + } + } + try { + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } + } catch (EngineException ex) { + // This happens because the test may have open too many files (max 2048 fds on test) + assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), + greaterThan(100L)); + } + } public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException { final int iters = randomIntBetween(1, 15); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index c25f9cb7927d8..49e557c3dde78 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -42,7 +42,6 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import 
org.elasticsearch.index.translog.SnapshotMatchers; -import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import java.util.HashMap; From 6ee93733408f85f7ebebbb4190cea0e525569022 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 19 Mar 2018 19:54:51 -0400 Subject: [PATCH 06/18] Comment --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 21080334e4a5b..a8d70bac03c60 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4343,7 +4343,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } } catch (EngineException ex) { - // This happens because the test may have open too many files (max 2048 fds on test) + // This happened because the test may have opened too many files (max 2048 fds on test) assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), greaterThan(100L)); return; @@ -4359,7 +4359,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } } catch (EngineException ex) { - // This happens because the test may have open too many files (max 2048 fds on test) + // This happened because the test may have opened too many files (max 2048 fds on test) assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), greaterThan(100L)); } From 7587ee6100cd452643d2677c804c5f2351c0a337 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 20 Mar 2018 16:17:23 -0400 Subject: [PATCH 07/18] Do not ask translog for the last 
commit - engine has it --- .../index/engine/InternalEngine.java | 22 +++---- .../index/translog/Translog.java | 58 +++++++++---------- .../index/engine/InternalEngineTests.java | 2 +- .../index/translog/TranslogTests.java | 4 +- 4 files changed, 43 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7b57388d4727c..ba7cba8800d74 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1361,8 +1361,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && indexWriter.hasUncommittedChanges() - && translog.totalOperationsByMinGen(translog.uncommittedGeneration()) == 0) { + long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. 
sync id [{}].", syncId); @@ -1384,20 +1384,22 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); - final long translogGenerationOfCurrentCommit = translog.uncommittedGeneration(); - final long uncommittedTranslogSize = translog.sizeInBytesByMinGen(translogGenerationOfCurrentCommit); + final SegmentInfos lastCommit = this.lastCommittedSegmentInfos; + final long translogGenerationOfLastCommit = Long.parseLong(lastCommit.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - if (uncommittedTranslogSize < flushThreshold) { + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* - * We should only flush ony if the shouldPeriodicallyFlush condition can become false after flushing. - * This condition will change if the new commit points to the later translog generation than the current commit's. + * We should only flush ony if the shouldFlush condition can become false after flushing. This condition will change if: + * 1. The new commit points to the later generation the last commit's. + * 2. The local checkpoint equals to max_seqno. This makes the new commit point to the newly rolled translog generation. * This method is to maintain translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. 
*/ final long translogGenerationOfNewCommit = - translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1, false).translogFileGeneration; - return translogGenerationOfCurrentCommit < translogGenerationOfNewCommit; + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @Override @@ -2017,7 +2019,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1, true); + final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); final String translogUUID = translogGeneration.translogUUID; final String localCheckpointValue = Long.toString(localCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index bf18764a78135..0dcb449cd2132 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.UUIDs; @@ -38,7 +39,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import 
org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; @@ -356,6 +356,14 @@ public long getMinFileGeneration() { } } + + /** + * Returns the translog generation of the last index commit. + */ + public long uncommittedGeneration() { + return deletionPolicy.getTranslogGenerationOfLastCommit(); + } + /** * Returns the number of operations in the translog files */ @@ -391,37 +399,37 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable r.getGeneration() >= minGeneration) + .mapToInt(BaseTranslogReader::totalOperations) + .sum(); } } /** - * Returns the size in bytes of the translog files above the given generation + * Returns the number of operations in the transaction files that contain operations with seq# above the given number. */ - public long sizeInBytesByMinGen(long minGeneration) { + public int estimateTotalOperationsFromMinSeq(long minSeqNo) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - return Stream.concat(readers.stream(), Stream.of(current)) - .filter(r -> r.getGeneration() >= minGeneration) - .mapToLong(BaseTranslogReader::sizeInBytes) - .sum(); + return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum(); } } /** - * Returns the number of operations in the transaction files above the given generation + * Returns the size in bytes of the translog files at least the given generation */ - public int totalOperationsByMinGen(long minGeneration) { + public long sizeInBytesByMinGen(long minGeneration) { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); return Stream.concat(readers.stream(), Stream.of(current)) .filter(r -> r.getGeneration() >= minGeneration) - .mapToInt(BaseTranslogReader::totalOperations) + .mapToLong(BaseTranslogReader::sizeInBytes) .sum(); } } @@ -1484,13 +1492,13 @@ 
public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl } /** - * Gets the minimum generation that could contain any sequence number after the specified sequence number + * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if + * there is no generation that could any such sequence number. * - * @param seqNo the sequence number - * @param alwaysIncludeCurrent if true, the current generation is returned there is no generation that could any such seq# + * @param seqNo the sequence number * @return the minimum generation for the sequence number */ - public TranslogGeneration getMinGenerationForSeqNo(final long seqNo, final boolean alwaysIncludeCurrent) { + public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { try (ReleasableLock ignored = readLock.acquire()) { /* * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the @@ -1498,16 +1506,13 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo, final boole * be the current translog generation as we do not need any prior generations to have a complete history up to the current local * checkpoint. */ - long minGen = Long.MAX_VALUE; - if (alwaysIncludeCurrent || seqNo <= this.current.getCheckpoint().maxSeqNo) { - minGen = this.current.generation; - } + long minTranslogFileGeneration = this.currentFileGeneration(); for (final TranslogReader reader : readers) { if (seqNo <= reader.getCheckpoint().maxSeqNo) { - minGen = Math.min(minGen, reader.getGeneration()); + minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); } } - return new TranslogGeneration(translogUUID, minGen); + return new TranslogGeneration(translogUUID, minTranslogFileGeneration); } } @@ -1629,13 +1634,6 @@ public TranslogGeneration getGeneration() { } } - /** - * Returns the translog generation of the last Lucene commit. 
- */ - public long uncommittedGeneration() { - return deletionPolicy.getTranslogGenerationOfLastCommit(); - } - /** * Returns true iff the given generation is the current generation of this translog */ diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a8d70bac03c60..ce72f11356e16 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3683,7 +3683,7 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti * This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle * this sequence number. */ - assertThat(translog.getMinGenerationForSeqNo(3 * i + 1, true).translogFileGeneration, equalTo(i + generation)); + assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation)); } int i = 0; diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index a3b7e982cf0eb..685bf0514ffc2 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -904,7 +904,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep lastCommittedLocalCheckpoint.set(localCheckpoint); deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); deletionPolicy.setMinTranslogGenerationForRecovery( - translog.getMinGenerationForSeqNo(localCheckpoint + 1, true).translogFileGeneration); + translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); translog.trimUnreferencedReaders(); } } @@ -2563,7 +2563,7 @@ public void testMinSeqNoBasedAPI() throws IOException 
{ translog.rollGeneration(); for (long seqNo = 0; seqNo < operations; seqNo++) { final Set> seenSeqNos = new HashSet<>(); - final long generation = translog.getMinGenerationForSeqNo(seqNo, randomBoolean()).translogFileGeneration; + final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration; int expectedSnapshotOps = 0; for (long g = generation; g < translog.currentFileGeneration(); g++) { if (!seqNoPerGeneration.containsKey(g)) { From d02770e7c14f0be24d942b6a0344639ddb7afacd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 20 Mar 2018 16:47:23 -0400 Subject: [PATCH 08/18] simplify test --- .../index/engine/InternalEngineTests.java | 48 +++++-------------- 1 file changed, 13 insertions(+), 35 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index ce72f11356e16..fae6761fb3ea3 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4320,48 +4320,26 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - final int iterations = scaledRandomIntBetween(100, 1000); - final List pendingSeqNo = new ArrayList<>(); + final int iterations = scaledRandomIntBetween(10, 100); for (int iteration = 0; iteration < iterations; iteration++) { final int opsPerIter = scaledRandomIntBetween(1, 100); for (int op = 0; op < opsPerIter; op++) { - final String id = UUIDs.randomBase64UUID(); - final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); - final long seqno; - if (randomBoolean() && pendingSeqNo.isEmpty() == false) { - seqno = pendingSeqNo.remove(0); - } else { - 
seqno = engine.getLocalCheckpointTracker().generateSeqNo(); - } + final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); + final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - try { - if (rarely() || engine.getTranslog().shouldRollGeneration()) { - engine.rollTranslogGeneration(); - } - if (engine.shouldPeriodicallyFlush()) { - engine.flush(); - assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); - } - } catch (EngineException ex) { - // This happened because the test may have opened too many files (max 2048 fds on test) - assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), - greaterThan(100L)); - return; + if (rarely() || engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } - } - if (randomBoolean() && pendingSeqNo.isEmpty()) { - pendingSeqNo.add(engine.getLocalCheckpointTracker().generateSeqNo()); } } - try { - if (engine.shouldPeriodicallyFlush()) { - engine.flush(); - assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); - } - } catch (EngineException ex) { - // This happened because the test may have opened too many files (max 2048 fds on test) - assertThat(engine.getTranslog().currentFileGeneration() - engine.getTranslog().getMinFileGeneration(), - greaterThan(100L)); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } } From 2e7897fdd99a0a48ba89639eef73e4daee3a0338 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 20 Mar 2018 16:56:22 -0400 Subject: [PATCH 09/18] Inline the last commit --- 
.../java/org/elasticsearch/index/engine/InternalEngine.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index ba7cba8800d74..1d5dce5bffcc1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1384,8 +1384,7 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); - final SegmentInfos lastCommit = this.lastCommittedSegmentInfos; - final long translogGenerationOfLastCommit = Long.parseLong(lastCommit.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; From 50c352d005af616217958f5fa503c4083c3a4bc4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 20 Mar 2018 18:23:27 -0400 Subject: [PATCH 10/18] single loop --- .../index/engine/InternalEngineTests.java | 49 +++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fae6761fb3ea3..e31f2f2744673 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4289,25 +4289,21 @@ public void testShouldPeriodicallyFlush() throws Exception { engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); 
assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); - // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here for (int id = 0; id < numDocs; id++) { - if (randomBoolean()){ + if (randomBoolean()) { translog.rollGeneration(); } final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); - long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); - final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 2L, seqno, false)); - assertThat(result.isCreated(), equalTo(true)); - } - lastCommitInfo = engine.getLastCommittedSegmentInfos(); - if (engine.shouldPeriodicallyFlush()) { - engine.flush(); - assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false)); + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); + } } - assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } public void testStressShouldPeriodicallyFlush() throws Exception { @@ -4320,26 +4316,19 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - final int iterations = scaledRandomIntBetween(10, 100); - for (int iteration = 0; iteration < iterations; iteration++) { - final int opsPerIter = scaledRandomIntBetween(1, 100); - for (int op = 0; op < opsPerIter; op++) { - final long 
localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); - final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); - final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); - engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - if (rarely() || engine.getTranslog().shouldRollGeneration()) { - engine.rollTranslogGeneration(); - } - if (engine.shouldPeriodicallyFlush()) { - engine.flush(); - assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); - } + final int numOps = scaledRandomIntBetween(100, 10_000); + for (int i = 0; i < numOps; i++) { + final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); + final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); + if (rarely() || engine.getTranslog().shouldRollGeneration()) { + engine.rollTranslogGeneration(); + } + if (engine.shouldPeriodicallyFlush()) { + engine.flush(); + assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } - } - if (engine.shouldPeriodicallyFlush()) { - engine.flush(); - assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } } From 031b1126329fff4197469008b52bd6c7e94034c4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:08:28 -0400 Subject: [PATCH 11/18] improve the engine test --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e31f2f2744673..e438f958ad0b2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4316,16 +4316,16 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - final int numOps = scaledRandomIntBetween(100, 10_000); + final int numOps = 10_000; //scaledRandomIntBetween(100, 10_000); for (int i = 0; i < numOps; i++) { final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); - if (rarely() || engine.getTranslog().shouldRollGeneration()) { + if (rarely() && engine.getTranslog().shouldRollGeneration()) { engine.rollTranslogGeneration(); } - if (engine.shouldPeriodicallyFlush()) { + if (rarely() || engine.shouldPeriodicallyFlush()) { engine.flush(); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); } From 05bffeccb353abea66c28bd0e5d2d3247214ed16 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:22:40 -0400 Subject: [PATCH 12/18] Remove #uncommittedGeneration from translog --- .../index/translog/Translog.java | 10 +------- .../translog/TranslogDeletionPolicy.java | 1 - .../index/engine/EngineDiskUtilsTests.java | 3 ++- .../index/engine/InternalEngineTests.java | 25 ++++++++----------- .../index/shard/IndexShardIT.java | 18 ++++++------- .../index/translog/TestTranslog.java | 14 +++++++++++ .../index/translog/TranslogTests.java | 6 ++--- 7 files changed, 39 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 
0dcb449cd2132..c2d494fd07a34 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -356,14 +356,6 @@ public long getMinFileGeneration() { } } - - /** - * Returns the translog generation of the last index commit. - */ - public long uncommittedGeneration() { - return deletionPolicy.getTranslogGenerationOfLastCommit(); - } - /** * Returns the number of operations in the translog files */ @@ -741,7 +733,7 @@ private void closeOnTragicEvent(Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - final long uncommittedGen = uncommittedGeneration(); + final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit(); return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index cb242dd0aeac7..eb23a415d3e34 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -211,7 +211,6 @@ public synchronized long getMinTranslogGenerationForRecovery() { /** * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. 
- * See {@link Translog#uncommittedGeneration()}) */ public synchronized long getTranslogGenerationOfLastCommit() { return translogGenerationOfLastCommit; diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java index 8bb030ce6e91a..d137544bad096 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; @@ -156,7 +157,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(); assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().totalOperationsByMinGen(engine.getTranslog().uncommittedGeneration())); + assertEquals(0L, TestTranslog.uncommittedOperations(engine.getTranslog())); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e438f958ad0b2..65e754b585f01 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; +import 
org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -727,9 +728,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, syncId); } }; - - final Translog translog = recoveringEngine.getTranslog(); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(docs)); + assertThat(TestTranslog.uncommittedOperations(recoveringEngine.getTranslog()), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(committed.get()); } finally { @@ -3617,7 +3616,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(noOpEngine.getTranslog().totalOperationsByMinGen(noOpEngine.getTranslog().uncommittedGeneration()), equalTo(1 + gapsFilled)); + assertThat(TestTranslog.uncommittedOperations(noOpEngine.getTranslog()), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -3817,8 +3816,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); - assertEquals(numDocsOnReplica, - recoveringEngine.getTranslog().totalOperationsByMinGen(recoveringEngine.getTranslog().uncommittedGeneration())); + assertEquals(numDocsOnReplica, TestTranslog.uncommittedOperations(recoveringEngine.getTranslog())); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, 
recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -3852,8 +3850,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { try { recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - assertThat(recoveringEngine.getTranslog().totalOperationsByMinGen(recoveringEngine.getTranslog().uncommittedGeneration()), - equalTo(0)); + assertThat(TestTranslog.uncommittedOperations(recoveringEngine.getTranslog()), equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4258,7 +4255,7 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. 
final Translog translog = engine.getTranslog(); - final long extraTranslogSizeInNewEngine = translog.sizeInBytesByMinGen(translog.uncommittedGeneration()) - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + final long extraTranslogSizeInNewEngine = TestTranslog.uncommittedSizeInBytes(engine.getTranslog()) - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4266,17 +4263,17 @@ public void testShouldPeriodicallyFlush() throws Exception { } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, - translog.sizeInBytesByMinGen(translog.uncommittedGeneration()) - extraTranslogSizeInNewEngine); + TestTranslog.uncommittedSizeInBytes(engine.getTranslog()) - extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(numDocs)); + assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); + assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = 
testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4284,11 +4281,11 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(numDocs)); + assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); + assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 6a27cdbd9a9b1..68454df4205a3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -61,6 +61,7 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -342,32 +343,29 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.totalOperationsByMinGen(translog.uncommittedGeneration())); + assertEquals(2, TestTranslog.uncommittedOperations(translog)); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.totalOperationsByMinGen(translog.uncommittedGeneration())); + assertEquals(0, TestTranslog.uncommittedOperations(translog)); translog.sync(); - long size = Math.max(translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + long size = Math.max(TestTranslog.uncommittedSizeInBytes(translog), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", - translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), - translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); + TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", - translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), - translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); + TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); assertBusy(() -> { // this is async logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", - translog.sizeInBytesByMinGen(translog.uncommittedGeneration()), - translog.totalOperationsByMinGen(translog.uncommittedGeneration()), translog.getGeneration()); + TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, 
translog.totalOperationsByMinGen(translog.uncommittedGeneration())); + assertEquals(0, TestTranslog.uncommittedOperations(translog)); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index 4077d033da9cd..d78a9816ed501 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -122,4 +122,18 @@ private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExc return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); } } + + /** + * Returns the number of operations in the translog files since the last commit. + */ + public static int uncommittedOperations(Translog translog) { + return translog.totalOperationsByMinGen(translog.getDeletionPolicy().getTranslogGenerationOfLastCommit()); + } + + /** + * Returns the size in bytes of the translog files since the last commit. 
+ */ + public static long uncommittedSizeInBytes(Translog translog) { + return translog.sizeInBytesByMinGen(translog.getDeletionPolicy().getTranslogGenerationOfLastCommit()); + } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 685bf0514ffc2..690ff387215be 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -501,10 +501,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(uncommittedOps)); + assertThat(TestTranslog.uncommittedOperations(translog), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(operationsInLastGen)); + assertThat(TestTranslog.uncommittedOperations(translog), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2514,7 +2514,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.totalOperationsByMinGen(translog.uncommittedGeneration()), equalTo(0)); + assertThat(TestTranslog.uncommittedOperations(translog), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); From 4b8708f87b9cfe75491b5ab31ef8604b50be43be Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:35:07 -0400 Subject: [PATCH 13/18] add boaz comment --- .../elasticsearch/index/engine/InternalEngine.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 
deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1d5dce5bffcc1..85a4f83da6a44 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1390,9 +1390,14 @@ public boolean shouldPeriodicallyFlush() { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. This condition will change if: - * 1. The new commit points to the later generation the last commit's. - * 2. The local checkpoint equals to max_seqno. This makes the new commit point to the newly rolled translog generation. + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be below than + * the threshold after a flush. To avoid getting into an endless flushing loop, we only enable the periodically flush condition + * if this condition is disabled after a flush. The condition will change if: + * 1. The new commit points to the later generation the last commit's + * 2. If the local checkpoint equals to max_seqno and translogGenerationOfLastCommit equals to translogGenerationOfNewCommit, + * we know that the last generation must contain operations because its size is above the flush threshold and + * the flush threshold is guaranteed to be higher than an empty translog by the setting validation. + * * This method is to maintain translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. 
*/ final long translogGenerationOfNewCommit = From ef8a2d3befdec9816d94add948ee5f29d5917a9c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:46:36 -0400 Subject: [PATCH 14/18] =?UTF-8?q?Let=E2=80=99s=20use=20stats?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../index/engine/EngineDiskUtilsTests.java | 2 +- .../index/engine/InternalEngineTests.java | 20 +++++++++---------- .../index/shard/IndexShardIT.java | 14 ++++++------- .../index/translog/TestTranslog.java | 14 ------------- .../index/translog/TranslogTests.java | 6 +++--- 5 files changed, 21 insertions(+), 35 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java index d137544bad096..560e4c9d416e7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -157,7 +157,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException { assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(); assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, TestTranslog.uncommittedOperations(engine.getTranslog())); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 65e754b585f01..0d97e1bd8905c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -728,7 +728,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s super.commitIndexWriter(writer, translog, 
syncId); } }; - assertThat(TestTranslog.uncommittedOperations(recoveringEngine.getTranslog()), equalTo(docs)); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs)); recoveringEngine.recoverFromTranslog(); assertTrue(committed.get()); } finally { @@ -3616,7 +3616,7 @@ protected long doGenerateSeqNoForOperation(Operation operation) { System.nanoTime(), reason)); assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); - assertThat(TestTranslog.uncommittedOperations(noOpEngine.getTranslog()), equalTo(1 + gapsFilled)); + assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(1 + gapsFilled)); // skip to the op that we added to the translog Translog.Operation op; Translog.Operation last = null; @@ -3816,7 +3816,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); - assertEquals(numDocsOnReplica, TestTranslog.uncommittedOperations(recoveringEngine.getTranslog())); + assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); @@ -3850,7 +3850,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { try { recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); if (flushed) { - assertThat(TestTranslog.uncommittedOperations(recoveringEngine.getTranslog()), equalTo(0)); + assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), 
equalTo(0)); } recoveringEngine.recoverFromTranslog(); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); @@ -4255,7 +4255,7 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. final Translog translog = engine.getTranslog(); - final long extraTranslogSizeInNewEngine = TestTranslog.uncommittedSizeInBytes(engine.getTranslog()) - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4263,17 +4263,17 @@ public void testShouldPeriodicallyFlush() throws Exception { } assertThat("Not exceeded translog flush threshold yet", engine.shouldPeriodicallyFlush(), equalTo(false)); long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, - TestTranslog.uncommittedSizeInBytes(engine.getTranslog()) - extraTranslogSizeInNewEngine); + engine.getTranslog().stats().getUncommittedSizeInBytes()- extraTranslogSizeInNewEngine); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData()) .settings(Settings.builder().put(indexSettings.getSettings()) .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); engine.onSettingsChanged(); - assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); 
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null); @@ -4281,11 +4281,11 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(numDocs)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(TestTranslog.uncommittedOperations(engine.getTranslog()), equalTo(0)); + assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. 
engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 68454df4205a3..fa5bffad16923 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -343,29 +343,29 @@ public void testMaybeFlush() throws Exception { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, TestTranslog.uncommittedOperations(translog)); + assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, TestTranslog.uncommittedOperations(translog)); + assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); - long size = Math.max(TestTranslog.uncommittedSizeInBytes(translog), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", - TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); logger.info("--> 
translog size after delete: [{}] num_ops [{}] generation [{}]", - TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", - TestTranslog.uncommittedSizeInBytes(translog), TestTranslog.uncommittedOperations(translog), translog.getGeneration()); + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, TestTranslog.uncommittedOperations(translog)); + assertEquals(0, translog.stats().getUncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index d78a9816ed501..4077d033da9cd 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -122,18 +122,4 @@ private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOExc return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); } } - - /** - * Returns the number of operations in the translog files since the last commit. - */ - public static int uncommittedOperations(Translog translog) { - return translog.totalOperationsByMinGen(translog.getDeletionPolicy().getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files since the last commit. 
- */ - public static long uncommittedSizeInBytes(Translog translog) { - return translog.sizeInBytesByMinGen(translog.getDeletionPolicy().getTranslogGenerationOfLastCommit()); - } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 690ff387215be..1fb36486c2b3f 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -501,10 +501,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(TestTranslog.uncommittedOperations(translog), equalTo(uncommittedOps)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(TestTranslog.uncommittedOperations(translog), equalTo(operationsInLastGen)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2514,7 +2514,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(TestTranslog.uncommittedOperations(translog), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); From 00890d508db02da20d0e279a74f57e1545045ff2 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:51:18 -0400 Subject: [PATCH 15/18] Remove unused imports --- .../org/elasticsearch/index/engine/EngineDiskUtilsTests.java | 1 - .../org/elasticsearch/index/engine/InternalEngineTests.java | 3 --- 
.../test/java/org/elasticsearch/index/shard/IndexShardIT.java | 1 - 3 files changed, 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java index 560e4c9d416e7..aca94708af9f8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.IndexSettingsModule; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0d97e1bd8905c..4e6a93043fd08 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -65,7 +65,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -116,7 +115,6 @@ import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; -import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -165,7 +163,6 @@ import static 
org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index fa5bffad16923..b14030d46e4ca 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -61,7 +61,6 @@ import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.SourceToParse; -import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; From be377743209280ed26cd49d64321c76229d7d392 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Mar 2018 18:52:48 -0400 Subject: [PATCH 16/18] restore scale randome ops --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4e6a93043fd08..bc4ecbee4d6a8 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4310,7 +4310,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")).build(); indexSettings.updateIndexMetaData(indexMetaData); 
engine.onSettingsChanged(); - final int numOps = 10_000; //scaledRandomIntBetween(100, 10_000); + final int numOps = scaledRandomIntBetween(100, 10_000); for (int i = 0; i < numOps; i++) { final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); From a24b434e471125ce3b53e4ce06c11b922ea60f7d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Mar 2018 08:52:06 -0400 Subject: [PATCH 17/18] more comment --- .../elasticsearch/index/engine/InternalEngine.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 85a4f83da6a44..43a5ca8fd98e6 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1390,13 +1390,16 @@ public boolean shouldPeriodicallyFlush() { return false; } /* - * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be below than - * the threshold after a flush. To avoid getting into an endless flushing loop, we only enable the periodically flush condition - * if this condition is disabled after a flush. The condition will change if: - * 1. The new commit points to the later generation the last commit's - * 2. If the local checkpoint equals to max_seqno and translogGenerationOfLastCommit equals to translogGenerationOfNewCommit, + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be below + * the threshold after a flush. An endless loop may occur when the uncommitted size is close to the flush threshold, + * the current generation is also close to the generation threshold, and an index is faster and rolls a new generation. 
+ * + * Avoid getting into an endless loop of flushing, we only enable the periodically flush condition if this condition is disabled + * after a flush. The condition will change if the new commit points to the later generation the last commit's. + * Or if the local checkpoint equals to max_seqno and translogGenerationOfLastCommit equals to translogGenerationOfNewCommit, * we know that the last generation must contain operations because its size is above the flush threshold and * the flush threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. * * This method is to maintain translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. */ From 42402417bd6ae1b39533571bb4256167feeb17f0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 22 Mar 2018 12:30:53 -0400 Subject: [PATCH 18/18] fix comment after talked to Boaz --- .../index/engine/InternalEngine.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 43a5ca8fd98e6..6c6752de7d2d4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1390,18 +1390,19 @@ public boolean shouldPeriodicallyFlush() { return false; } /* - * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be below - * the threshold after a flush. An endless loop may occur when the uncommitted size is close to the flush threshold, - * the current generation is also close to the generation threshold, and an index is faster and rolls a new generation. 
+ * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition is disabled after a flush. The condition will change if the new + * commit points to a later generation than the last commit's (e.g. gen-of-last-commit < gen-of-new-commit)[1]. * - * Avoid getting into an endless loop of flushing, we only enable the periodically flush condition if this condition is disabled - * after a flush. The condition will change if the new commit points to the later generation the last commit's. - * Or if the local checkpoint equals to max_seqno and translogGenerationOfLastCommit equals to translogGenerationOfNewCommit, - * we know that the last generation must contain operations because its size is above the flush threshold and - * the flush threshold is guaranteed to be higher than an empty translog by the setting validation. - * This guarantees that the new commit will point to the newly rolled generation. + * When the local checkpoint equals max_seqno, and the translog-gen of the last commit equals the translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we would have rolled + * generations when the generation-threshold was reached, so the first condition (i.e. [1]) is already satisfied. * - * This method is to maintain translog only, thus the IndexWriter#hasUncommittedChanges condition is not considered. 
+ * This method only maintains the translog, thus the IndexWriter#hasUncommittedChanges condition is not considered. */ final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration;