diff --git a/commitment.go b/commitment.go new file mode 100644 index 00000000000..f865e1fa33a --- /dev/null +++ b/commitment.go @@ -0,0 +1,99 @@ +package raft + +import ( + "sort" + "sync" +) + +// Commitment is used to advance the leader's commit index. The leader and +// replication goroutines report in newly written entries with match(), and +// this notifies on commitCh when the commit index has advanced. +type commitment struct { + // protects matchIndexes and commitIndex + sync.Mutex + // notified when commitIndex increases + commitCh chan struct{} + // voter to log index: the server stores up through this log entry + matchIndexes map[string]uint64 + // a quorum stores up through this log entry. monotonically increases. + commitIndex uint64 + // the first index of this leader's term: this needs to be replicated to a + // majority of the cluster before this leader may mark anything committed + // (per Raft's commitment rule) + startIndex uint64 +} + +// newCommitment returns a commitment struct that notifies the provided +// channel when log entries have been committed. A new commitment struct is +// created each time this server becomes leader for a particular term. +// 'voters' are the voting members of the cluster, including the +// local server except when it's removed itself from the cluster. +// 'startIndex' is the first index created in this term (see +// its description above). +func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment { + matchIndexes := make(map[string]uint64, len(voters)) + for _, voter := range voters { + matchIndexes[voter] = 0 + } + return &commitment{ + commitCh: commitCh, + matchIndexes: matchIndexes, + commitIndex: 0, + startIndex: startIndex, + } +} + +// Called when a new cluster membership configuration is created: it will be +// used to determine commitment from now on. 
'voters' are the voting members of +// the cluster, including the local server except when it's removed itself from +// the cluster. +func (c *commitment) setVoters(voters []string) { + c.Lock() + defer c.Unlock() + oldMatchIndexes := c.matchIndexes + c.matchIndexes = make(map[string]uint64, len(voters)) + for _, voter := range voters { + c.matchIndexes[voter] = oldMatchIndexes[voter] // defaults to 0 + } + c.recalculate() +} + +// Called by leader after commitCh is notified +func (c *commitment) getCommitIndex() uint64 { + c.Lock() + defer c.Unlock() + return c.commitIndex +} + +// Match is called once a server completes writing entries to disk: either the +// leader has written the new entry or a follower has replied to an +// AppendEntries RPC. The given server's disk agrees with this server's log up +// through the given index. +func (c *commitment) match(server string, matchIndex uint64) { + c.Lock() + defer c.Unlock() + if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev { + c.matchIndexes[server] = matchIndex + c.recalculate() + } +} + +// Internal helper to calculate new commitIndex from matchIndexes. +// Must be called with lock held. +func (c *commitment) recalculate() { + if len(c.matchIndexes) == 0 { + return + } + + matched := make([]uint64, 0, len(c.matchIndexes)) + for _, idx := range c.matchIndexes { + matched = append(matched, idx) + } + sort.Sort(uint64Slice(matched)) + quorumMatchIndex := matched[(len(matched)-1)/2] + + if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex { + c.commitIndex = quorumMatchIndex + asyncNotifyCh(c.commitCh) + } +} diff --git a/commitment_test.go b/commitment_test.go new file mode 100644 index 00000000000..c3cfd7f1b66 --- /dev/null +++ b/commitment_test.go @@ -0,0 +1,230 @@ +package raft + +import ( + "testing" +) + +// Returns a slice of server names of size n. 
+func voters(n int) []string { + if n > 7 { + panic("only up to 7 servers implemented") + } + return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n] +} + +// Tests setVoters() keeps matchIndexes where possible. +func TestCommitment_setVoters(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, []string{"a", "b", "c"}, 0) + c.match("a", 10) + c.match("b", 20) + c.match("c", 30) + // commitIndex: 20 + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + c.setVoters([]string{"c", "d", "e"}) + // c: 30, d: 0, e: 0 + c.match("e", 40) + if c.getCommitIndex() != 30 { + t.Fatalf("expected 30 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } +} + +// Tests match() being called with smaller index than before. +func TestCommitment_match_max(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 8) + c.match("s2", 8) + c.match("s2", 1) + c.match("s3", 8) + + if c.getCommitIndex() != 8 { + t.Fatalf("calling match with an earlier index should be ignored") + } +} + +// Tests match() being called with non-voters. +func TestCommitment_match_nonVoting(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 8) + c.match("s2", 8) + c.match("s3", 8) + + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + + c.match("s90", 10) + c.match("s91", 10) + c.match("s92", 10) + + if c.getCommitIndex() != 8 { + t.Fatalf("non-voting servers shouldn't be able to commit") + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } +} + +// Tests recalculate() algorithm. 
+func TestCommitment_recalculate(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 0) + + c.match("s1", 30) + c.match("s2", 20) + + if c.getCommitIndex() != 0 { + t.Fatalf("shouldn't commit after two of five servers") + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + + c.match("s3", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + c.match("s4", 15) + if c.getCommitIndex() != 15 { + t.Fatalf("expected 15 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + + c.setVoters(voters(3)) + // s1: 30, s2: 20, s3: 10 + if c.getCommitIndex() != 20 { + t.Fatalf("expected 20 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + + c.setVoters(voters(4)) + // s1: 30, s2: 20, s3: 10, s4: 0 + c.match("s2", 25) + if c.getCommitIndex() != 20 { + t.Fatalf("expected 20 entries committed, found %d", + c.getCommitIndex()) + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + c.match("s4", 23) + if c.getCommitIndex() != 23 { + t.Fatalf("expected 23 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } +} + +// Tests recalculate() respecting startIndex. 
+func TestCommitment_recalculate_startIndex(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 3) + c.match("s2", 3) + c.match("s3", 3) + + if c.getCommitIndex() != 0 { + t.Fatalf("can't commit until startIndex is replicated to a quorum") + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + + c.match("s1", 4) + c.match("s2", 4) + c.match("s3", 4) + + if c.getCommitIndex() != 4 { + t.Fatalf("should be able to commit startIndex once replicated to a quorum") + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } +} + +// With no voting members in the cluster, the most sane behavior is probably +// to not mark anything committed. +func TestCommitment_noVoterSanity(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, []string{}, 4) + c.match("s1", 10) + c.setVoters([]string{}) + c.match("s1", 10) + if c.getCommitIndex() != 0 { + t.Fatalf("no voting servers: shouldn't be able to commit") + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + + // add a voter so we can commit something and then remove it + c.setVoters(voters(1)) + c.match("s1", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + + c.setVoters([]string{}) + c.match("s1", 20) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + +} + +// Single voter commits immediately. 
+func TestCommitment_singleVoter(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(1), 4) + c.match("s1", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + c.setVoters(voters(1)) + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + c.match("s1", 12) + if c.getCommitIndex() != 12 { + t.Fatalf("expected 12 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } +} diff --git a/future.go b/future.go index 54d2d32ca74..db3771bac2b 100644 --- a/future.go +++ b/future.go @@ -74,7 +74,6 @@ func (d *deferError) respond(err error) { type logFuture struct { deferError log Log - policy quorumPolicy response interface{} dispatch time.Time } diff --git a/inflight.go b/inflight.go deleted file mode 100644 index 7014ff50394..00000000000 --- a/inflight.go +++ /dev/null @@ -1,213 +0,0 @@ -package raft - -import ( - "container/list" - "sync" -) - -// QuorumPolicy allows individual logFutures to have different -// commitment rules while still using the inflight mechanism. -type quorumPolicy interface { - // Checks if a commit from a given peer is enough to - // satisfy the commitment rules - Commit() bool - - // Checks if a commit is committed - IsCommitted() bool -} - -// MajorityQuorum is used by Apply transactions and requires -// a simple majority of nodes. 
-type majorityQuorum struct { - count int - votesNeeded int -} - -func newMajorityQuorum(clusterSize int) *majorityQuorum { - votesNeeded := (clusterSize / 2) + 1 - return &majorityQuorum{count: 0, votesNeeded: votesNeeded} -} - -func (m *majorityQuorum) Commit() bool { - m.count++ - return m.count >= m.votesNeeded -} - -func (m *majorityQuorum) IsCommitted() bool { - return m.count >= m.votesNeeded -} - -// Inflight is used to track operations that are still in-flight. -type inflight struct { - sync.Mutex - committed *list.List - commitCh chan struct{} - minCommit uint64 - maxCommit uint64 - operations map[uint64]*logFuture - stopCh chan struct{} -} - -// NewInflight returns an inflight struct that notifies -// the provided channel when logs are finished committing. -func newInflight(commitCh chan struct{}) *inflight { - return &inflight{ - committed: list.New(), - commitCh: commitCh, - minCommit: 0, - maxCommit: 0, - operations: make(map[uint64]*logFuture), - stopCh: make(chan struct{}), - } -} - -// Start is used to mark a logFuture as being inflight. It -// also commits the entry, as it is assumed the leader is -// starting. -func (i *inflight) Start(l *logFuture) { - i.Lock() - defer i.Unlock() - i.start(l) -} - -// StartAll is used to mark a list of logFuture's as being -// inflight. It also commits each entry as the leader is -// assumed to be starting. -func (i *inflight) StartAll(logs []*logFuture) { - i.Lock() - defer i.Unlock() - for _, l := range logs { - i.start(l) - } -} - -// start is used to mark a single entry as inflight, -// must be invoked with the lock held. -func (i *inflight) start(l *logFuture) { - idx := l.log.Index - i.operations[idx] = l - - if idx > i.maxCommit { - i.maxCommit = idx - } - if i.minCommit == 0 { - i.minCommit = idx - } - i.commit(idx) -} - -// Cancel is used to cancel all in-flight operations. -// This is done when the leader steps down, and all futures -// are sent the given error. 
-func (i *inflight) Cancel(err error) { - // Close the channel first to unblock any pending commits - close(i.stopCh) - - // Lock after close to avoid deadlock - i.Lock() - defer i.Unlock() - - // Respond to all inflight operations - for _, op := range i.operations { - op.respond(err) - } - - // Clear all the committed but not processed - for e := i.committed.Front(); e != nil; e = e.Next() { - e.Value.(*logFuture).respond(err) - } - - // Clear the map - i.operations = make(map[uint64]*logFuture) - - // Clear the list of committed - i.committed = list.New() - - // Close the commmitCh - close(i.commitCh) - - // Reset indexes - i.minCommit = 0 - i.maxCommit = 0 -} - -// Committed returns all the committed operations in order. -func (i *inflight) Committed() (l *list.List) { - i.Lock() - l, i.committed = i.committed, list.New() - i.Unlock() - return l -} - -// Commit is used by leader replication routines to indicate that -// a follower was finished committing a log to disk. -func (i *inflight) Commit(index uint64) { - i.Lock() - defer i.Unlock() - i.commit(index) -} - -// CommitRange is used to commit a range of indexes inclusively. -// It is optimized to avoid commits for indexes that are not tracked. -func (i *inflight) CommitRange(minIndex, maxIndex uint64) { - i.Lock() - defer i.Unlock() - - // Update the minimum index - minIndex = max(i.minCommit, minIndex) - - // Commit each index - for idx := minIndex; idx <= maxIndex; idx++ { - i.commit(idx) - } -} - -// commit is used to commit a single index. Must be called with the lock held. -func (i *inflight) commit(index uint64) { - op, ok := i.operations[index] - if !ok { - // Ignore if not in the map, as it may be committed already - return - } - - // Check if we've satisfied the commit - if !op.policy.Commit() { - return - } - - // Cannot commit if this is not the minimum inflight. This can happen - // if the quorum size changes, meaning a previous commit requires a larger - // quorum that this commit. 
We MUST block until the previous log is committed, - // otherwise logs will be applied out of order. - if index != i.minCommit { - return - } - -NOTIFY: - // Add the operation to the committed list - i.committed.PushBack(op) - - // Stop tracking since it is committed - delete(i.operations, index) - - // Update the indexes - if index == i.maxCommit { - i.minCommit = 0 - i.maxCommit = 0 - - } else { - i.minCommit++ - } - - // Check if the next in-flight operation is ready - if i.minCommit != 0 { - op = i.operations[i.minCommit] - if op.policy.IsCommitted() { - index = i.minCommit - goto NOTIFY - } - } - - // Async notify of ready operations - asyncNotifyCh(i.commitCh) -} diff --git a/inflight_test.go b/inflight_test.go deleted file mode 100644 index a9f57d6ead7..00000000000 --- a/inflight_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package raft - -import ( - "fmt" - "testing" -) - -func TestInflight_StartCommit(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a transaction as being in flight - l := &logFuture{log: Log{Index: 1}} - l.policy = newMajorityQuorum(5) - in.Start(l) - - // Commit 3 times - in.Commit(1) - if in.Committed().Len() != 0 { - t.Fatalf("should not be commited") - } - - in.Commit(1) - if in.Committed().Len() != 1 { - t.Fatalf("should be commited") - } - - // Already committed but should work anyways - in.Commit(1) -} - -func TestInflight_Cancel(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a transaction as being in flight - l := &logFuture{ - log: Log{Index: 1}, - } - l.init() - l.policy = newMajorityQuorum(3) - in.Start(l) - - // Cancel with an error - err := fmt.Errorf("error 1") - in.Cancel(err) - - // Should get an error return - if l.Error() != err { - t.Fatalf("expected error") - } -} - -func TestInflight_StartAll(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - 
l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - l3 := &logFuture{log: Log{Index: 4}} - l3.policy = newMajorityQuorum(5) - - // Start all the entries - in.StartAll([]*logFuture{l1, l2, l3}) - - // Commit ranges - in.CommitRange(1, 5) - in.CommitRange(1, 4) - in.CommitRange(1, 10) - - // Should get 3 back - if in.Committed().Len() != 3 { - t.Fatalf("expected all 3 to commit") - } -} - -func TestInflight_CommitRange(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - in.Start(l1) - - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - in.Start(l2) - - l3 := &logFuture{log: Log{Index: 4}} - l3.policy = newMajorityQuorum(5) - in.Start(l3) - - // Commit ranges - in.CommitRange(1, 5) - in.CommitRange(1, 4) - in.CommitRange(1, 10) - - // Should get 3 back - if in.Committed().Len() != 3 { - t.Fatalf("expected all 3 to commit") - } -} - -// Should panic if we commit non contiguously! -func TestInflight_NonContiguous(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - in.Start(l1) - - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - in.Start(l2) - - in.Commit(3) - in.Commit(3) - in.Commit(3) // panic! - - if in.Committed().Len() != 0 { - t.Fatalf("should not commit") - } - - in.Commit(2) - in.Commit(2) - in.Commit(2) // panic! 
- - committed := in.Committed() - if committed.Len() != 2 { - t.Fatalf("should commit both") - } - - current := committed.Front() - l := current.Value.(*logFuture) - if l.log.Index != 2 { - t.Fatalf("bad: %v", *l) - } - - current = current.Next() - l = current.Value.(*logFuture) - if l.log.Index != 3 { - t.Fatalf("bad: %v", *l) - } -} diff --git a/raft.go b/raft.go index 0a962fee07a..2a648b07177 100644 --- a/raft.go +++ b/raft.go @@ -2,6 +2,7 @@ package raft import ( "bytes" + "container/list" "errors" "fmt" "io" @@ -64,11 +65,12 @@ type commitTuple struct { // leaderState is state that is used while we are a leader. type leaderState struct { - commitCh chan struct{} - inflight *inflight - replState map[string]*followerReplication - notify map[*verifyFuture]struct{} - stepDown chan struct{} + commitCh chan struct{} + commitment *commitment + inflight *list.List // list of logFuture in log index order + replState map[string]*followerReplication + notify map[*verifyFuture]struct{} + stepDown chan struct{} } // Raft implements a Raft node. 
@@ -750,7 +752,10 @@ func (r *Raft) runLeader() { // Setup leader state r.leaderState.commitCh = make(chan struct{}, 1) - r.leaderState.inflight = newInflight(r.leaderState.commitCh) + r.leaderState.commitment = newCommitment(r.leaderState.commitCh, + append([]string{r.localAddr}, r.peers...), + r.getLastIndex()+1 /* first index that may be committed in this term */) + r.leaderState.inflight = list.New() r.leaderState.replState = make(map[string]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) r.leaderState.stepDown = make(chan struct{}, 1) @@ -769,8 +774,10 @@ func (r *Raft) runLeader() { close(p.stopCh) } - // Cancel inflight requests - r.leaderState.inflight.Cancel(ErrLeadershipLost) + // Respond to all inflight operations + for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { + e.Value.(*logFuture).respond(ErrLeadershipLost) + } // Respond to any pending verify requests for future := range r.leaderState.notify { @@ -779,6 +786,7 @@ func (r *Raft) runLeader() { // Clear all the state r.leaderState.commitCh = nil + r.leaderState.commitment = nil r.leaderState.inflight = nil r.leaderState.replState = nil r.leaderState.notify = nil @@ -845,11 +853,10 @@ func (r *Raft) startReplication(peer string) { lastIdx := r.getLastIndex() s := &followerReplication{ peer: peer, - inflight: r.leaderState.inflight, + commitment: r.leaderState.commitment, stopCh: make(chan uint64, 1), triggerCh: make(chan struct{}, 1), currentTerm: r.getCurrentTerm(), - matchIndex: 0, nextIndex: lastIdx + 1, lastContact: time.Now(), notifyCh: make(chan struct{}, 1), @@ -881,17 +888,23 @@ func (r *Raft) leaderLoop() { r.setState(Follower) case <-r.leaderState.commitCh: - // Get the committed messages - committed := r.leaderState.inflight.Committed() - for e := committed.Front(); e != nil; e = e.Next() { - // Measure the commit time + // Process the newly committed entries + commitIndex := r.leaderState.commitment.getCommitIndex() + 
r.setCommitIndex(commitIndex) + for { + e := r.leaderState.inflight.Front() + if e == nil { + break + } commitLog := e.Value.(*logFuture) - metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) - - // Increment the commit index idx := commitLog.log.Index - r.setCommitIndex(idx) + if idx > commitIndex { + break + } + // Measure the commit time + metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) r.processLogs(idx, commitLog) + r.leaderState.inflight.Remove(e) } case v := <-r.verifyCh: @@ -961,6 +974,9 @@ func (r *Raft) leaderLoop() { // behavior. if ok := r.processLog(&log.log, nil, true); ok { stepDown = true + r.leaderState.commitment.setVoters(r.peers) + } else { + r.leaderState.commitment.setVoters(append([]string{r.localAddr}, r.peers...)) } } @@ -1107,10 +1123,11 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { for idx, applyLog := range applyLogs { applyLog.dispatch = now - applyLog.log.Index = lastIndex + uint64(idx) + 1 + lastIndex++ + applyLog.log.Index = lastIndex applyLog.log.Term = term - applyLog.policy = newMajorityQuorum(len(r.peers) + 1) logs[idx] = &applyLog.log + r.leaderState.inflight.PushBack(applyLog) } // Write the log entry locally @@ -1122,12 +1139,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.setState(Follower) return } - - // Add this to the inflight logs, commit - r.leaderState.inflight.StartAll(applyLogs) + r.leaderState.commitment.match(r.localAddr, lastIndex) // Update the last log since it's on disk now - r.setLastLogIndex(lastIndex + uint64(len(applyLogs))) + r.setLastLogIndex(lastIndex) r.setLastLogTerm(term) // Notify the replicators of the new log diff --git a/replication.go b/replication.go index 6a01631d237..ee16ca4e618 100644 --- a/replication.go +++ b/replication.go @@ -25,14 +25,13 @@ var ( ) type followerReplication struct { - peer string - inflight *inflight + peer string + commitment *commitment stopCh chan uint64 triggerCh chan struct{} currentTerm uint64 - 
matchIndex uint64 nextIndex uint64 lastContact time.Time @@ -180,7 +179,6 @@ START: s.allowPipeline = true } else { s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1) - s.matchIndex = s.nextIndex - 1 if resp.NoRetryBackoff { s.failures = 0 } else { @@ -265,12 +263,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Check for success if resp.Success { - // Mark any inflight logs as committed - s.inflight.CommitRange(s.matchIndex+1, meta.Index) - // Update the indexes - s.matchIndex = meta.Index - s.nextIndex = s.matchIndex + 1 + s.nextIndex = meta.Index + 1 + s.commitment.match(s.peer, meta.Index) // Clear any failures s.failures = 0 @@ -503,13 +498,9 @@ func (r *Raft) handleStaleTerm(s *followerReplication) { func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) { // Mark any inflight logs as committed if logs := req.Entries; len(logs) > 0 { - first := logs[0] last := logs[len(logs)-1] - s.inflight.CommitRange(first.Index, last.Index) - - // Update the indexes - s.matchIndex = last.Index s.nextIndex = last.Index + 1 + s.commitment.match(s.peer, last.Index) } // Notify still leader diff --git a/util.go b/util.go index a6642c4c9e6..c0e1925f828 100644 --- a/util.go +++ b/util.go @@ -85,6 +85,17 @@ func asyncNotifyCh(ch chan struct{}) { } } +// drainNotifyCh empties out a single-item notification channel without +// blocking, and returns whether it received anything. +func drainNotifyCh(ch chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} + // asyncNotifyBool is used to do an async notification // on a bool channel. 
func asyncNotifyBool(ch chan bool, v bool) { @@ -198,3 +209,10 @@ func backoff(base time.Duration, round, limit uint64) time.Duration { } return base } + +// Needed for sorting []uint64, used to determine commitment +type uint64Slice []uint64 + +func (p uint64Slice) Len() int { return len(p) } +func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }