From 259e54d3ce80b300171f2706cd54b7b40c33ce5e Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Thu, 18 Feb 2016 14:32:36 -0800 Subject: [PATCH 1/9] Replace inflight tracker with commitment tracker inflight.go used to have two jobs: it buffered log futures until they were committed, and it marked new log entries committed. This second job is what commitment.go does, while raft.go now tracks its own uncommitted log futures. Inflight used to track the replication factor of each log future/entry independently. It kept a count of how many servers stored that entry, and how many were needed to mark it committed. The primary issue is that a simple count of how many servers store an entry is fragile with respect to membership changes, where the identity of the voting servers might change over time. It was also a fair amount of bookkeeping to track this information per entry. The new 'commitment' module instead tracks the match index for each server: the last entry that the server has on disk that agrees with this leader. This has several advantages: - No information is kept per log entry. - Only the latest known membership configuration is used to mark entries committed, as described in the Raft paper/dissertation. - Servers that aren't voters are simply ignored for the purpose of commitment. This is useful for leaders that remove themselves today but would also be useful for more general non-voting members in the future. Because no information is kept per log entry, the log futures can now be state kept local to the leader goroutine, without any need for locking. This simplified the interface to commitment.go (relative to inflight.go) significantly, while only adding a few lines here and there to raft.go. --- commitment.go | 100 +++++++++++++++++++++ commitment_test.go | 159 +++++++++++++++++++++++++++++++++ future.go | 1 - inflight.go | 213 --------------------------------------------- inflight_test.go | 150 ------------------------------- raft.go | 64 +++++++++----- replication.go | 19 ++-- util.go | 7 ++ 8 files changed, 311 insertions(+), 402 deletions(-) create mode 100644 commitment.go create mode 100644 commitment_test.go delete mode 100644 inflight.go delete mode 100644 inflight_test.go diff --git a/commitment.go b/commitment.go new file mode 100644 index 00000000000..f48d89fff80 --- /dev/null +++ b/commitment.go @@ -0,0 +1,100 @@ +package raft + +import ( + "sort" + "sync" +) + +// Commitment is used to advance the leader's commit index. The leader and +// replication goroutines report in newly written entries with Match(), and +// this notifies on commitCh when the commit index has advanced. +type commitment struct { + // protectes matchIndexes and commitIndex + sync.Mutex + // notified when commitIndex increases + commitCh chan struct{} + // voter to log index: the server stores up through this log entry + matchIndexes map[string]uint64 + // a quorum stores up through this log entry. monotonically increases. + commitIndex uint64 + // the first index of this leader's term: this needs to be replicated to a + // majority of the cluster before this leader may mark anything committed + // (per Raft's commitment rule) + startIndex uint64 +} + +// newCommitment returns an commitment struct that notifies the provided +// channel when log entries have been committed. A new commitment struct is +// created each time this server becomes leader for a particular term. +// 'voters' are the voting members of the cluster, including the +// local server except when it's removed itself from the cluster. +// 'startIndex' is the first index created in this term (see +// its description above). +func newCommitment(commitCh chan struct{}, voters []string, startIndex uint64) *commitment { + matchIndexes := make(map[string]uint64, len(voters)) + for _, voter := range voters { + matchIndexes[voter] = 0 + } + return &commitment{ + commitCh: commitCh, + matchIndexes: matchIndexes, + commitIndex: 0, + startIndex: startIndex, + } +} + +// Called when a new cluster membership configuration is created: it will be +// used to determine commitment from now on. 'voters' are the voting members of +// the cluster, including the local server except when it's removed itself from +// the cluster. +func (c *commitment) setVoters(voters []string) { + c.Lock() + defer c.Unlock() + oldMatchIndexes := c.matchIndexes + c.matchIndexes = make(map[string]uint64, len(voters)) + for _, voter := range voters { + c.matchIndexes[voter] = oldMatchIndexes[voter] // defaults to 0 + } + c.recalculate() +} + +// Called by leader after commitCh is notified +func (c *commitment) getCommitIndex() uint64 { + c.Lock() + defer c.Unlock() + return c.commitIndex +} + +// Match is called once a server completes writing entries to disk: either the +// leader has written the new entry or a follower has replied to an +// AppendEntries RPC. The given server's disk agrees with this server's log up +// through the given index. +func (c *commitment) match(server string, matchIndex uint64) { + c.Lock() + defer c.Unlock() + if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev { + c.matchIndexes[server] = matchIndex + c.recalculate() + } +} + +// Internal helper to calculate new commitIndex from matchIndexes. +// Must be called with lock held. +func (c *commitment) recalculate() { + if len(c.matchIndexes) == 0 { + return + } + var quorumMatchIndex uint64 + { + matched := make([]uint64, 0, len(c.matchIndexes)) + for _, idx := range c.matchIndexes { + matched = append(matched, idx) + } + sort.Sort(uint64Slice(matched)) + quorumMatchIndex = matched[len(c.matchIndexes)/2] + } + if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex { + c.commitIndex = quorumMatchIndex + asyncNotifyCh(c.commitCh) + } +} diff --git a/commitment_test.go b/commitment_test.go new file mode 100644 index 00000000000..1ba5c4498c9 --- /dev/null +++ b/commitment_test.go @@ -0,0 +1,159 @@ +package raft + +import ( + "testing" +) + +// Returns a slice of server names of size n. +func voters(n int) []string { + if n > 7 { + panic("only up to 7 servers implemented") + } + return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n+1] +} + +// Tests setVoters() keeps matchIndexes where possible. +func TestCommitment_setVoters(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, []string{"a", "b", "c"}, 0) + c.match("a", 10) + c.match("b", 20) + c.match("c", 30) + // commitIndex: 20 + c.setVoters([]string{"c", "d", "e"}) + // c: 30, d: 0, e: 0 + c.match("e", 40) + if c.getCommitIndex() != 30 { + t.Fatalf("expected 30 entries committed, found %d", + c.getCommitIndex()) + } +} + +// Tests match() being called with smaller index than before. +func TestCommitment_match_max(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 8) + c.match("s2", 8) + c.match("s2", 1) + c.match("s3", 8) + + if c.getCommitIndex() != 8 { + t.Fatalf("calling match with an earlier index should be ignored") + } +} + +// Tests match() being called with non-voters. +func TestCommitment_match_nonVoting(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 8) + c.match("s2", 8) + c.match("s3", 8) + + c.match("s90", 10) + c.match("s91", 10) + c.match("s92", 10) + + if c.getCommitIndex() != 8 { + t.Fatalf("non-voting servers shouldn't be able to commit") + } +} + +// Tests recalculate() algorithm. +func TestCommitment_recalculate(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 0) + + c.match("s1", 30) + c.match("s2", 20) + + if c.getCommitIndex() != 0 { + t.Fatalf("shouldn't commit after two of five servers") + } + + c.match("s3", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + c.match("s4", 15) + if c.getCommitIndex() != 15 { + t.Fatalf("expected 15 entries committed, found %d", + c.getCommitIndex()) + } + + c.setVoters(voters(3)) + // s1: 30, s2: 20, s3: 10 + if c.getCommitIndex() != 20 { + t.Fatalf("expected 20 entries committed, found %d", + c.getCommitIndex()) + } + + c.setVoters(voters(4)) + // s1: 30, s2: 20, s3: 10, s4: 0 + c.match("s2", 25) + if c.getCommitIndex() != 20 { + t.Fatalf("expected 20 entries committed, found %d", + c.getCommitIndex()) + } + c.match("s4", 23) + if c.getCommitIndex() != 23 { + t.Fatalf("expected 23 entries committed, found %d", + c.getCommitIndex()) + } +} + +// Tests recalculate() respecting startIndex. +func TestCommitment_recalculate_startIndex(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(5), 4) + + c.match("s1", 3) + c.match("s2", 3) + c.match("s3", 3) + + if c.getCommitIndex() != 0 { + t.Fatalf("can't commit until startIndex is replicated to a quorum") + } + + c.match("s1", 4) + c.match("s2", 4) + c.match("s3", 4) + + if c.getCommitIndex() != 4 { + t.Fatalf("should be able to commit startIndex once replicated to a quorum") + } +} + +// With no voting members in the cluster, the most sane behavior is probably +// to not not mark anything committed. +func TestCommitment_noVoterSanity(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, []string{}, 4) + c.match("s1", 10) + c.setVoters([]string{}) + c.match("s1", 10) + if c.getCommitIndex() != 0 { + t.Fatalf("no voting servers: shouldn't be able to commit") + } +} + +// Single voter commits immediately. +func TestCommitment_singleVoter(t *testing.T) { + commitCh := make(chan struct{}, 1) + c := newCommitment(commitCh, voters(1), 4) + c.match("s1", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + c.setVoters(voters(1)) + c.match("s1", 12) + if c.getCommitIndex() != 12 { + t.Fatalf("expected 12 entries committed, found %d", + c.getCommitIndex()) + } +} diff --git a/future.go b/future.go index 54d2d32ca74..db3771bac2b 100644 --- a/future.go +++ b/future.go @@ -74,7 +74,6 @@ func (d *deferError) respond(err error) { type logFuture struct { deferError log Log - policy quorumPolicy response interface{} dispatch time.Time } diff --git a/inflight.go b/inflight.go deleted file mode 100644 index 7014ff50394..00000000000 --- a/inflight.go +++ /dev/null @@ -1,213 +0,0 @@ -package raft - -import ( - "container/list" - "sync" -) - -// QuorumPolicy allows individual logFutures to have different -// commitment rules while still using the inflight mechanism. -type quorumPolicy interface { - // Checks if a commit from a given peer is enough to - // satisfy the commitment rules - Commit() bool - - // Checks if a commit is committed - IsCommitted() bool -} - -// MajorityQuorum is used by Apply transactions and requires -// a simple majority of nodes. -type majorityQuorum struct { - count int - votesNeeded int -} - -func newMajorityQuorum(clusterSize int) *majorityQuorum { - votesNeeded := (clusterSize / 2) + 1 - return &majorityQuorum{count: 0, votesNeeded: votesNeeded} -} - -func (m *majorityQuorum) Commit() bool { - m.count++ - return m.count >= m.votesNeeded -} - -func (m *majorityQuorum) IsCommitted() bool { - return m.count >= m.votesNeeded -} - -// Inflight is used to track operations that are still in-flight. -type inflight struct { - sync.Mutex - committed *list.List - commitCh chan struct{} - minCommit uint64 - maxCommit uint64 - operations map[uint64]*logFuture - stopCh chan struct{} -} - -// NewInflight returns an inflight struct that notifies -// the provided channel when logs are finished committing. -func newInflight(commitCh chan struct{}) *inflight { - return &inflight{ - committed: list.New(), - commitCh: commitCh, - minCommit: 0, - maxCommit: 0, - operations: make(map[uint64]*logFuture), - stopCh: make(chan struct{}), - } -} - -// Start is used to mark a logFuture as being inflight. It -// also commits the entry, as it is assumed the leader is -// starting. -func (i *inflight) Start(l *logFuture) { - i.Lock() - defer i.Unlock() - i.start(l) -} - -// StartAll is used to mark a list of logFuture's as being -// inflight. It also commits each entry as the leader is -// assumed to be starting. -func (i *inflight) StartAll(logs []*logFuture) { - i.Lock() - defer i.Unlock() - for _, l := range logs { - i.start(l) - } -} - -// start is used to mark a single entry as inflight, -// must be invoked with the lock held. -func (i *inflight) start(l *logFuture) { - idx := l.log.Index - i.operations[idx] = l - - if idx > i.maxCommit { - i.maxCommit = idx - } - if i.minCommit == 0 { - i.minCommit = idx - } - i.commit(idx) -} - -// Cancel is used to cancel all in-flight operations. -// This is done when the leader steps down, and all futures -// are sent the given error. -func (i *inflight) Cancel(err error) { - // Close the channel first to unblock any pending commits - close(i.stopCh) - - // Lock after close to avoid deadlock - i.Lock() - defer i.Unlock() - - // Respond to all inflight operations - for _, op := range i.operations { - op.respond(err) - } - - // Clear all the committed but not processed - for e := i.committed.Front(); e != nil; e = e.Next() { - e.Value.(*logFuture).respond(err) - } - - // Clear the map - i.operations = make(map[uint64]*logFuture) - - // Clear the list of committed - i.committed = list.New() - - // Close the commmitCh - close(i.commitCh) - - // Reset indexes - i.minCommit = 0 - i.maxCommit = 0 -} - -// Committed returns all the committed operations in order. -func (i *inflight) Committed() (l *list.List) { - i.Lock() - l, i.committed = i.committed, list.New() - i.Unlock() - return l -} - -// Commit is used by leader replication routines to indicate that -// a follower was finished committing a log to disk. -func (i *inflight) Commit(index uint64) { - i.Lock() - defer i.Unlock() - i.commit(index) -} - -// CommitRange is used to commit a range of indexes inclusively. -// It is optimized to avoid commits for indexes that are not tracked. -func (i *inflight) CommitRange(minIndex, maxIndex uint64) { - i.Lock() - defer i.Unlock() - - // Update the minimum index - minIndex = max(i.minCommit, minIndex) - - // Commit each index - for idx := minIndex; idx <= maxIndex; idx++ { - i.commit(idx) - } -} - -// commit is used to commit a single index. Must be called with the lock held. -func (i *inflight) commit(index uint64) { - op, ok := i.operations[index] - if !ok { - // Ignore if not in the map, as it may be committed already - return - } - - // Check if we've satisfied the commit - if !op.policy.Commit() { - return - } - - // Cannot commit if this is not the minimum inflight. This can happen - // if the quorum size changes, meaning a previous commit requires a larger - // quorum that this commit. We MUST block until the previous log is committed, - // otherwise logs will be applied out of order. - if index != i.minCommit { - return - } - -NOTIFY: - // Add the operation to the committed list - i.committed.PushBack(op) - - // Stop tracking since it is committed - delete(i.operations, index) - - // Update the indexes - if index == i.maxCommit { - i.minCommit = 0 - i.maxCommit = 0 - - } else { - i.minCommit++ - } - - // Check if the next in-flight operation is ready - if i.minCommit != 0 { - op = i.operations[i.minCommit] - if op.policy.IsCommitted() { - index = i.minCommit - goto NOTIFY - } - } - - // Async notify of ready operations - asyncNotifyCh(i.commitCh) -} diff --git a/inflight_test.go b/inflight_test.go deleted file mode 100644 index a9f57d6ead7..00000000000 --- a/inflight_test.go +++ /dev/null @@ -1,150 +0,0 @@ -package raft - -import ( - "fmt" - "testing" -) - -func TestInflight_StartCommit(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a transaction as being in flight - l := &logFuture{log: Log{Index: 1}} - l.policy = newMajorityQuorum(5) - in.Start(l) - - // Commit 3 times - in.Commit(1) - if in.Committed().Len() != 0 { - t.Fatalf("should not be commited") - } - - in.Commit(1) - if in.Committed().Len() != 1 { - t.Fatalf("should be commited") - } - - // Already committed but should work anyways - in.Commit(1) -} - -func TestInflight_Cancel(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a transaction as being in flight - l := &logFuture{ - log: Log{Index: 1}, - } - l.init() - l.policy = newMajorityQuorum(3) - in.Start(l) - - // Cancel with an error - err := fmt.Errorf("error 1") - in.Cancel(err) - - // Should get an error return - if l.Error() != err { - t.Fatalf("expected error") - } -} - -func TestInflight_StartAll(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - l3 := &logFuture{log: Log{Index: 4}} - l3.policy = newMajorityQuorum(5) - - // Start all the entries - in.StartAll([]*logFuture{l1, l2, l3}) - - // Commit ranges - in.CommitRange(1, 5) - in.CommitRange(1, 4) - in.CommitRange(1, 10) - - // Should get 3 back - if in.Committed().Len() != 3 { - t.Fatalf("expected all 3 to commit") - } -} - -func TestInflight_CommitRange(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - in.Start(l1) - - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - in.Start(l2) - - l3 := &logFuture{log: Log{Index: 4}} - l3.policy = newMajorityQuorum(5) - in.Start(l3) - - // Commit ranges - in.CommitRange(1, 5) - in.CommitRange(1, 4) - in.CommitRange(1, 10) - - // Should get 3 back - if in.Committed().Len() != 3 { - t.Fatalf("expected all 3 to commit") - } -} - -// Should panic if we commit non contiguously! -func TestInflight_NonContiguous(t *testing.T) { - commitCh := make(chan struct{}, 1) - in := newInflight(commitCh) - - // Commit a few transaction as being in flight - l1 := &logFuture{log: Log{Index: 2}} - l1.policy = newMajorityQuorum(5) - in.Start(l1) - - l2 := &logFuture{log: Log{Index: 3}} - l2.policy = newMajorityQuorum(5) - in.Start(l2) - - in.Commit(3) - in.Commit(3) - in.Commit(3) // panic! - - if in.Committed().Len() != 0 { - t.Fatalf("should not commit") - } - - in.Commit(2) - in.Commit(2) - in.Commit(2) // panic! - - committed := in.Committed() - if committed.Len() != 2 { - t.Fatalf("should commit both") - } - - current := committed.Front() - l := current.Value.(*logFuture) - if l.log.Index != 2 { - t.Fatalf("bad: %v", *l) - } - - current = current.Next() - l = current.Value.(*logFuture) - if l.log.Index != 3 { - t.Fatalf("bad: %v", *l) - } -} diff --git a/raft.go b/raft.go index 0a962fee07a..6698011e733 100644 --- a/raft.go +++ b/raft.go @@ -2,6 +2,7 @@ package raft import ( "bytes" + "container/list" "errors" "fmt" "io" @@ -64,11 +65,12 @@ type commitTuple struct { // leaderState is state that is used while we are a leader. type leaderState struct { - commitCh chan struct{} - inflight *inflight - replState map[string]*followerReplication - notify map[*verifyFuture]struct{} - stepDown chan struct{} + commitCh chan struct{} + commitment *commitment + inflight *list.List // list of logFuture in log index order + replState map[string]*followerReplication + notify map[*verifyFuture]struct{} + stepDown chan struct{} } // Raft implements a Raft node. @@ -750,7 +752,10 @@ func (r *Raft) runLeader() { // Setup leader state r.leaderState.commitCh = make(chan struct{}, 1) - r.leaderState.inflight = newInflight(r.leaderState.commitCh) + r.leaderState.commitment = newCommitment(r.leaderState.commitCh, + append([]string{r.localAddr}, r.peers...), + r.getLastIndex()+1) + r.leaderState.inflight = list.New() r.leaderState.replState = make(map[string]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) r.leaderState.stepDown = make(chan struct{}, 1) @@ -769,8 +774,10 @@ func (r *Raft) runLeader() { close(p.stopCh) } - // Cancel inflight requests - r.leaderState.inflight.Cancel(ErrLeadershipLost) + // Respond to all inflight operations + for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { + e.Value.(*logFuture).respond(ErrLeadershipLost) + } // Respond to any pending verify requests for future := range r.leaderState.notify { @@ -779,6 +786,7 @@ func (r *Raft) runLeader() { // Clear all the state r.leaderState.commitCh = nil + r.leaderState.commitment = nil r.leaderState.inflight = nil r.leaderState.replState = nil r.leaderState.notify = nil @@ -845,11 +853,10 @@ func (r *Raft) startReplication(peer string) { lastIdx := r.getLastIndex() s := &followerReplication{ peer: peer, - inflight: r.leaderState.inflight, + commitment: r.leaderState.commitment, stopCh: make(chan uint64, 1), triggerCh: make(chan struct{}, 1), currentTerm: r.getCurrentTerm(), - matchIndex: 0, nextIndex: lastIdx + 1, lastContact: time.Now(), notifyCh: make(chan struct{}, 1), @@ -863,6 +870,7 @@ func (r *Raft) startReplication(peer string) { // leaderLoop is the hot loop for a leader. It is invoked // after all the various leader setup is done. func (r *Raft) leaderLoop() { + // TODO: reconsider // stepDown is used to track if there is an inflight log that // would cause us to lose leadership (specifically a RemovePeer of // ourselves). If this is the case, we must not allow any logs to @@ -881,17 +889,23 @@ func (r *Raft) leaderLoop() { r.setState(Follower) case <-r.leaderState.commitCh: - // Get the committed messages - committed := r.leaderState.inflight.Committed() - for e := committed.Front(); e != nil; e = e.Next() { - // Measure the commit time + // Process the newly committed entries + commitIndex := r.leaderState.commitment.getCommitIndex() + r.setCommitIndex(commitIndex) + for { + e := r.leaderState.inflight.Front() + if e == nil { + break + } commitLog := e.Value.(*logFuture) - metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) - - // Increment the commit index idx := commitLog.log.Index - r.setCommitIndex(idx) + if idx > commitIndex { + break + } + // Measure the commit time + metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) r.processLogs(idx, commitLog) + r.leaderState.inflight.Remove(e) } case v := <-r.verifyCh: @@ -961,6 +975,9 @@ func (r *Raft) leaderLoop() { // behavior. if ok := r.processLog(&log.log, nil, true); ok { stepDown = true + r.leaderState.commitment.setVoters(r.peers) + } else { + r.leaderState.commitment.setVoters(append([]string{r.localAddr}, r.peers...)) } } @@ -1107,10 +1124,11 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { for idx, applyLog := range applyLogs { applyLog.dispatch = now - applyLog.log.Index = lastIndex + uint64(idx) + 1 + lastIndex++ + applyLog.log.Index = lastIndex applyLog.log.Term = term - applyLog.policy = newMajorityQuorum(len(r.peers) + 1) logs[idx] = &applyLog.log + r.leaderState.inflight.PushBack(applyLog) } // Write the log entry locally @@ -1122,12 +1140,10 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { r.setState(Follower) return } - - // Add this to the inflight logs, commit - r.leaderState.inflight.StartAll(applyLogs) + r.leaderState.commitment.match(r.localAddr, lastIndex) // Update the last log since it's on disk now - r.setLastLogIndex(lastIndex + uint64(len(applyLogs))) + r.setLastLogIndex(lastIndex) r.setLastLogTerm(term) // Notify the replicators of the new log diff --git a/replication.go b/replication.go index 6a01631d237..ee16ca4e618 100644 --- a/replication.go +++ b/replication.go @@ -25,14 +25,13 @@ var ( ) type followerReplication struct { - peer string - inflight *inflight + peer string + commitment *commitment stopCh chan uint64 triggerCh chan struct{} currentTerm uint64 - matchIndex uint64 nextIndex uint64 lastContact time.Time @@ -180,7 +179,6 @@ START: s.allowPipeline = true } else { s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1) - s.matchIndex = s.nextIndex - 1 if resp.NoRetryBackoff { s.failures = 0 } else { @@ -265,12 +263,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Check for success if resp.Success { - // Mark any inflight logs as committed - s.inflight.CommitRange(s.matchIndex+1, meta.Index) - // Update the indexes - s.matchIndex = meta.Index - s.nextIndex = s.matchIndex + 1 + s.nextIndex = meta.Index + 1 + s.commitment.match(s.peer, meta.Index) // Clear any failures s.failures = 0 @@ -503,13 +498,9 @@ func (r *Raft) handleStaleTerm(s *followerReplication) { func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) { // Mark any inflight logs as committed if logs := req.Entries; len(logs) > 0 { - first := logs[0] last := logs[len(logs)-1] - s.inflight.CommitRange(first.Index, last.Index) - - // Update the indexes - s.matchIndex = last.Index s.nextIndex = last.Index + 1 + s.commitment.match(s.peer, last.Index) } // Notify still leader diff --git a/util.go b/util.go index a6642c4c9e6..6407caea04a 100644 --- a/util.go +++ b/util.go @@ -198,3 +198,10 @@ func backoff(base time.Duration, round, limit uint64) time.Duration { } return base } + +// Needed for sorting []uint64, used to determine commitment +type uint64Slice []uint64 + +func (p uint64Slice) Len() int { return len(p) } +func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } From 163a55f55990b3c743462ddba70677c4a045aa40 Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:14:49 -0700 Subject: [PATCH 2/9] commitment: Fix off-by-one affecting even-sized clusters --- commitment.go | 2 +- commitment_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/commitment.go b/commitment.go index f48d89fff80..925ad7817aa 100644 --- a/commitment.go +++ b/commitment.go @@ -91,7 +91,7 @@ func (c *commitment) recalculate() { matched = append(matched, idx) } sort.Sort(uint64Slice(matched)) - quorumMatchIndex = matched[len(c.matchIndexes)/2] + quorumMatchIndex = matched[(len(matched)-1)/2] } if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex { c.commitIndex = quorumMatchIndex diff --git a/commitment_test.go b/commitment_test.go index 1ba5c4498c9..2879280bbf1 100644 --- a/commitment_test.go +++ b/commitment_test.go @@ -9,7 +9,7 @@ func voters(n int) []string { if n > 7 { panic("only up to 7 servers implemented") } - return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n+1] + return []string{"s1", "s2", "s3", "s4", "s5", "s6", "s7"}[:n] } // Tests setVoters() keeps matchIndexes where possible. From ccea1b42f9a007d6f36db06013a5df4003badc8b Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:16:28 -0700 Subject: [PATCH 3/9] commitment: Fix typo in comment --- commitment_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commitment_test.go b/commitment_test.go index 2879280bbf1..32257af7862 100644 --- a/commitment_test.go +++ b/commitment_test.go @@ -129,7 +129,7 @@ func TestCommitment_recalculate_startIndex(t *testing.T) { } // With no voting members in the cluster, the most sane behavior is probably -// to not not mark anything committed. +// to not mark anything committed. func TestCommitment_noVoterSanity(t *testing.T) { commitCh := make(chan struct{}, 1) c := newCommitment(commitCh, []string{}, 4) From 872c2690c7598a8bfcbabab40d4ede930371aedd Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:38:53 -0700 Subject: [PATCH 4/9] commitment: Test that commitCh is notified (only) when expected --- commitment_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ util.go | 11 +++++++++++ 2 files changed, 60 insertions(+) diff --git a/commitment_test.go b/commitment_test.go index 32257af7862..7fdae925e66 100644 --- a/commitment_test.go +++ b/commitment_test.go @@ -20,6 +20,9 @@ func TestCommitment_setVoters(t *testing.T) { c.match("b", 20) c.match("c", 30) // commitIndex: 20 + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } c.setVoters([]string{"c", "d", "e"}) // c: 30, d: 0, e: 0 c.match("e", 40) @@ -27,6 +30,9 @@ func TestCommitment_setVoters(t *testing.T) { t.Fatalf("expected 30 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } } // Tests match() being called with smaller index than before. @@ -53,6 +59,10 @@ func TestCommitment_match_nonVoting(t *testing.T) { c.match("s2", 8) c.match("s3", 8) + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + c.match("s90", 10) c.match("s91", 10) c.match("s92", 10) @@ -60,6 +70,9 @@ func TestCommitment_match_nonVoting(t *testing.T) { if c.getCommitIndex() != 8 { t.Fatalf("non-voting servers shouldn't be able to commit") } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } } // Tests recalculate() algorithm. @@ -73,17 +86,26 @@ func TestCommitment_recalculate(t *testing.T) { if c.getCommitIndex() != 0 { t.Fatalf("shouldn't commit after two of five servers") } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } c.match("s3", 10) if c.getCommitIndex() != 10 { t.Fatalf("expected 10 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } c.match("s4", 15) if c.getCommitIndex() != 15 { t.Fatalf("expected 15 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } c.setVoters(voters(3)) // s1: 30, s2: 20, s3: 10 @@ -91,6 +113,9 @@ func TestCommitment_recalculate(t *testing.T) { t.Fatalf("expected 20 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } c.setVoters(voters(4)) // s1: 30, s2: 20, s3: 10, s4: 0 @@ -99,11 +124,17 @@ func TestCommitment_recalculate(t *testing.T) { t.Fatalf("expected 20 entries committed, found %d", c.getCommitIndex()) } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } c.match("s4", 23) if c.getCommitIndex() != 23 { t.Fatalf("expected 23 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } } // Tests recalculate() respecting startIndex. @@ -118,6 +149,9 @@ func TestCommitment_recalculate_startIndex(t *testing.T) { if c.getCommitIndex() != 0 { t.Fatalf("can't commit until startIndex is replicated to a quorum") } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } c.match("s1", 4) c.match("s2", 4) @@ -126,6 +160,9 @@ func TestCommitment_recalculate_startIndex(t *testing.T) { if c.getCommitIndex() != 4 { t.Fatalf("should be able to commit startIndex once replicated to a quorum") } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } } // With no voting members in the cluster, the most sane behavior is probably @@ -139,6 +176,9 @@ func TestCommitment_noVoterSanity(t *testing.T) { if c.getCommitIndex() != 0 { t.Fatalf("no voting servers: shouldn't be able to commit") } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } } // Single voter commits immediately. @@ -150,10 +190,19 @@ func TestCommitment_singleVoter(t *testing.T) { t.Fatalf("expected 10 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } c.setVoters(voters(1)) + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } c.match("s1", 12) if c.getCommitIndex() != 12 { t.Fatalf("expected 12 entries committed, found %d", c.getCommitIndex()) } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } } diff --git a/util.go b/util.go index 6407caea04a..c0e1925f828 100644 --- a/util.go +++ b/util.go @@ -85,6 +85,17 @@ func asyncNotifyCh(ch chan struct{}) { } } +// drainNotifyCh empties out a single-item notification channel without +// blocking, and returns whether it received anything. +func drainNotifyCh(ch chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} + // asyncNotifyBool is used to do an async notification // on a bool channel. func asyncNotifyBool(ch chan bool, v bool) { From 5932a0884bc58ac4b0624a496eca71614828410b Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:42:31 -0700 Subject: [PATCH 5/9] commitment: Adjust for style --- commitment.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/commitment.go b/commitment.go index 925ad7817aa..f865e1fa33a 100644 --- a/commitment.go +++ b/commitment.go @@ -84,15 +84,14 @@ func (c *commitment) recalculate() { if len(c.matchIndexes) == 0 { return } - var quorumMatchIndex uint64 - { - matched := make([]uint64, 0, len(c.matchIndexes)) - for _, idx := range c.matchIndexes { - matched = append(matched, idx) - } - sort.Sort(uint64Slice(matched)) - quorumMatchIndex = matched[(len(matched)-1)/2] + + matched := make([]uint64, 0, len(c.matchIndexes)) + for _, idx := range c.matchIndexes { + matched = append(matched, idx) } + sort.Sort(uint64Slice(matched)) + quorumMatchIndex := matched[(len(matched)-1)/2] + if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex { c.commitIndex = quorumMatchIndex asyncNotifyCh(c.commitCh) From f4f4f8ad311abb5924d289d1fb448270b5fb3477 Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:45:47 -0700 Subject: [PATCH 6/9] commitment: Add test for losing voters --- commitment_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/commitment_test.go b/commitment_test.go index 7fdae925e66..c3cfd7f1b66 100644 --- a/commitment_test.go +++ b/commitment_test.go @@ -179,6 +179,28 @@ func TestCommitment_noVoterSanity(t *testing.T) { if drainNotifyCh(commitCh) { t.Fatalf("unexpected commit notify") } + + // add a voter so we can commit something and then remove it + c.setVoters(voters(1)) + c.match("s1", 10) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if !drainNotifyCh(commitCh) { + t.Fatalf("expected commit notify") + } + + c.setVoters([]string{}) + c.match("s1", 20) + if c.getCommitIndex() != 10 { + t.Fatalf("expected 10 entries committed, found %d", + c.getCommitIndex()) + } + if drainNotifyCh(commitCh) { + t.Fatalf("unexpected commit notify") + } + } // Single voter commits immediately. From a9067dd8a16c40037c68b5ec9c2dd4d262049cfc Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:50:03 -0700 Subject: [PATCH 7/9] Remove stray TODO, replaced by GitHub #100 --- raft.go | 1 - 1 file changed, 1 deletion(-) diff --git a/raft.go b/raft.go index 6698011e733..d75838d60e1 100644 --- a/raft.go +++ b/raft.go @@ -870,7 +870,6 @@ func (r *Raft) startReplication(peer string) { // leaderLoop is the hot loop for a leader. It is invoked // after all the various leader setup is done. func (r *Raft) leaderLoop() { - // TODO: reconsider // stepDown is used to track if there is an inflight log that // would cause us to lose leadership (specifically a RemovePeer of // ourselves). If this is the case, we must not allow any logs to From 390c19197a83f3e5f858a84dda066de0375df3d7 Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Sun, 20 Mar 2016 17:55:13 -0700 Subject: [PATCH 8/9] commitment: Add usage comment in raft.go --- raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft.go b/raft.go index d75838d60e1..e5fdb31b740 100644 --- a/raft.go +++ b/raft.go @@ -754,7 +754,7 @@ func (r *Raft) runLeader() { r.leaderState.commitCh = make(chan struct{}, 1) r.leaderState.commitment = newCommitment(r.leaderState.commitCh, append([]string{r.localAddr}, r.peers...), - r.getLastIndex()+1) + /* first index that may be committed in this term: */ r.getLastIndex()+1) r.leaderState.inflight = list.New() r.leaderState.replState = make(map[string]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) From 2ec123f7954adfc85cf13e221b41413d9e40b7f3 Mon Sep 17 00:00:00 2001 From: Diego Ongaro Date: Mon, 21 Mar 2016 13:00:34 -0700 Subject: [PATCH 9/9] Move comment --- raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft.go b/raft.go index e5fdb31b740..2a648b07177 100644 --- a/raft.go +++ b/raft.go @@ -754,7 +754,7 @@ func (r *Raft) runLeader() { r.leaderState.commitCh = make(chan struct{}, 1) r.leaderState.commitment = newCommitment(r.leaderState.commitCh, append([]string{r.localAddr}, r.peers...), - /* first index that may be committed in this term: */ r.getLastIndex()+1) + r.getLastIndex()+1 /* first index that may be committed in this term */) r.leaderState.inflight = list.New() r.leaderState.replState = make(map[string]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{})