From a5ddf20af2e34325b23468a20cd27f253bd5ab27 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 27 Jun 2018 06:43:09 -0400 Subject: [PATCH] bump etcd/raft to pick up https://github.com/coreos/etcd/pull/9073 --- github.com/coreos/etcd/raft/logger.go | 2 +- github.com/coreos/etcd/raft/node.go | 69 +++++++++++++++++++++------ github.com/coreos/etcd/raft/raft.go | 36 +++++++------- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/github.com/coreos/etcd/raft/logger.go b/github.com/coreos/etcd/raft/logger.go index 92e55b373e..426a77d344 100644 --- a/github.com/coreos/etcd/raft/logger.go +++ b/github.com/coreos/etcd/raft/logger.go @@ -114,7 +114,7 @@ func (l *DefaultLogger) Fatalf(format string, v ...interface{}) { } func (l *DefaultLogger) Panic(v ...interface{}) { - l.Logger.Panic(v) + l.Logger.Panic(v...) } func (l *DefaultLogger) Panicf(format string, v ...interface{}) { diff --git a/github.com/coreos/etcd/raft/node.go b/github.com/coreos/etcd/raft/node.go index f3ba250b9a..b24ba609f3 100644 --- a/github.com/coreos/etcd/raft/node.go +++ b/github.com/coreos/etcd/raft/node.go @@ -224,9 +224,14 @@ func RestartNode(c *Config) Node { return &n } +type msgWithResult struct { + m pb.Message + result chan error +} + // node is the canonical implementation of the Node interface type node struct { - propc chan pb.Message + propc chan msgWithResult recvc chan pb.Message confc chan pb.ConfChange confstatec chan pb.ConfState @@ -242,7 +247,7 @@ type node struct { func newNode() node { return node{ - propc: make(chan pb.Message), + propc: make(chan msgWithResult), recvc: make(chan pb.Message), confc: make(chan pb.ConfChange), confstatec: make(chan pb.ConfState), @@ -271,7 +276,7 @@ func (n *node) Stop() { } func (n *node) run(r *raft) { - var propc chan pb.Message + var propc chan msgWithResult var readyc chan Ready var advancec chan struct{} var prevLastUnstablei, prevLastUnstablet uint64 @@ -314,13 +319,18 @@ func (n *node) run(r *raft) { // TODO: maybe buffer the config propose if there exists one (the way // described in raft dissertation) // Currently it is dropped in Step silently. - case m := <-propc: + case pm := <-propc: + m := pm.m m.From = r.id - r.Step(m) + err := r.Step(m) + if pm.result != nil { + pm.result <- err + close(pm.result) + } case m := <-n.recvc: // filter out response message from unknown From. if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { - r.Step(m) // raft never returns an error + r.Step(m) } case cc := <-n.confc: if cc.NodeID == None { @@ -408,7 +418,7 @@ func (n *node) Tick() { func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } func (n *node) Propose(ctx context.Context, data []byte) error { - return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) + return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } func (n *node) Step(ctx context.Context, m pb.Message) error { @@ -428,22 +438,53 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) } +func (n *node) step(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, false) +} + +func (n *node) stepWait(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, true) +} + // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. -func (n *node) step(ctx context.Context, m pb.Message) error { - ch := n.recvc - if m.Type == pb.MsgProp { - ch = n.propc +func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { + if m.Type != pb.MsgProp { + select { + case n.recvc <- m: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + } + ch := n.propc + pm := msgWithResult{m: m} + if wait { + pm.result = make(chan error, 1) } - select { - case ch <- m: - return nil + case ch <- pm: + if !wait { + return nil + } + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + select { + case rsp := <-pm.result: + if rsp != nil { + return rsp + } case <-ctx.Done(): return ctx.Err() case <-n.done: return ErrStopped } + return nil } func (n *node) Ready() <-chan Ready { return n.readyc } diff --git a/github.com/coreos/etcd/raft/raft.go b/github.com/coreos/etcd/raft/raft.go index a4d744a440..0c8c96c3f8 100644 --- a/github.com/coreos/etcd/raft/raft.go +++ b/github.com/coreos/etcd/raft/raft.go @@ -72,8 +72,8 @@ const ( var ErrProposalDropped = errors.New("raft proposal dropped") // lockedRand is a small wrapper around rand.Rand to provide -// synchronization. Only the methods needed by the code are exposed -// (e.g. Intn). +// synchronization among multiple raft groups. Only the methods needed +// by the code are exposed (e.g. Intn). type lockedRand struct { mu sync.Mutex rand *rand.Rand @@ -192,7 +192,7 @@ type Config struct { // this feature would be in a situation where the Raft leader is used to // compute the data of a proposal, for example, adding a timestamp from a // hybrid logical clock to data in a monotonically increasing way. Forwarding - // should be disabled to prevent a follower with an innaccurate hybrid + // should be disabled to prevent a follower with an inaccurate hybrid // logical clock from assigning the timestamp and then forwarding the data // to the leader. DisableProposalForwarding bool @@ -604,8 +604,9 @@ func (r *raft) appendEntry(es ...pb.Entry) { es[i].Term = r.Term es[i].Index = li + 1 + uint64(i) } - r.raftLog.append(es...) - r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex()) + // use latest "last" index after truncate/append + li = r.raftLog.append(es...) + r.getProgress(r.id).maybeUpdate(li) // Regardless of maybeCommit's return, our caller will call bcastAppend. r.maybeCommit() } @@ -693,19 +694,13 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader - ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) - if err != nil { - r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) - } // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's // safe to delay any future proposals until we commit all our // pending log entries, and scanning the entire tail of the log // could be expensive. - if len(ents) > 0 { - r.pendingConfIndex = ents[len(ents)-1].Index - } + r.pendingConfIndex = r.raftLog.lastIndex() r.appendEntry(pb.Entry{Data: nil}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) @@ -816,8 +811,15 @@ func (r *raft) Step(m pb.Message) error { // nodes that have been removed from the cluster's configuration: a // removed node will send MsgVotes (or MsgPreVotes) which will be ignored, // but it will not receive MsgApp or MsgHeartbeat, so it will not create - // disruptive term increases + // disruptive term increases, by notifying leader of this node's activeness. // The above comments also true for Pre-Vote + // + // When follower gets isolated, it soon starts an election ending + // up with a higher term than leader, although it won't receive enough + // votes to win the election. When it regains connectivity, this response + // with "pb.MsgAppResp" of higher term would force leader to step down. + // However, this disruption is inevitable to free this stuck node with + // fresh election. This can be prevented with Pre-Vote phase. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) } else if m.Type == pb.MsgPreVote { // Before Pre-Vote enable, there may have candidate with higher term, @@ -1127,13 +1129,13 @@ func stepCandidate(r *raft, m pb.Message) error { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return ErrProposalDropped case pb.MsgApp: - r.becomeFollower(r.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleAppendEntries(m) case pb.MsgHeartbeat: - r.becomeFollower(r.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) case pb.MsgSnap: - r.becomeFollower(m.Term, m.From) + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) case myVoteRespType: gr := r.poll(m.From, m.Type, !m.Reject) @@ -1147,6 +1149,8 @@ func stepCandidate(r *raft, m pb.Message) error { r.bcastAppend() } case len(r.votes) - gr: + // pb.MsgPreVoteResp contains future term of pre-candidate + // m.Term > r.Term; reuse r.Term r.becomeFollower(r.Term, None) } case pb.MsgTimeoutNow: