Skip to content

Commit

Permalink
separated tests + wrapper function
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Mar 31, 2022
1 parent a432fb8 commit c35bac2
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.segrepMarkSeqNoAsProcessed(seqNo);
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.opensearch.common.Nullable;
import org.opensearch.common.SuppressForbidden;

import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -116,6 +117,13 @@ public void advanceMaxSeqNo(final long seqNo) {
nextSeqNo.accumulateAndGet(seqNo + 1, Math::max);
}

/**
* Checks that the sequence number is in an acceptable range for an update to take place.
*/
private boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, @Nullable final AtomicLong upperBound) {
return !((seqNo <= lowerBound.get()) || (upperBound != null && seqNo > upperBound.get()));
}

/**
* Marks the provided sequence number as processed and updates the processed checkpoint if possible.
*
Expand All @@ -135,33 +143,27 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) {
}

/**
* Updates the processed checkpoint with the provided sequence number if segment replication is enabled
* Updates the processed sequence checkpoint to the given value.
*
* This method is only used for segment replication since indexing doesn't
* take place on the replica allowing us to avoid the check that all sequence numbers
* are consecutively processed.
*
* @param seqNo the sequence number to mark as processed
*/
public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) {
markSegrepSeqNo(seqNo, processedCheckpoint, persistedCheckpoint);
}

@SuppressForbidden(reason = "Object#notifyAll")
private void markSegrepSeqNo(final long seqNo, final AtomicLong processedCheckpoint, final AtomicLong persistedCheckpoint) {
assert Thread.holdsLock(this);
public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
advanceMaxSeqNo(seqNo);
if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) {
if (shouldUpdateSeqNo(seqNo, processedCheckpoint, persistedCheckpoint) == false) {
return;
}
try {
processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo);
} finally {
this.notifyAll();
}
processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo);
}

private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
assert Thread.holdsLock(this);
// make sure we track highest seen sequence number
advanceMaxSeqNo(seqNo);
if (seqNo <= checkPoint.get()) {
if (shouldUpdateSeqNo(seqNo, checkPoint, null) == false) {
// this is possible during recovery where we might replay an operation that was also replicated
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,59 @@ public void setUp() throws Exception {
tracker = createEmptyTracker();
}

public void testSimpleSegrepPrimaryProcessed() {
long seqNo1, seqNo2;
public void testSimpleSegrepProcessedNoPersistentUpdate() {
// base case with no persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
}

public void testSimpleSegrepProcessedPersistentUpdate() {
// base case with persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));

tracker.markSeqNoAsPersisted(seqNo1);
assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));

// idempotent case
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));

}

public void testSimpleSegrepProcessedPersistentUpdate2() {
long seqNo1, seqNo2;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
assertThat(seqNo1, equalTo(0L));
assertThat(seqNo2, equalTo(1L));
tracker.markSeqNoAsPersisted(seqNo1);
tracker.markSeqNoAsPersisted(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.getPersistedCheckpoint(), equalTo(2L));
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
assertThat(tracker.getPersistedCheckpoint(), equalTo(1L));

tracker.segrepMarkSeqNoAsProcessed(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(2L));
tracker.fastForwardProcessedSeqNo(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
assertThat(tracker.hasProcessed(seqNo2), equalTo(true));

tracker.segrepMarkSeqNoAsProcessed(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(2L));
assertThat(tracker.hasProcessed(between(0, 2)), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(3)), equalTo(false));
assertThat(tracker.getMaxSeqNo(), equalTo(2L));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false));
assertThat(tracker.getMaxSeqNo(), equalTo(1L));
}
}

0 comments on commit c35bac2

Please sign in to comment.