Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix local processed checkpoint update #2576

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOExceptio
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.seqno;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.opensearch.common.Nullable;
import org.opensearch.common.SuppressForbidden;

import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -116,6 +117,13 @@ public void advanceMaxSeqNo(final long seqNo) {
nextSeqNo.accumulateAndGet(seqNo + 1, Math::max);
}

/**
* Checks that the sequence number is in an acceptable range for an update to take place.
*/
private boolean shouldUpdateSeqNo(final long seqNo, final AtomicLong lowerBound, @Nullable final AtomicLong upperBound) {
return !((seqNo <= lowerBound.get()) || (upperBound != null && seqNo > upperBound.get()));
}

/**
* Marks the provided sequence number as processed and updates the processed checkpoint if possible.
*
Expand All @@ -134,11 +142,28 @@ public synchronized void markSeqNoAsPersisted(final long seqNo) {
markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo);
}

/**
* Updates the processed sequence checkpoint to the given value.
*
* This method is only used for segment replication since indexing doesn't
* take place on the replica allowing us to avoid the check that all sequence numbers
* are consecutively processed.
*
* @param seqNo the sequence number to mark as processed
*/
public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
advanceMaxSeqNo(seqNo);
if (shouldUpdateSeqNo(seqNo, processedCheckpoint, persistedCheckpoint) == false) {
return;
}
processedCheckpoint.compareAndSet(processedCheckpoint.get(), seqNo);
}

private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
assert Thread.holdsLock(this);
// make sure we track highest seen sequence number
advanceMaxSeqNo(seqNo);
if (seqNo <= checkPoint.get()) {
if (shouldUpdateSeqNo(seqNo, checkPoint, null) == false) {
// this is possible during recovery where we might replay an operation that was also replicated
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.seqno;

import org.junit.Before;
import org.opensearch.test.OpenSearchTestCase;

import static org.hamcrest.Matchers.equalTo;

public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase {

private LocalCheckpointTracker tracker;

public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
tracker = createEmptyTracker();
}

public void testSimpleSegrepProcessedNoPersistentUpdate() {
// base case with no persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
}

public void testSimpleSegrepProcessedPersistentUpdate() {
// base case with persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));

tracker.markSeqNoAsPersisted(seqNo1);
assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));

// idempotent case
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));

}

public void testSimpleSegrepProcessedPersistentUpdate2() {
long seqNo1, seqNo2;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
assertThat(seqNo2, equalTo(1L));
tracker.markSeqNoAsPersisted(seqNo1);
tracker.markSeqNoAsPersisted(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
assertThat(tracker.getPersistedCheckpoint(), equalTo(1L));

tracker.fastForwardProcessedSeqNo(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
assertThat(tracker.hasProcessed(seqNo2), equalTo(true));

tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false));
assertThat(tracker.getMaxSeqNo(), equalTo(1L));
}
}