Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add methods to Node for getting last log indexes #880

Merged
merged 2 commits into from
Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ interface LastAppliedLogIndexListener {
*/
long getLastAppliedIndex();

/**
* Returns the last log entry that was committed to raft group.
*/
long getLastCommittedIndex();

/**
* Called after shutdown to wait it terminates.
*
Expand Down
21 changes: 21 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,25 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
* @since 1.3.8
*/
State getNodeState();

/**
* Retrieve the last log index in log storage. Note: the last log may not be committed to raft group, and may be truncated in some cases.
* @return the last log index
* @since 1.3.12
*/
long getLastLogIndex();

/**
* Retrieve the last log index that committed to the raft group.
* @return the last committed log index
* @since 1.3.12
*/
long getLastCommittedIndex();

/**
* Retrieve the last log index that applied to state machine.
* @return the last applied log index
* @since 1.3.12
*/
long getLastAppliedLogIndex();
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ private void setFsmThread() {
private StateMachine fsm;
private ClosureQueue closureQueue;
private final AtomicLong lastAppliedIndex;
private final AtomicLong lastCommittedIndex;
private long lastAppliedTerm;
private Closure afterShutdown;
private NodeImpl node;
Expand All @@ -180,6 +181,7 @@ public FSMCallerImpl() {
this.currTask = TaskType.IDLE;
this.lastAppliedIndex = new AtomicLong(0);
this.applyingIndex = new AtomicLong(0);
this.lastCommittedIndex = new AtomicLong(0);
}

@SuppressWarnings("unchecked")
Expand All @@ -191,6 +193,7 @@ public boolean init(final FSMCallerOptions opts) {
this.afterShutdown = opts.getAfterShutdown();
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.lastCommittedIndex.set(opts.getBootstrapId().getIndex());
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
Expand Down Expand Up @@ -357,6 +360,11 @@ public boolean onError(final RaftException error) {
});
}

@Override
public long getLastCommittedIndex() {
return lastCommittedIndex.get();
}

@Override
public long getLastAppliedIndex() {
return this.lastAppliedIndex.get();
Expand Down Expand Up @@ -509,6 +517,7 @@ private void doCommitted(final long committedIndex) {
if (lastAppliedIndex >= committedIndex) {
return;
}
this.lastCommittedIndex.set(committedIndex);
final long startMs = Utils.monotonicMs();
try {
final List<Closure> closures = new ArrayList<>();
Expand Down Expand Up @@ -707,6 +716,7 @@ private void doSnapshotLoad(final LoadSnapshotClosure done) {
}
this.fsm.onConfigurationCommitted(conf);
}
this.lastCommittedIndex.set(meta.getLastIncludedIndex());
this.lastAppliedIndex.set(meta.getLastIncludedIndex());
this.lastAppliedTerm = meta.getLastIncludedTerm();
done.run(Status.OK());
Expand Down
39 changes: 39 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3458,6 +3458,45 @@ public State getNodeState() {
return this.state;
}

@Override
public long getLastLogIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.logManager.getLastLogIndex();
}
throw new IllegalStateException("The node is not active, current state: " + this.state);
} finally {
this.readLock.unlock();
}
}

@Override
public long getLastCommittedIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.fsmCaller.getLastCommittedIndex();
}
throw new IllegalStateException("The node is not active, current state: " + this.state);
} finally {
this.readLock.unlock();
}
}

@Override
public long getLastAppliedLogIndex() {
this.readLock.lock();
try {
if (this.state.isActive()) {
return this.fsmCaller.getLastAppliedIndex();
}
throw new IllegalStateException("The node is not active, current state: " + this.state);
} finally {
this.readLock.unlock();
}
}

@Override
public void describe(final Printer out) {
// node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void testOnCommittedError() throws Exception {
assertTrue(this.fsmCaller.onCommitted(11));

this.fsmCaller.flush();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 11);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
Mockito.verify(this.logManager).setAppliedId(new LogId(10, 1));
assertFalse(this.fsmCaller.getError().getStatus().isOk());
Expand All @@ -122,6 +123,7 @@ public void testOnCommitted() throws Exception {
assertTrue(this.fsmCaller.onCommitted(11));

this.fsmCaller.flush();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 11);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 11);
Mockito.verify(this.fsm).onApply(itArg.capture());
final Iterator it = itArg.getValue();
Expand Down Expand Up @@ -153,6 +155,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 12);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 12);
Mockito.verify(this.fsm).onConfigurationCommitted(Mockito.any());
}
Expand Down Expand Up @@ -181,6 +184,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 10);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
}

Expand Down Expand Up @@ -316,6 +320,7 @@ public SnapshotReader start() {
}
});
latch.await();
assertEquals(this.fsmCaller.getLastCommittedIndex(), 10);
assertEquals(this.fsmCaller.getLastAppliedIndex(), 10);
}

Expand Down
18 changes: 18 additions & 0 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -596,11 +596,19 @@ public void testTripleNodes() throws Exception {

// get leader
final Node leader = cluster.getLeader();
assertEquals(1, leader.getLastAppliedLogIndex());
assertEquals(1, leader.getLastCommittedIndex());
assertEquals(1, leader.getLastLogIndex());
assertNotNull(leader);
assertEquals(3, leader.listPeers().size());
// apply tasks to leader
this.sendTestTaskAndWait(leader);

assertEquals(11, leader.getLastCommittedIndex());
assertEquals(11, leader.getLastLogIndex());
Thread.sleep(500);
assertEquals(11, leader.getLastAppliedLogIndex());

{
final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes());
final Task task = new Task(data, null);
Expand Down Expand Up @@ -633,8 +641,18 @@ public void onCommitted() {
assertEquals("apply", cbs.get(1));
}

assertEquals(13, leader.getLastCommittedIndex());
assertEquals(13, leader.getLastLogIndex());
Thread.sleep(500);
assertEquals(13, leader.getLastAppliedLogIndex());

cluster.ensureSame(-1);
assertEquals(2, cluster.getFollowers().size());
for (Node follower : cluster.getFollowers()) {
assertEquals(13, follower.getLastCommittedIndex());
assertEquals(13, follower.getLastLogIndex());
assertEquals(13, follower.getLastAppliedLogIndex());
}
cluster.stopAll();
}

Expand Down