diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java index 5de8f6555..b6824118c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/FSMCaller.java @@ -125,6 +125,11 @@ interface LastAppliedLogIndexListener { */ long getLastAppliedIndex(); + /** + * Returns the last log entry that was committed to raft group. + */ + long getLastCommittedIndex(); + /** * Called after shutdown to wait it terminates. * diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java index d309c2b7e..fe89d9cd8 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java @@ -359,4 +359,25 @@ public interface Node extends Lifecycle, Describer { * @since 1.3.8 */ State getNodeState(); + + /** + * Retrieve the last log index in log storage. Note: the last log may not be committed to raft group, and may be truncated in some cases. + * @return the last log index + * @since 1.3.12 + */ + long getLastLogIndex(); + + /** + * Retrieve the last log index that committed to the raft group. + * @return the last committed log index + * @since 1.3.12 + */ + long getLastCommittedIndex(); + + /** + * Retrieve the last log index that applied to state machine. + * @return the last applied log index + * @since 1.3.12 + */ + long getLastAppliedLogIndex(); } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java index b43c2b4c9..d1c6ad4ac 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java @@ -163,6 +163,7 @@ private void setFsmThread() { private StateMachine fsm; private ClosureQueue closureQueue; private final AtomicLong lastAppliedIndex; + private final AtomicLong lastCommittedIndex; private long lastAppliedTerm; private Closure afterShutdown; private NodeImpl node; @@ -180,6 +181,7 @@ public FSMCallerImpl() { this.currTask = TaskType.IDLE; this.lastAppliedIndex = new AtomicLong(0); this.applyingIndex = new AtomicLong(0); + this.lastCommittedIndex = new AtomicLong(0); } @SuppressWarnings("unchecked") @@ -191,6 +193,7 @@ public boolean init(final FSMCallerOptions opts) { this.afterShutdown = opts.getAfterShutdown(); this.node = opts.getNode(); this.nodeMetrics = this.node.getNodeMetrics(); + this.lastCommittedIndex.set(opts.getBootstrapId().getIndex()); this.lastAppliedIndex.set(opts.getBootstrapId().getIndex()); notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get()); this.lastAppliedTerm = opts.getBootstrapId().getTerm(); @@ -357,6 +360,11 @@ public boolean onError(final RaftException error) { }); } + @Override + public long getLastCommittedIndex() { + return lastCommittedIndex.get(); + } + @Override public long getLastAppliedIndex() { return this.lastAppliedIndex.get(); @@ -509,6 +517,7 @@ private void doCommitted(final long committedIndex) { if (lastAppliedIndex >= committedIndex) { return; } + this.lastCommittedIndex.set(committedIndex); final long startMs = Utils.monotonicMs(); try { final List closures = new ArrayList<>(); @@ -707,6 +716,7 @@ private void doSnapshotLoad(final LoadSnapshotClosure done) { } this.fsm.onConfigurationCommitted(conf); } + this.lastCommittedIndex.set(meta.getLastIncludedIndex()); this.lastAppliedIndex.set(meta.getLastIncludedIndex()); this.lastAppliedTerm = meta.getLastIncludedTerm(); done.run(Status.OK()); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index b87793a92..ff87d27ca 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -3458,6 +3458,45 @@ public State getNodeState() { return this.state; } + @Override + public long getLastLogIndex() { + this.readLock.lock(); + try { + if (this.state.isActive()) { + return this.logManager.getLastLogIndex(); + } + throw new IllegalStateException("The node is not active, current state: " + this.state); + } finally { + this.readLock.unlock(); + } + } + + @Override + public long getLastCommittedIndex() { + this.readLock.lock(); + try { + if (this.state.isActive()) { + return this.fsmCaller.getLastCommittedIndex(); + } + throw new IllegalStateException("The node is not active, current state: " + this.state); + } finally { + this.readLock.unlock(); + } + } + + @Override + public long getLastAppliedLogIndex() { + this.readLock.lock(); + try { + if (this.state.isActive()) { + return this.fsmCaller.getLastAppliedIndex(); + } + throw new IllegalStateException("The node is not active, current state: " + this.state); + } finally { + this.readLock.unlock(); + } + } + @Override public void describe(final Printer out) { // node diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java index 4e4eed45c..742eab3f3 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/FSMCallerTest.java @@ -103,6 +103,7 @@ public void testOnCommittedError() throws Exception { assertTrue(this.fsmCaller.onCommitted(11)); this.fsmCaller.flush(); + assertEquals(this.fsmCaller.getLastCommittedIndex(), 11); assertEquals(this.fsmCaller.getLastAppliedIndex(), 10); Mockito.verify(this.logManager).setAppliedId(new LogId(10, 1)); assertFalse(this.fsmCaller.getError().getStatus().isOk()); @@ -122,6 +123,7 @@ public void testOnCommitted() throws Exception { assertTrue(this.fsmCaller.onCommitted(11)); this.fsmCaller.flush(); + assertEquals(this.fsmCaller.getLastCommittedIndex(), 11); assertEquals(this.fsmCaller.getLastAppliedIndex(), 11); Mockito.verify(this.fsm).onApply(itArg.capture()); final Iterator it = itArg.getValue(); @@ -153,6 +155,7 @@ public SnapshotReader start() { } }); latch.await(); + assertEquals(this.fsmCaller.getLastCommittedIndex(), 12); assertEquals(this.fsmCaller.getLastAppliedIndex(), 12); Mockito.verify(this.fsm).onConfigurationCommitted(Mockito.any()); } @@ -181,6 +184,7 @@ public SnapshotReader start() { } }); latch.await(); + assertEquals(this.fsmCaller.getLastCommittedIndex(), 10); assertEquals(this.fsmCaller.getLastAppliedIndex(), 10); } @@ -316,6 +320,7 @@ public SnapshotReader start() { } }); latch.await(); + assertEquals(this.fsmCaller.getLastCommittedIndex(), 10); assertEquals(this.fsmCaller.getLastAppliedIndex(), 10); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index 82f89e03c..ac13692b1 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -596,11 +596,19 @@ public void testTripleNodes() throws Exception { // get leader final Node leader = cluster.getLeader(); + assertEquals(1, leader.getLastAppliedLogIndex()); + assertEquals(1, leader.getLastCommittedIndex()); + assertEquals(1, leader.getLastLogIndex()); assertNotNull(leader); assertEquals(3, leader.listPeers().size()); // apply tasks to leader this.sendTestTaskAndWait(leader); + assertEquals(11, leader.getLastCommittedIndex()); + assertEquals(11, leader.getLastLogIndex()); + Thread.sleep(500); + assertEquals(11, leader.getLastAppliedLogIndex()); + { final ByteBuffer data = ByteBuffer.wrap("no closure".getBytes()); final Task task = new Task(data, null); @@ -633,8 +641,18 @@ public void onCommitted() { assertEquals("apply", cbs.get(1)); } + assertEquals(13, leader.getLastCommittedIndex()); + assertEquals(13, leader.getLastLogIndex()); + Thread.sleep(500); + assertEquals(13, leader.getLastAppliedLogIndex()); + cluster.ensureSame(-1); assertEquals(2, cluster.getFollowers().size()); + for (Node follower : cluster.getFollowers()) { + assertEquals(13, follower.getLastCommittedIndex()); + assertEquals(13, follower.getLastLogIndex()); + assertEquals(13, follower.getLastAppliedLogIndex()); + } cluster.stopAll(); }