diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 11ec8c573b4a2..9c296403a3d81 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio assert engineConfig.isReadOnly() : "Only replicas should update Infos"; externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); - localCheckpointTracker.markSeqNoAsProcessed(seqNo); + localCheckpointTracker.segrepMarkSeqNoAsProcessed(seqNo); } private LocalCheckpointTracker createLocalCheckpointTracker( diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java index 7aab597f8816c..10889c12a80fd 100644 --- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java @@ -134,6 +134,29 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) { markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo); } + /** + * Updates the processed checkpoint with the provided sequence number if segment replication is enabled + * + * @param seqNo the sequence number to mark as processed + */ + public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) { + markSegrepSeqNo(seqNo, processedCheckpoint, persistedCheckpoint); + } + + @SuppressForbidden(reason = "Object#notifyAll") + private void markSegrepSeqNo(final long seqNo, final AtomicLong processedCheckpoint, final AtomicLong persistedCheckpoint) { + assert Thread.holdsLock(this); + advanceMaxSeqNo(seqNo); + if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) { + return; + } + try { + processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo); + } finally { + this.notifyAll(); + } + } + private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap bitSetMap) { assert Thread.holdsLock(this); // make sure we track highest seen sequence number diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java new file mode 100644 index 0000000000000..06eaf0e682f68 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.seqno; + +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase { + + private LocalCheckpointTracker tracker; + + public static LocalCheckpointTracker createEmptyTracker() { + return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tracker = createEmptyTracker(); + } + + public void testSimpleSegrepPrimaryProcessed() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + + tracker.markSeqNoAsPersisted(seqNo1); + assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + seqNo1 = tracker.generateSeqNo(); + seqNo2 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(1L)); + assertThat(seqNo2, equalTo(2L)); + tracker.markSeqNoAsPersisted(seqNo1); + tracker.markSeqNoAsPersisted(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(2L)); + + tracker.segrepMarkSeqNoAsProcessed(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); + assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); + assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); + + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); + assertThat(tracker.hasProcessed(between(0, 2)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(3)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(2L)); + } +}