diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java index 7aab597f8816c..8e2d81d0fe711 100644 --- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java @@ -33,6 +33,7 @@ package org.opensearch.index.seqno; import com.carrotsearch.hppc.LongObjectHashMap; +import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; import java.util.concurrent.atomic.AtomicLong; @@ -116,6 +117,13 @@ public void advanceMaxSeqNo(final long seqNo) { nextSeqNo.accumulateAndGet(seqNo + 1, Math::max); } + /** + * Checks that the sequence number is in an acceptable range for an update to take place. + */ + private boolean shouldUpdateSeqNo(final long seqNo, final long lowerBound, @Nullable final AtomicLong upperBound) { + return !((seqNo <= lowerBound) || (upperBound != null && seqNo > upperBound.get())); + } + /** * Marks the provided sequence number as processed and updates the processed checkpoint if possible. * @@ -134,11 +142,29 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) { markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo); } + /** + * Updates the processed sequence checkpoint to the given value. + * + * This method is only used for segment replication since indexing doesn't + * take place on the replica allowing us to avoid the check that all sequence numbers + * are consecutively processed. + * + * @param seqNo the sequence number to mark as processed + */ + public synchronized void fastForwardProcessedSeqNo(final long seqNo) { + advanceMaxSeqNo(seqNo); + final long currentProcessedCheckpoint = processedCheckpoint.get(); + if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) { + return; + } + processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo); + } + private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap bitSetMap) { assert Thread.holdsLock(this); // make sure we track highest seen sequence number advanceMaxSeqNo(seqNo); - if (seqNo <= checkPoint.get()) { + if (shouldUpdateSeqNo(seqNo, checkPoint.get(), null) == false) { // this is possible during recovery where we might replay an operation that was also replicated return; } diff --git a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java index bcb178e05065c..237066e549b09 100644 --- a/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/LocalCheckpointTrackerTests.java @@ -331,4 +331,60 @@ public void testContains() { final long seqNo = randomNonNegativeLong(); assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo))); } + + public void testFastForwardProcessedNoPersistentUpdate() { + // base case with no persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + } + + public void testFastForwardProcessedPersistentUpdate() { + // base case with persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + + tracker.markSeqNoAsPersisted(seqNo1); + assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + // idempotent case + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + } + + public void testFastForwardProcessedPersistentUpdate2() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + seqNo2 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + assertThat(seqNo2, equalTo(1L)); + tracker.markSeqNoAsPersisted(seqNo1); + tracker.markSeqNoAsPersisted(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); + + tracker.fastForwardProcessedSeqNo(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); + assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); + + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(1L)); + } }