Skip to content

Commit

Permalink
fix local processed checkpoint update
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Mar 30, 2022
1 parent 9bcee79 commit a432fb8
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
localCheckpointTracker.segrepMarkSeqNoAsProcessed(seqNo);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) {
markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo);
}

/**
* Updates the processed checkpoint with the provided sequence number if segment replication is enabled
*
* @param seqNo the sequence number to mark as processed
*/
public synchronized void segrepMarkSeqNoAsProcessed(final long seqNo) {
markSegrepSeqNo(seqNo, processedCheckpoint, persistedCheckpoint);
}

@SuppressForbidden(reason = "Object#notifyAll")
private void markSegrepSeqNo(final long seqNo, final AtomicLong processedCheckpoint, final AtomicLong persistedCheckpoint) {
assert Thread.holdsLock(this);
advanceMaxSeqNo(seqNo);
if ((seqNo > persistedCheckpoint.get()) || (seqNo <= processedCheckpoint.get())) {
return;
}
try {
processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo);
} finally {
this.notifyAll();
}
}

private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
assert Thread.holdsLock(this);
// make sure we track highest seen sequence number
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.seqno;

import org.junit.Before;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;

public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase {

private LocalCheckpointTracker tracker;

public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
tracker = createEmptyTracker();
}

public void testSimpleSegrepPrimaryProcessed() {
long seqNo1, seqNo2;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));

tracker.markSeqNoAsPersisted(seqNo1);
assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
tracker.segrepMarkSeqNoAsProcessed(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));

seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(1L));
assertThat(seqNo2, equalTo(2L));
tracker.markSeqNoAsPersisted(seqNo1);
tracker.markSeqNoAsPersisted(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.getPersistedCheckpoint(), equalTo(2L));

tracker.segrepMarkSeqNoAsProcessed(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(2L));
assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
assertThat(tracker.hasProcessed(seqNo2), equalTo(true));

tracker.segrepMarkSeqNoAsProcessed(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(2L));
assertThat(tracker.hasProcessed(between(0, 2)), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(3)), equalTo(false));
assertThat(tracker.getMaxSeqNo(), equalTo(2L));
}
}

0 comments on commit a432fb8

Please sign in to comment.