From 2d89e2dd497cefcc4ba36a781182f1bf3ffedd5c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 14 Dec 2019 09:32:16 -0500 Subject: [PATCH 1/2] Account trimAboveSeqNo in committed translog generation --- .../java/org/elasticsearch/index/translog/Translog.java | 7 ++----- .../index/replication/RecoveryDuringReplicationTests.java | 5 +++++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index e38880797785b..32789a8d82b30 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -697,10 +697,7 @@ private Stream readersAboveMinSeqNo(long minSeqNo) "callers of readersAboveMinSeqNo must hold a lock: readLock [" + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; return Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> { - final long maxSeqNo = reader.getCheckpoint().maxSeqNo; - return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo; - }); + .filter(reader -> minSeqNo <= SequenceNumbers.min(reader.getCheckpoint().trimmedAboveSeqNo, reader.getCheckpoint().maxSeqNo)); } /** @@ -1629,7 +1626,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { */ long minTranslogFileGeneration = this.currentFileGeneration(); for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxSeqNo) { + if (seqNo <= SequenceNumbers.min(reader.getCheckpoint().trimmedAboveSeqNo, reader.getCheckpoint().maxSeqNo)) { minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index f6756274642e7..53a96e531585a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -800,6 +800,11 @@ public void testRollbackOnPromotion() throws Exception { shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); + + for (IndexShard shard : shards) { + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); + } } } From 563545c5ee3629f5e2816463020815716aa65e17 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Dec 2019 09:36:00 -0500 Subject: [PATCH 2/2] maxEffectiveSeqNo --- .../org/elasticsearch/index/translog/Checkpoint.java | 11 +++++++++++ .../org/elasticsearch/index/translog/Translog.java | 5 ++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index fe20a52f482f3..1e16b9c3a60ae 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -106,6 +106,17 @@ private void write(DataOutput out) throws IOException { out.writeLong(trimmedAboveSeqNo); } + /** + * Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}. + */ + long maxEffectiveSeqNo() { + if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + return maxSeqNo; + } else { + return Math.min(trimmedAboveSeqNo, maxSeqNo); + } + } + static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint, long minTranslogGeneration) { final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 32789a8d82b30..055b9b82fc917 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -696,8 +696,7 @@ private Stream readersAboveMinSeqNo(long minSeqNo) assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "callers of readersAboveMinSeqNo must hold a lock: readLock [" + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; - return Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> minSeqNo <= SequenceNumbers.min(reader.getCheckpoint().trimmedAboveSeqNo, reader.getCheckpoint().maxSeqNo)); + return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()); } /** @@ -1626,7 +1625,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { */ long minTranslogFileGeneration = this.currentFileGeneration(); for (final TranslogReader reader : readers) { - if (seqNo <= SequenceNumbers.min(reader.getCheckpoint().trimmedAboveSeqNo, reader.getCheckpoint().maxSeqNo)) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); } }