Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non voter with higher term should not cause cluster instability #525

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,7 @@ func (r *Raft) processHeartbeat(rpc RPC) {
func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
// Setup a response
noRetryBackoffIfErr := true
resp := &AppendEntriesResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Expand All @@ -1375,7 +1376,23 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
rpc.Respond(resp, rpcErr)
}()

// Ignore an older term
// this use case would happen when a node was a voter and is added back to the cluster as non voter,
// it term could be higher then the cluster, but because it's a non voter that term need to be discarded
// in favor of the cluster current term to keep the cluster stable, as an election don't need to be started
// by a node which don't have voting rights.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a comment here is useful but perhaps a link to this PR or the issue would be good because it's super hard to express all the nuance of the issue and why this is correct in a few lines?

currentTerm := r.getCurrentTerm()
hasVote := hasVote(r.getLatestConfiguration(), r.localID)
if a.Term < currentTerm && !hasVote && a.PrevLogTerm >= r.lastLogTerm {
r.logger.Warn("older term sent to non-voter", "PrevLogTerm", a.PrevLogTerm,
"lastLogTerm", r.lastLogTerm,
" Term", a.Term, "currentTerm", currentTerm)
r.setState(Follower)
r.setCurrentTerm(a.Term)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any other state we need to set here 🤔 I don't think so because this is the same as the code below for normal happy-path replication where we switch to a new leader's term after an election when we get the first appendEntries but I feel like we also store state about which node is our current leader in raft and it feels weird to me that we might be potentially switching term and leader here without updating anything about who that leader is....

But i know we do persist the identity of the candidate we last voted for, so I wonder if the happy path doesn't update anything because it assumes that we set the identity during election? Even then that can't be right in the case of a follower coming up and learning about a new leader that it didn't vote for so again I guess you did the right thing here but I'm slightly intrigued about how and where we update the state about who our current leader 🤔. Do you know @dhiaayachi ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's done right after in here
but I'm a bit confused about it as that mean we trust any node who sends us an appendEntries request with a higher term as the leader, but maybe that's the right thing to do, I'm not sure.

resp.Term = a.Term
noRetryBackoffIfErr = false
}

// if a node is a voter, Ignore an older term
if a.Term < r.getCurrentTerm() {
return
}
Expand Down Expand Up @@ -1410,7 +1427,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
"previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true
resp.NoRetryBackoff = noRetryBackoffIfErr
return
}
prevLogTerm = prevLog.Term
Expand Down
79 changes: 79 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,85 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) {
}
}

func TestRaft_StabilityIsKept_WhenNonVoterWithHigherTermJoin(t *testing.T) {
// Make a cluster
c := MakeCluster(3, t, nil)

defer c.Close()

// Get the leader
leader := c.Leader()

// Wait until we have 2 followers
limit := time.Now().Add(c.longstopTimeout)
var followers []*Raft
for time.Now().Before(limit) && len(followers) != 2 {
c.WaitEvent(nil, c.conf.CommitTimeout)
followers = c.GetInState(Follower)
}
if len(followers) != 2 {
t.Fatalf("expected two followers: %v", followers)
}

// Remove a follower
followerRemoved := followers[0]
future := leader.RemoveServer(followerRemoved.localID, 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}

//set that follower term to higher term to simulate a partitioning
followerRemoved.setCurrentTerm(leader.getCurrentTerm() + 100)

//Add the node back as NonVoter
future = leader.AddNonvoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
// Wait a while
time.Sleep(c.propagateTimeout * 100)

//Write some logs to ensure they replicate
for i := 0; i < 100; i++ {
future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
if err := future.Error(); err != nil {
t.Fatalf("[ERR] apply err: %v", err)
}
}
c.WaitForReplication(100)

// Check leader stable
newLeader := c.Leader()
if newLeader.leaderID != leader.leaderID {
t.Fatalf("leader changed")
}

//Remove the server and add it back as Voter
future = leader.RemoveServer(followerRemoved.localID, 0, 0)
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
leader.AddVoter(followerRemoved.localID, followerRemoved.localAddr, 0, 0)

// Wait a while
time.Sleep(c.propagateTimeout * 10)

//Write some logs to ensure they replicate
for i := 100; i < 200; i++ {
future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
if err := future.Error(); err != nil {
t.Fatalf("[ERR] apply err: %v", err)
}
}
c.WaitForReplication(200)

// Check leader stable
newLeader = c.Leader()
if newLeader.leaderID != leader.leaderID {
t.Fatalf("leader changed")
}
}

// TestRaft_FollowerRemovalNoElection ensures that a leader election is not
// started when a standby is shut down and restarted.
func TestRaft_FollowerRemovalNoElection(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ START:
appendStats(string(peer.ID), start, float32(len(req.Entries)))

// Check for a newer term, stop running
if resp.Term > req.Term {
if resp.Term > req.Term && s.peer.Suffrage != Nonvoter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the real fix to prevent disruption to the leader and I think is safe and correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes actually that was the first change I made but this won't help getting the non voter effectively join the cluster as it will continue rejecting the replication forever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this instead of Suffrage == Voter?

r.handleStaleTerm(s)
return true
}
Expand Down