diff --git a/github.com/coreos/etcd/raft/node.go b/github.com/coreos/etcd/raft/node.go index 5da1c1193b..160ae9f540 100644 --- a/github.com/coreos/etcd/raft/node.go +++ b/github.com/coreos/etcd/raft/node.go @@ -324,7 +324,6 @@ func (n *node) run(r *raft) { } case cc := <-n.confc: if cc.NodeID == None { - r.resetPendingConf() select { case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: case <-n.done: @@ -342,7 +341,6 @@ func (n *node) run(r *raft) { } r.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - r.resetPendingConf() default: panic("unexpected conf type") } diff --git a/github.com/coreos/etcd/raft/node_test.go b/github.com/coreos/etcd/raft/node_test.go index f4c726ea86..c97d154688 100644 --- a/github.com/coreos/etcd/raft/node_test.go +++ b/github.com/coreos/etcd/raft/node_test.go @@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { n.Tick() case rd := <-n.Ready(): s.Append(rd.Entries) + applied := false for _, e := range rd.Entries { rdyEntries = append(rdyEntries, e) switch e.Type { @@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { var cc raftpb.ConfChange cc.Unmarshal(e.Data) n.ApplyConfChange(cc) - applyConfChan <- struct{}{} + applied = true } } n.Advance() + if applied { + applyConfChan <- struct{}{} + } } } }() diff --git a/github.com/coreos/etcd/raft/raft.go b/github.com/coreos/etcd/raft/raft.go index f408a326b2..fbd6f3353c 100644 --- a/github.com/coreos/etcd/raft/raft.go +++ b/github.com/coreos/etcd/raft/raft.go @@ -242,8 +242,13 @@ type raft struct { // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 - // New configuration is ignored if there exists unapplied configuration. - pendingConf bool + // Only one conf change may be pending (in the log, but not yet + // applied) at a time. This is enforced via pendingConfIndex, which + // is set to a value >= the log index of the latest pending + // configuration change (if any). Config changes are only allowed to + // be proposed if the leader's applied index is greater than this + // value. + pendingConfIndex uint64 readOnly *readOnly @@ -528,7 +533,7 @@ func (r *raft) reset(term uint64) { r.prs[id].Match = r.raftLog.lastIndex() } } - r.pendingConf = false + r.pendingConfIndex = 0 r.readOnly = newReadOnly(r.readOnly.option) } @@ -632,12 +637,13 @@ func (r *raft) becomeLeader() { r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) } - nconf := numOfPendingConf(ents) - if nconf > 1 { - panic("unexpected multiple uncommitted config entry") - } - if nconf == 1 { - r.pendingConf = true + // Conservatively set the pendingConfIndex to the last index in the + // log. There may or may not be a pending config change, but it's + // safe to delay any future proposals until we commit all our + // pending log entries, and scanning the entire tail of the log + // could be expensive. + if len(ents) > 0 { + r.pendingConfIndex = ents[len(ents)-1].Index } r.appendEntry(pb.Entry{Data: nil}) @@ -843,11 +849,13 @@ func stepLeader(r *raft, m pb.Message) { for i, e := range m.Entries { if e.Type == pb.EntryConfChange { - if r.pendingConf { - r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String()) + if r.pendingConfIndex > r.raftLog.applied { + r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", + e.String(), r.pendingConfIndex, r.raftLog.applied) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } else { + r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 } - r.pendingConf = true } } r.appendEntry(m.Entries...) @@ -1186,7 +1194,6 @@ func (r *raft) promotable() bool { } func (r *raft) addNode(id uint64) { - r.pendingConf = false if _, ok := r.prs[id]; ok { // Ignore any redundant addNode calls (which can happen because the // initial bootstrapping entries are applied twice). @@ -1202,7 +1209,6 @@ func (r *raft) addNode(id uint64) { func (r *raft) removeNode(id uint64) { r.delProgress(id) - r.pendingConf = false // do not try to commit or abort transferring if there is no nodes in the cluster. if len(r.prs) == 0 { @@ -1220,8 +1226,6 @@ func (r *raft) removeNode(id uint64) { } } -func (r *raft) resetPendingConf() { r.pendingConf = false } - func (r *raft) setProgress(id, match, next uint64) { r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} } diff --git a/github.com/coreos/etcd/raft/raft_test.go b/github.com/coreos/etcd/raft/raft_test.go index 357e9bdf68..8572266d4e 100644 --- a/github.com/coreos/etcd/raft/raft_test.go +++ b/github.com/coreos/etcd/raft/raft_test.go @@ -2499,8 +2499,8 @@ func TestStepConfig(t *testing.T) { if g := r.raftLog.lastIndex(); g != index+1 { t.Errorf("index = %d, want %d", g, index+1) } - if !r.pendingConf { - t.Errorf("pendingConf = %v, want true", r.pendingConf) + if r.pendingConfIndex != index+1 { + t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1) } } @@ -2514,7 +2514,7 @@ func TestStepIgnoreConfig(t *testing.T) { r.becomeLeader() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) index := r.raftLog.lastIndex() - pendingConf := r.pendingConf + pendingConfIndex := r.pendingConfIndex r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} ents, err := r.raftLog.entries(index+1, noLimit) @@ -2524,57 +2524,39 @@ func TestStepIgnoreConfig(t *testing.T) { if !reflect.DeepEqual(ents, wents) { t.Errorf("ents = %+v, want %+v", ents, wents) } - if r.pendingConf != pendingConf { - t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) + if r.pendingConfIndex != pendingConfIndex { + t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex) } } -// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag +// TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex // based on uncommitted entries. -func TestRecoverPendingConfig(t *testing.T) { +func TestNewLeaderPendingConfig(t *testing.T) { tests := []struct { - entType pb.EntryType - wpending bool + addEntry bool + wpendingIndex uint64 }{ - {pb.EntryNormal, false}, - {pb.EntryConfChange, true}, + {false, 0}, + {true, 1}, } for i, tt := range tests { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.appendEntry(pb.Entry{Type: tt.entType}) + if tt.addEntry { + r.appendEntry(pb.Entry{Type: pb.EntryNormal}) + } r.becomeCandidate() r.becomeLeader() - if r.pendingConf != tt.wpending { - t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) + if r.pendingConfIndex != tt.wpendingIndex { + t.Errorf("#%d: pendingConfIndex = %d, want %d", + i, r.pendingConfIndex, tt.wpendingIndex) } } } -// TestRecoverDoublePendingConfig tests that new leader will panic if -// there exist two uncommitted config entries. -func TestRecoverDoublePendingConfig(t *testing.T) { - func() { - defer func() { - if err := recover(); err == nil { - t.Errorf("expect panic, but nothing happens") - } - }() - r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) - r.appendEntry(pb.Entry{Type: pb.EntryConfChange}) - r.becomeCandidate() - r.becomeLeader() - }() -} - -// TestAddNode tests that addNode could update pendingConf and nodes correctly. +// TestAddNode tests that addNode could update nodes correctly. func TestAddNode(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.addNode(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } nodes := r.nodes() wnodes := []uint64{1, 2} if !reflect.DeepEqual(nodes, wnodes) { @@ -2586,7 +2568,6 @@ func TestAddNode(t *testing.T) { // immediately when checkQuorum is set. func TestAddNodeCheckQuorum(t *testing.T) { r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.checkQuorum = true r.becomeCandidate() @@ -2617,15 +2598,11 @@ func TestAddNodeCheckQuorum(t *testing.T) { } } -// TestRemoveNode tests that removeNode could update pendingConf, nodes and +// TestRemoveNode tests that removeNode could update nodes and // and removed list correctly. func TestRemoveNode(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - r.pendingConf = true r.removeNode(2) - if r.pendingConf { - t.Errorf("pendingConf = %v, want false", r.pendingConf) - } w := []uint64{1} if g := r.nodes(); !reflect.DeepEqual(g, w) { t.Errorf("nodes = %v, want %v", g, w) diff --git a/github.com/coreos/etcd/raft/rawnode.go b/github.com/coreos/etcd/raft/rawnode.go index b950d5169a..8fe8d32c0f 100644 --- a/github.com/coreos/etcd/raft/rawnode.go +++ b/github.com/coreos/etcd/raft/rawnode.go @@ -169,7 +169,6 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { // ApplyConfChange applies a config change to the local node. func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { if cc.NodeID == None { - rn.raft.resetPendingConf() return &pb.ConfState{Nodes: rn.raft.nodes()} } switch cc.Type { @@ -178,7 +177,6 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { case pb.ConfChangeRemoveNode: rn.raft.removeNode(cc.NodeID) case pb.ConfChangeUpdateNode: - rn.raft.resetPendingConf() default: panic("unexpected conf type") }