diff --git a/raft.go b/raft.go index 7fd61f951..034dcfc60 100644 --- a/raft.go +++ b/raft.go @@ -235,8 +235,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // Restore the current term and the last log r.setCurrentTerm(currentTerm) - r.setLastLogIndex(lastLog.Index) - r.setLastLogTerm(lastLog.Term) + r.setLastLog(lastLog.Index, lastLog.Term) // Attempt to restore a snapshot if there are any if err := r.restoreSnapshot(); err != nil { @@ -465,16 +464,18 @@ func (r *Raft) Stats() map[string]string { toString := func(v uint64) string { return strconv.FormatUint(v, 10) } + lastLogIndex, lastLogTerm := r.getLastLog() + lastSnapIndex, lastSnapTerm := r.getLastSnapshot() s := map[string]string{ "state": r.getState().String(), "term": toString(r.getCurrentTerm()), - "last_log_index": toString(r.getLastLogIndex()), - "last_log_term": toString(r.getLastLogTerm()), + "last_log_index": toString(lastLogIndex), + "last_log_term": toString(lastLogTerm), "commit_index": toString(r.getCommitIndex()), "applied_index": toString(r.getLastApplied()), "fsm_pending": toString(uint64(len(r.fsmCommitCh))), - "last_snapshot_index": toString(r.getLastSnapshotIndex()), - "last_snapshot_term": toString(r.getLastSnapshotTerm()), + "last_snapshot_index": toString(lastSnapIndex), + "last_snapshot_term": toString(lastSnapTerm), "num_peers": toString(uint64(len(r.peers))), } last := r.LastContact() @@ -1127,8 +1128,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.leaderState.inflight.StartAll(applyLogs) // Update the last log since it's on disk now - r.setLastLogIndex(lastIndex + uint64(len(applyLogs))) - r.setLastLogTerm(term) + r.setLastLog(lastIndex+uint64(len(applyLogs)), term) // Notify the replicators of the new log for _, f := range r.leaderState.replState { @@ -1369,7 +1369,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { last := a.Entries[n-1] // Delete any conflicting entries - lastLogIdx := r.getLastLogIndex() + lastLogIdx, _ := r.getLastLog() if first.Index <= lastLogIdx { r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx) if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil { @@ -1385,8 +1385,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { } // Update the lastLog - r.setLastLogIndex(last.Index) - r.setLastLogTerm(last.Term) + r.setLastLog(last.Index, last.Term) metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start) } @@ -1572,8 +1571,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { r.setLastApplied(req.LastLogIndex) // Update the last stable snapshot info - r.setLastSnapshotIndex(req.LastLogIndex) - r.setLastSnapshotTerm(req.LastLogTerm) + r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm) // Restore the peer set peers := decodePeers(req.Peers, r.trans) @@ -1735,7 +1733,7 @@ func (r *Raft) runSnapshots() { // a new snapshot. func (r *Raft) shouldSnapshot() bool { // Check the last snapshot index - lastSnap := r.getLastSnapshotIndex() + lastSnap, _ := r.getLastSnapshot() // Check the last log index lastIdx, err := r.logs.LastIndex() @@ -1800,8 +1798,7 @@ func (r *Raft) takeSnapshot() error { } // Update the last stable snapshot info - r.setLastSnapshotIndex(req.index) - r.setLastSnapshotTerm(req.term) + r.setLastSnapshot(req.index, req.term) // Compact the logs if err := r.compactLogs(req.index); err != nil { @@ -1824,7 +1821,8 @@ func (r *Raft) compactLogs(snapIdx uint64) error { } // Check if we have enough logs to truncate - if r.getLastLogIndex() <= r.conf.TrailingLogs { + lastLogIdx, _ := r.getLastLog() + if lastLogIdx <= r.conf.TrailingLogs { return nil } @@ -1832,7 +1830,7 @@ func (r *Raft) compactLogs(snapIdx uint64) error { // back from the head, which ever is further back. This ensures // at least `TrailingLogs` entries, but does not allow logs // after the snapshot to be removed. - maxLog := min(snapIdx, r.getLastLogIndex()-r.conf.TrailingLogs) + maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs) // Log this r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog) @@ -1875,8 +1873,7 @@ func (r *Raft) restoreSnapshot() error { r.setLastApplied(snapshot.Index) // Update the last stable snapshot info - r.setLastSnapshotIndex(snapshot.Index) - r.setLastSnapshotTerm(snapshot.Term) + r.setLastSnapshot(snapshot.Index, snapshot.Term) // Success! return nil diff --git a/replication.go b/replication.go index 6a01631d2..3b2451241 100644 --- a/replication.go +++ b/replication.go @@ -102,9 +102,9 @@ RPC: } return case <-s.triggerCh: - shouldStop = r.replicateTo(s, r.getLastLogIndex()) + shouldStop = r.replicateTo(s, r.getLastLogIndexOnly()) case <-randomTimeout(r.conf.CommitTimeout): - shouldStop = r.replicateTo(s, r.getLastLogIndex()) + shouldStop = r.replicateTo(s, r.getLastLogIndexOnly()) } // If things looks healthy, switch to pipeline mode @@ -358,9 +358,9 @@ SEND: } break SEND case <-s.triggerCh: - shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex()) + shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndexOnly()) case <-randomTimeout(r.conf.CommitTimeout): - shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex()) + shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndexOnly()) } } @@ -446,13 +446,14 @@ func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequ func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error { // Guard for the first index, since there is no 0 log entry // Guard against the previous index being a snapshot as well + lastSnapIdx, lastSnapTerm := r.getLastSnapshot() if nextIndex == 1 { req.PrevLogEntry = 0 req.PrevLogTerm = 0 - } else if (nextIndex - 1) == r.getLastSnapshotIndex() { - req.PrevLogEntry = r.getLastSnapshotIndex() - req.PrevLogTerm = r.getLastSnapshotTerm() + } else if (nextIndex - 1) == lastSnapIdx { + req.PrevLogEntry = lastSnapIdx + req.PrevLogTerm = lastSnapTerm } else { var l Log diff --git a/state.go b/state.go index d53530dec..53ffd8a1a 100644 --- a/state.go +++ b/state.go @@ -1,6 +1,7 @@ package raft import ( + "sync" "sync/atomic" ) @@ -44,20 +45,23 @@ type raftState struct { // The current term, cache of StableStore currentTerm uint64 - // Cache the latest log from LogStore - LastLogIndex uint64 - LastLogTerm uint64 - // Highest committed log entry commitIndex uint64 // Last applied log to the FSM lastApplied uint64 + // protects 4 next fields + lastLock sync.Mutex + // Cache the latest snapshot index/term lastSnapshotIndex uint64 lastSnapshotTerm uint64 + // Cache the latest log from LogStore + lastLogIndex uint64 + lastLogTerm uint64 + // Tracks the number of live routines runningRoutines int32 @@ -83,20 +87,39 @@ func (r *raftState) setCurrentTerm(term uint64) { atomic.StoreUint64(&r.currentTerm, term) } -func (r *raftState) getLastLogIndex() uint64 { - return atomic.LoadUint64(&r.LastLogIndex) +func (r *raftState) getLastLog() (index, term uint64) { + r.lastLock.Lock() + index = r.lastLogIndex + term = r.lastLogTerm + r.lastLock.Unlock() + return +} + +func (r *raftState) getLastLogIndexOnly() uint64 { + i, _ := r.getLastLog() + return i } -func (r *raftState) setLastLogIndex(index uint64) { - atomic.StoreUint64(&r.LastLogIndex, index) +func (r *raftState) setLastLog(index, term uint64) { + r.lastLock.Lock() + r.lastLogIndex = index + r.lastLogTerm = term + r.lastLock.Unlock() } -func (r *raftState) getLastLogTerm() uint64 { - return atomic.LoadUint64(&r.LastLogTerm) +func (r *raftState) getLastSnapshot() (index, term uint64) { + r.lastLock.Lock() + index = r.lastSnapshotIndex + term = r.lastSnapshotTerm + r.lastLock.Unlock() + return } -func (r *raftState) setLastLogTerm(term uint64) { - atomic.StoreUint64(&r.LastLogTerm, term) +func (r *raftState) setLastSnapshot(index, term uint64) { + r.lastLock.Lock() + r.lastSnapshotIndex = index + r.lastSnapshotTerm = term + r.lastLock.Unlock() } func (r *raftState) getCommitIndex() uint64 { @@ -115,6 +138,7 @@ func (r *raftState) setLastApplied(index uint64) { atomic.StoreUint64(&r.lastApplied, index) } +<<<<<<< 2e665d4be2a5d4698d83f06012524e02bcabd104 func (r *raftState) getLastSnapshotIndex() uint64 { return atomic.LoadUint64(&r.lastSnapshotIndex) } @@ -156,14 +180,18 @@ func (r *raftState) goFunc(f func()) { // getLastIndex returns the last index in stable storage. // Either from the last log or from the last snapshot. func (r *raftState) getLastIndex() uint64 { - return max(r.getLastLogIndex(), r.getLastSnapshotIndex()) + r.lastLock.Lock() + defer r.lastLock.Unlock() + return max(r.lastLogIndex, r.lastSnapshotIndex) } // getLastEntry returns the last index and term in stable storage. // Either from the last log or from the last snapshot. func (r *raftState) getLastEntry() (uint64, uint64) { - if r.getLastLogIndex() >= r.getLastSnapshotIndex() { - return r.getLastLogIndex(), r.getLastLogTerm() + r.lastLock.Lock() + defer r.lastLock.Unlock() + if r.lastLogIndex >= r.lastSnapshotIndex { + return r.lastLogIndex, r.lastLogTerm } - return r.getLastSnapshotIndex(), r.getLastSnapshotTerm() + return r.lastSnapshotIndex, r.lastSnapshotTerm }