From a432fb812e2bd179a89ec67d66e825b7b3cddc71 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Wed, 23 Mar 2022 14:00:57 -0700 Subject: [PATCH 1/2] fix local processed checkpoint update Signed-off-by: Poojita Raj --- .../index/engine/InternalEngine.java | 2 +- .../index/seqno/LocalCheckpointTracker.java | 23 +++++++ ...eplicationLocalCheckpointTrackerTests.java | 67 +++++++++++++++++++ 3 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 11ec8c573b4a2..9c296403a3d81 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio assert engineConfig.isReadOnly() : "Only replicas should update Infos"; externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); - localCheckpointTracker.markSeqNoAsProcessed(seqNo); + localCheckpointTracker.segrepMarkSeqNoAsProcessed(seqNo); } private LocalCheckpointTracker createLocalCheckpointTracker( diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java index 7aab597f8816c..10889c12a80fd 100644 --- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java @@ -134,6 +134,29 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) { markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo); } + /** + * Updates the processed checkpoint with the provided sequence number if segment replication is enabled + * + * @param seqNo the sequence number to mark as processed + */ + public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) { + markSegrepSeqNo(seqNo, processedCheckpoint, persistedCheckpoint); + } + + @SuppressForbidden(reason = "Object#notifyAll") + private void markSegrepSeqNo(final long seqNo, final AtomicLong processedCheckpoint, final AtomicLong persistedCheckpoint) { + assert Thread.holdsLock(this); + advanceMaxSeqNo(seqNo); + if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) { + return; + } + try { + processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo); + } finally { + this.notifyAll(); + } + } + private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap bitSetMap) { assert Thread.holdsLock(this); // make sure we track highest seen sequence number diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java new file mode 100644 index 0000000000000..06eaf0e682f68 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.seqno; + +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase { + + private LocalCheckpointTracker tracker; + + public static LocalCheckpointTracker createEmptyTracker() { + return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tracker = createEmptyTracker(); + } + + public void testSimpleSegrepPrimaryProcessed() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + + tracker.markSeqNoAsPersisted(seqNo1); + assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + seqNo1 = tracker.generateSeqNo(); + seqNo2 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(1L)); + assertThat(seqNo2, equalTo(2L)); + tracker.markSeqNoAsPersisted(seqNo1); + tracker.markSeqNoAsPersisted(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(2L)); + + tracker.segrepMarkSeqNoAsProcessed(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); + assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); + assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); + + tracker.segrepMarkSeqNoAsProcessed(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); + assertThat(tracker.hasProcessed(between(0, 2)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(3)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(2L)); + } +} From c35bac2eea12c58e7f1db323704db0f3a177ef5f Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 24 Mar 2022 16:00:06 -0700 Subject: [PATCH 2/2] separated tests + wrapper function Signed-off-by: Poojita Raj --- .../index/engine/InternalEngine.java | 2 +- .../index/seqno/LocalCheckpointTracker.java | 32 ++++++------ ...eplicationLocalCheckpointTrackerTests.java | 51 +++++++++++++------ 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 9c296403a3d81..fa64e78e40c45 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio assert engineConfig.isReadOnly() : "Only replicas should update Infos"; externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); - localCheckpointTracker.segrepMarkSeqNoAsProcessed(seqNo); + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } private LocalCheckpointTracker createLocalCheckpointTracker( diff --git a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java index 10889c12a80fd..713088beab5e6 100644 --- a/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/LocalCheckpointTracker.java @@ -33,6 +33,7 @@ package org.opensearch.index.seqno; import com.carrotsearch.hppc.LongObjectHashMap; +import org.opensearch.common.Nullable; import org.opensearch.common.SuppressForbidden; import java.util.concurrent.atomic.AtomicLong; @@ -116,6 +117,13 @@ public void advanceMaxSeqNo(final long seqNo) { nextSeqNo.accumulateAndGet(seqNo + 1, Math::max); } + /** + * Checks that the sequence number is in an acceptable range for an update to take place. + */ + private boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, @Nullable final AtomicLong upperBound) { + return !((seqNo <= lowerBound.get()) || (upperBound != null && seqNo > upperBound.get())); + } + /** * Marks the provided sequence number as processed and updates the processed checkpoint if possible. * @@ -135,33 +143,27 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) { } /** - * Updates the processed checkpoint with the provided sequence number if segment replication is enabled + * Updates the processed sequence checkpoint to the given value. + * + * This method is only used for segment replication since indexing doesn't + * take place on the replica allowing us to avoid the check that all sequence numbers + * are consecutively processed. * * @param seqNo the sequence number to mark as processed */ - public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) { - markSegrepSeqNo(seqNo, processedCheckpoint, persistedCheckpoint); - } - - @SuppressForbidden(reason = "Object#notifyAll") - private void markSegrepSeqNo(final long seqNo, final AtomicLong processedCheckpoint, final AtomicLong persistedCheckpoint) { - assert Thread.holdsLock(this); + public synchronized void fastForwardProcessedSeqNo(final long seqNo) { advanceMaxSeqNo(seqNo); - if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) { + if (shouldUpdateSeqNo(seqNo, processedCheckpoint, persistedCheckpoint) == false) { return; } - try { - processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo); - } finally { - this.notifyAll(); - } + processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo); } private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap bitSetMap) { assert Thread.holdsLock(this); // make sure we track highest seen sequence number advanceMaxSeqNo(seqNo); - if (seqNo <= checkPoint.get()) { + if (shouldUpdateSeqNo(seqNo, checkPoint, null) == false) { // this is possible during recovery where we might replay an operation that was also replicated return; } diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java index 06eaf0e682f68..0321ea23b1438 100644 --- a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java @@ -28,40 +28,59 @@ public void setUp() throws Exception { tracker = createEmptyTracker(); } - public void testSimpleSegrepPrimaryProcessed() { - long seqNo1, seqNo2; + public void testSimpleSegrepProcessedNoPersistentUpdate() { + // base case with no persistent checkpoint update + long seqNo1; assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); seqNo1 = tracker.generateSeqNo(); assertThat(seqNo1, equalTo(0L)); - tracker.segrepMarkSeqNoAsProcessed(seqNo1); + tracker.fastForwardProcessedSeqNo(seqNo1); assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + } + + public void testSimpleSegrepProcessedPersistentUpdate() { + // base case with persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); tracker.markSeqNoAsPersisted(seqNo1); assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); - tracker.segrepMarkSeqNoAsProcessed(seqNo1); - tracker.segrepMarkSeqNoAsProcessed(seqNo1); + tracker.fastForwardProcessedSeqNo(seqNo1); assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); assertThat(tracker.hasProcessed(0L), equalTo(true)); assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + // idempotent case + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + } + + public void testSimpleSegrepProcessedPersistentUpdate2() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); seqNo1 = tracker.generateSeqNo(); seqNo2 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(1L)); - assertThat(seqNo2, equalTo(2L)); + assertThat(seqNo1, equalTo(0L)); + assertThat(seqNo2, equalTo(1L)); tracker.markSeqNoAsPersisted(seqNo1); tracker.markSeqNoAsPersisted(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); - assertThat(tracker.getPersistedCheckpoint(), equalTo(2L)); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); - tracker.segrepMarkSeqNoAsProcessed(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); + tracker.fastForwardProcessedSeqNo(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); - tracker.segrepMarkSeqNoAsProcessed(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(2L)); - assertThat(tracker.hasProcessed(between(0, 2)), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(3)), equalTo(false)); - assertThat(tracker.getMaxSeqNo(), equalTo(2L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(1L)); } }