Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
bump etcd/raft to pick up etcd-io/etcd#9073
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Jun 27, 2018
1 parent 1ec79d4 commit a5ddf20
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 31 deletions.
2 changes: 1 addition & 1 deletion github.com/coreos/etcd/raft/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (l *DefaultLogger) Fatalf(format string, v ...interface{}) {
}

func (l *DefaultLogger) Panic(v ...interface{}) {
l.Logger.Panic(v)
l.Logger.Panic(v...)
}

func (l *DefaultLogger) Panicf(format string, v ...interface{}) {
Expand Down
69 changes: 55 additions & 14 deletions github.com/coreos/etcd/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,14 @@ func RestartNode(c *Config) Node {
return &n
}

type msgWithResult struct {
m pb.Message
result chan error
}

// node is the canonical implementation of the Node interface
type node struct {
propc chan pb.Message
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
Expand All @@ -242,7 +247,7 @@ type node struct {

func newNode() node {
return node{
propc: make(chan pb.Message),
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChange),
confstatec: make(chan pb.ConfState),
Expand Down Expand Up @@ -271,7 +276,7 @@ func (n *node) Stop() {
}

func (n *node) run(r *raft) {
var propc chan pb.Message
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
Expand Down Expand Up @@ -314,13 +319,18 @@ func (n *node) run(r *raft) {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case m := <-propc:
case pm := <-propc:
m := pm.m
m.From = r.id
r.Step(m)
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
Expand Down Expand Up @@ -408,7 +418,7 @@ func (n *node) Tick() {
func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }

func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) Step(ctx context.Context, m pb.Message) error {
Expand All @@ -428,22 +438,53 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}

func (n *node) step(ctx context.Context, m pb.Message) error {
return n.stepWithWaitOption(ctx, m, false)
}

func (n *node) stepWait(ctx context.Context, m pb.Message) error {
return n.stepWithWaitOption(ctx, m, true)
}

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
ch := n.recvc
if m.Type == pb.MsgProp {
ch = n.propc
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
if m.Type != pb.MsgProp {
select {
case n.recvc <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
ch := n.propc
pm := msgWithResult{m: m}
if wait {
pm.result = make(chan error, 1)
}

select {
case ch <- m:
return nil
case ch <- pm:
if !wait {
return nil
}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
select {
case rsp := <-pm.result:
if rsp != nil {
return rsp
}
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
return nil
}

func (n *node) Ready() <-chan Ready { return n.readyc }
Expand Down
36 changes: 20 additions & 16 deletions github.com/coreos/etcd/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const (
var ErrProposalDropped = errors.New("raft proposal dropped")

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
// synchronization among multiple raft groups. Only the methods needed
// by the code are exposed (e.g. Intn).
type lockedRand struct {
mu sync.Mutex
rand *rand.Rand
Expand Down Expand Up @@ -192,7 +192,7 @@ type Config struct {
// this feature would be in a situation where the Raft leader is used to
// compute the data of a proposal, for example, adding a timestamp from a
// hybrid logical clock to data in a monotonically increasing way. Forwarding
// should be disabled to prevent a follower with an innaccurate hybrid
// should be disabled to prevent a follower with an inaccurate hybrid
// logical clock from assigning the timestamp and then forwarding the data
// to the leader.
DisableProposalForwarding bool
Expand Down Expand Up @@ -604,8 +604,9 @@ func (r *raft) appendEntry(es ...pb.Entry) {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex())
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
}
Expand Down Expand Up @@ -693,19 +694,13 @@ func (r *raft) becomeLeader() {
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}

// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}
r.pendingConfIndex = r.raftLog.lastIndex()

r.appendEntry(pb.Entry{Data: nil})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
Expand Down Expand Up @@ -816,8 +811,15 @@ func (r *raft) Step(m pb.Message) error {
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases
// disruptive term increases, by notifying leader of this node's activeness.
// The above comments also true for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
Expand Down Expand Up @@ -1127,13 +1129,13 @@ func stepCandidate(r *raft, m pb.Message) error {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(r.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
Expand All @@ -1147,6 +1149,8 @@ func stepCandidate(r *raft, m pb.Message) error {
r.bcastAppend()
}
case len(r.votes) - gr:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
Expand Down

0 comments on commit a5ddf20

Please sign in to comment.