diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java index 713088beab5e6..8e2d81d0fe711 100644 --- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java @@ -120,8 +120,8 @@ public void advanceMaxSeqNo(final long seqNo) { /** * Checks that the sequence number is in an acceptable range for an update to take place. */ - private boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, @Nullable final AtomicLong upperBound) { - return !((seqNo <= lowerBound.get()) || (upperBound != null && seqNo > upperBound.get())); + private boolean shouldUpdateSeqNo(final long seqNo, final long lowerBound, @Nullable final AtomicLong upperBound) { + return !((seqNo <= lowerBound) || (upperBound != null && seqNo > upperBound.get())); } /** @@ -153,17 +153,18 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) { */ public synchronized void fastForwardProcessedSeqNo(final long seqNo) { advanceMaxSeqNo(seqNo); - if (shouldUpdateSeqNo(seqNo, processedCheckpoint, persistedCheckpoint) == false) { + final long currentProcessedCheckpoint = processedCheckpoint.get(); + if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) { return; } - processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo); + processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo); } private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap bitSetMap) { assert Thread.holdsLock(this); // make sure we track highest seen sequence number advanceMaxSeqNo(seqNo); - if (shouldUpdateSeqNo(seqNo, checkPoint, null) == false) { + if (shouldUpdateSeqNo(seqNo, checkPoint.get(), null) == false) { // this is possible during recovery where we might replay an operation that was also replicated return; } diff --git a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java index bcb178e05065c..237066e549b09 100644 --- a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java @@ -331,4 +331,60 @@ public void testContains() { final long seqNo = randomNonNegativeLong(); assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo))); } + + public void testFastForwardProcessedNoPersistentUpdate() { + // base case with no persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + } + + public void testFastForwardProcessedPersistentUpdate() { + // base case with persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + + tracker.markSeqNoAsPersisted(seqNo1); + assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + // idempotent case + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + } + + public void testFastForwardProcessedPersistentUpdate2() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + seqNo2 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + assertThat(seqNo2, equalTo(1L)); + tracker.markSeqNoAsPersisted(seqNo1); + tracker.markSeqNoAsPersisted(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); + + tracker.fastForwardProcessedSeqNo(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); + assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); + + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(1L)); + } } diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java deleted file mode 100644 index 0321ea23b1438..0000000000000 --- a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.seqno; - -import org.junit.Before; -import org.opensearch.test.OpenSearchTestCase; - -import static org.hamcrest.Matchers.equalTo; - -public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase { - - private LocalCheckpointTracker tracker; - - public static LocalCheckpointTracker createEmptyTracker() { - return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - tracker = createEmptyTracker(); - } - - public void testSimpleSegrepProcessedNoPersistentUpdate() { - // base case with no persistent checkpoint update - long seqNo1; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); - } - - public void testSimpleSegrepProcessedPersistentUpdate() { - // base case with persistent checkpoint update - long seqNo1; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - - tracker.markSeqNoAsPersisted(seqNo1); - assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); - assertThat(tracker.hasProcessed(0L), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); - - // idempotent case - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); - assertThat(tracker.hasProcessed(0L), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); - - } - - public void testSimpleSegrepProcessedPersistentUpdate2() { - long seqNo1, seqNo2; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - seqNo2 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - assertThat(seqNo2, equalTo(1L)); - tracker.markSeqNoAsPersisted(seqNo1); - tracker.markSeqNoAsPersisted(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); - assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); - - tracker.fastForwardProcessedSeqNo(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); - assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); - assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); - - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); - assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); - assertThat(tracker.getMaxSeqNo(), equalTo(1L)); - } -}