diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 49be68efcad5d..6c6752de7d2d4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -1361,7 +1361,8 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) { + long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); @@ -1383,19 +1384,30 @@ final boolean tryRenewSyncCommit() { @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); + final long translogGenerationOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); - final long uncommittedSizeOfCurrentCommit = translog.uncommittedSizeInBytes(); - if (uncommittedSizeOfCurrentCommit < flushThreshold) { + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; } /* - * We should only flush ony if the shouldFlush condition can become false after flushing. - * This condition will change if the `uncommittedSize` of the new commit is smaller than - * the `uncommittedSize` of the current commit. This method is to maintain translog only, - * thus the IndexWriter#hasUncommittedChanges condition is not considered. + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition is disabled after a flush. The condition will change if the new + * commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1]. + * + * When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled + * generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied. + * + * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. */ - final long uncommittedSizeOfNewCommit = translog.sizeOfGensAboveSeqNoInBytes(localCheckpointTracker.getCheckpoint() + 1); - return uncommittedSizeOfNewCommit < uncommittedSizeOfCurrentCommit; + final long translogGenerationOfNewCommit = + translog.getMinGenerationForSeqNo(localCheckpointTracker.getCheckpoint() + 1).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTracker.getCheckpoint() == localCheckpointTracker.getMaxSeqNo(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index c34f851195a9f..c2d494fd07a34 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -356,26 +356,11 @@ public long getMinFileGeneration() { } } - - /** - * Returns the number of operations in the translog files that aren't committed to lucene. - */ - public int uncommittedOperations() { - return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - - /** - * Returns the size in bytes of the translog files that aren't committed to lucene. - */ - public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); - } - /** * Returns the number of operations in the translog files */ public int totalOperations() { - return totalOperations(-1); + return totalOperationsByMinGen(-1); } /** @@ -406,9 +391,9 @@ static long findEarliestLastModifiedAge(long currentTime, Iterable {}); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = shard.getEngine().getTranslog(); - assertEquals(2, translog.uncommittedOperations()); + assertEquals(2, translog.stats().getUncommittedOperations()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); translog.sync(); - long size = Math.max(translog.uncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); - logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); + logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); client().prepareDelete("test", "test", "2").get(); - logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async - logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(), - translog.uncommittedOperations(), translog.getGeneration()); + logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", + translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertFalse(shard.shouldPeriodicallyFlush()); }); - assertEquals(0, translog.uncommittedOperations()); + assertEquals(0, translog.stats().getUncommittedOperations()); } public void testMaybeRollTranslogGeneration() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index c18784873a472..1fb36486c2b3f 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -501,10 +501,10 @@ public void testUncommittedOperations() throws Exception { translog.rollGeneration(); operationsInLastGen = 0; } - assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { markCurrentGenAsCommitted(translog); - assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } } @@ -2514,7 +2514,7 @@ public void testRollGeneration() throws Exception { long minGenForRecovery = randomLongBetween(generation, generation + rolls); commit(translog, minGenForRecovery, generation + rolls); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.uncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index a496664c0260b..49e557c3dde78 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -306,7 +306,7 @@ public void testShouldFlushAfterPeerRecovery() throws Exception { try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); int numDocs = shards.indexDocs(between(10, 100)); - final long translogSizeOnPrimary = shards.getPrimary().getTranslog().uncommittedSizeInBytes(); + final long translogSizeOnPrimary = shards.getPrimary().translogStats().getUncommittedSizeInBytes(); shards.flush(); final IndexShard replica = shards.addReplica();