Skip to content

Commit

Permalink
Merge pull request #2168 from influxdb/stale-term
Browse files Browse the repository at this point in the history
Return term from vote, add term logging.
  • Loading branch information
toddboom committed Apr 4, 2015
2 parents 8d785b5 + 3d71d45 commit 48f491f
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 41 deletions.
12 changes: 9 additions & 3 deletions raft/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Handler struct {
RemovePeer(id uint64) error
Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesTo(w io.Writer, id, term, index uint64) error
RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) error
RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error)
}
}

Expand Down Expand Up @@ -123,7 +123,7 @@ func (h *Handler) serveHeartbeat(w http.ResponseWriter, r *http.Request) {
// Execute heartbeat on the log.
currentIndex, err := h.Log.Heartbeat(term, commitIndex, leaderID)

// Return current term and index.
// Return current index.
w.Header().Set("X-Raft-Index", strconv.FormatUint(currentIndex, 10))

// Write error, if applicable.
Expand Down Expand Up @@ -201,8 +201,14 @@ func (h *Handler) serveRequestVote(w http.ResponseWriter, r *http.Request) {
return
}

// Request vote from log.
peerTerm, err := h.Log.RequestVote(term, candidateID, lastLogIndex, lastLogTerm)

// Write current term.
w.Header().Set("X-Raft-Term", strconv.FormatUint(peerTerm, 10))

// Write error, if applicable.
if err := h.Log.RequestVote(term, candidateID, lastLogIndex, lastLogTerm); err != nil {
if err != nil {
w.Header().Set("X-Raft-Error", err.Error())
w.WriteHeader(http.StatusInternalServerError)
return
Expand Down
14 changes: 8 additions & 6 deletions raft/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func TestHandler_HandleStream_Error(t *testing.T) {
// Ensure a vote request can be sent over HTTP.
func TestHandler_HandleRequestVote(t *testing.T) {
h := NewHandler()
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
if term != 1 {
t.Fatalf("unexpected term: %d", term)
} else if candidateID != 2 {
Expand All @@ -286,7 +286,7 @@ func TestHandler_HandleRequestVote(t *testing.T) {
} else if lastLogTerm != 4 {
t.Fatalf("unexpected last log term: %d", lastLogTerm)
}
return nil
return 5, nil
}
s := httptest.NewServer(h)
defer s.Close()
Expand All @@ -298,6 +298,8 @@ func TestHandler_HandleRequestVote(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
} else if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status: %d", resp.StatusCode)
} else if term := resp.Header.Get("X-Raft-Term"); term != "5" {
t.Fatalf("unexpected raft term: %s", term)
} else if s := resp.Header.Get("X-Raft-Error"); s != "" {
t.Fatalf("unexpected raft error: %s", s)
}
Expand All @@ -306,8 +308,8 @@ func TestHandler_HandleRequestVote(t *testing.T) {
// Ensure sending invalid parameters in a vote request returns an error.
func TestHandler_HandleRequestVote_Error(t *testing.T) {
h := NewHandler()
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
return raft.ErrStaleTerm
h.RequestVoteFunc = func(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
return 100, raft.ErrStaleTerm
}
s := httptest.NewServer(h)
defer s.Close()
Expand Down Expand Up @@ -373,7 +375,7 @@ type Handler struct {
RemovePeerFunc func(id uint64) error
HeartbeatFunc func(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
WriteEntriesToFunc func(w io.Writer, id, term, index uint64) error
RequestVoteFunc func(term, candidateID, lastLogIndex, lastLogTerm uint64) error
RequestVoteFunc func(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error)
}

// NewHandler returns a new instance of Handler.
Expand All @@ -394,6 +396,6 @@ func (h *Handler) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
return h.WriteEntriesToFunc(w, id, term, index)
}

func (h *Handler) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) error {
func (h *Handler) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
return h.RequestVoteFunc(term, candidateID, lastLogIndex, lastLogTerm)
}
65 changes: 42 additions & 23 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type Log struct {
Leave(u url.URL, id uint64) error
Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error)
ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error)
RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error
RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error)
}

// Clock is an abstraction of time.
Expand Down Expand Up @@ -200,8 +200,8 @@ func NewLog() *Log {
Clock: NewClock(),
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 1),
terms: make(chan uint64, 1),
heartbeats: make(chan heartbeat, 10),
terms: make(chan uint64, 10),
Logger: log.New(os.Stderr, "[raft] ", log.LstdFlags),
}
l.updateLogPrefix()
Expand Down Expand Up @@ -525,10 +525,23 @@ func (l *Log) writeTerm(term uint64) error {
}

// setTerm sets the current term and clears the vote.
func (l *Log) setTerm(term uint64) {
func (l *Log) setTerm(term uint64) error {
l.Logger.Printf("changing term: %d => %d", l.term, term)

if err := l.writeTerm(term); err != nil {
return err
}

l.term = term
l.votedFor = 0
return nil
}

// mustSetTerm sets the current term and clears the vote. Panic on error.
func (l *Log) mustSetTerm(term uint64) {
if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}
}

// readConfig reads the configuration from disk.
Expand Down Expand Up @@ -602,10 +615,9 @@ func (l *Log) Initialize() error {

// Automatically promote to leader.
term := uint64(1)
if err := l.writeTerm(term); err != nil {
return fmt.Errorf("write term: %s", err)
if err := l.setTerm(term); err != nil {
return fmt.Errorf("set term: %s", err)
}
l.setTerm(term)
l.lastLogTerm = term
l.leaderID = l.id

Expand Down Expand Up @@ -851,7 +863,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
// Update term, commit index & leader.
l.mu.Lock()
if hb.term > l.term {
l.setTerm(hb.term)
l.mustSetTerm(hb.term)
}
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
Expand All @@ -862,7 +874,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
case term := <-l.terms:
l.mu.Lock()
if term > l.term {
l.setTerm(term)
l.mustSetTerm(term)
}
l.mu.Unlock()
}
Expand Down Expand Up @@ -969,7 +981,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
case hb := <-l.heartbeats:
l.mu.Lock()
if hb.term >= term {
l.setTerm(hb.term)
l.mustSetTerm(hb.term)
l.leaderID = hb.leaderID
l.mu.Unlock()
return Follower
Expand All @@ -984,7 +996,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
// Check against the current term since that may have changed.
l.mu.Lock()
if newTerm >= l.term {
l.setTerm(newTerm)
l.mustSetTerm(newTerm)
l.mu.Unlock()
return Follower
}
Expand Down Expand Up @@ -1018,8 +1030,15 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
continue
}
go func(n *ConfigNode) {
if err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm); err != nil {
l.tracef("sendVoteRequests: %s: %s", n.URL.String(), err)
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)

// If an error occured then send the peer's term.
if err != nil {
select {
case l.terms <- peerTerm:
default:
}
return
}
votes <- struct{}{}
Expand Down Expand Up @@ -1075,7 +1094,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case newTerm := <-l.terms: // step down on higher term
if newTerm > term {
l.mu.Lock()
l.setTerm(newTerm)
l.mustSetTerm(newTerm)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
Expand All @@ -1085,7 +1104,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.setTerm(hb.term)
l.mustSetTerm(hb.term)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
Expand Down Expand Up @@ -1570,31 +1589,31 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64
}

// RequestVote requests a vote from the log.
func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (err error) {
func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()

// Check if log is closed.
if !l.opened() {
return ErrClosed
return l.term, ErrClosed
}

defer func() {
l.tracef("RV(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err)
l.Logger.Printf("recv req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err)
}()

// Deny vote if:
// 1. Candidate is requesting a vote from an earlier term. (§5.1)
// 2. Already voted for a different candidate in this term. (§5.2)
// 3. Candidate log is less up-to-date than local log. (§5.4)
if term < l.term {
return ErrStaleTerm
return l.term, ErrStaleTerm
} else if term == l.term && l.votedFor != 0 && l.votedFor != candidateID {
return ErrAlreadyVoted
return l.term, ErrAlreadyVoted
} else if lastLogTerm < l.lastLogTerm {
return ErrOutOfDateLog
return l.term, ErrOutOfDateLog
} else if lastLogTerm == l.lastLogTerm && lastLogIndex < l.lastLogIndex {
return ErrOutOfDateLog
return l.term, ErrOutOfDateLog
}

// Notify term change.
Expand All @@ -1609,7 +1628,7 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
l.term = term
l.votedFor = candidateID

return nil
return l.term, nil
}

// WriteEntriesTo attaches a writer to the log from a given index.
Expand Down
14 changes: 10 additions & 4 deletions raft/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCl
}

// RequestVote requests a vote for a candidate in a given term.
func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
// Construct URL.
u := uri
u.Path = path.Join(u.Path, "raft/vote")
Expand All @@ -156,14 +156,20 @@ func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex
// Send HTTP request.
resp, err := http.Get(u.String())
if err != nil {
return err
return 0, err
}
_ = resp.Body.Close()

// Parse returned term.
peerTerm, err := strconv.ParseUint(resp.Header.Get("X-Raft-Term"), 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid returned term: %q", resp.Header.Get("X-Raft-Term"))
}

// Parse returned error.
if s := resp.Header.Get("X-Raft-Error"); s != "" {
return errors.New(s)
return peerTerm, errors.New(s)
}

return nil
return peerTerm, nil
}
13 changes: 8 additions & 5 deletions raft/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,17 @@ func TestHTTPTransport_RequestVote(t *testing.T) {
if lastLogTerm := r.FormValue("lastLogTerm"); lastLogTerm != `4` {
t.Fatalf("unexpected last log term: %v", lastLogTerm)
}
w.Header().Set("X-Raft-Term", `100`)
w.WriteHeader(http.StatusOK)
}))
defer s.Close()

// Execute heartbeat against test server.
u, _ := url.Parse(s.URL)
if err := (&raft.HTTPTransport{}).RequestVote(*u, 1, 2, 3, 4); err != nil {
if peerTerm, err := (&raft.HTTPTransport{}).RequestVote(*u, 1, 2, 3, 4); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if peerTerm != 100 {
t.Fatalf("unexpected peer term: %d", peerTerm)
}
}

Expand All @@ -343,7 +346,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) {
defer s.Close()

u, _ := url.Parse(s.URL)
if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
if _, err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
t.Errorf("expected error")
} else if err.Error() != `already voted` {
t.Errorf("unexpected error: %s", err)
Expand All @@ -353,7 +356,7 @@ func TestHTTPTransport_RequestVote_Error(t *testing.T) {
// Ensure that requesting a vote over HTTP to a stopped server returns an error.
func TestHTTPTransport_RequestVote_ErrConnectionRefused(t *testing.T) {
u, _ := url.Parse("http://localhost:41932")
if err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
if _, err := (&raft.HTTPTransport{}).RequestVote(*u, 0, 0, 0, 0); err == nil {
t.Fatal("expected error")
} else if !is_connection_refused(err) {
t.Fatalf("unexpected error: %s", err)
Expand Down Expand Up @@ -430,10 +433,10 @@ func (t *Transport) ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser,
}

// RequestVote calls RequestVote() on the target log.
func (t *Transport) RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error {
func (t *Transport) RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error) {
l, err := t.log(u)
if err != nil {
return err
return 0, err
}
return l.RequestVote(term, candidateID, lastLogIndex, lastLogTerm)
}
Expand Down

0 comments on commit 48f491f

Please sign in to comment.