From 337234edaefe13743210b025c0176a86fbd1aeb2 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Wed, 9 Mar 2022 09:33:12 -0800 Subject: [PATCH 1/9] cap maximum grpc wait time when heartbeating to heartbeatTimeout/2 --- replication.go | 4 +++- util.go | 14 ++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/replication.go b/replication.go index b3301b5c542..bfb14cfcae0 100644 --- a/replication.go +++ b/replication.go @@ -406,8 +406,10 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()}) failures++ select { - case <-time.After(backoff(failureWait, failures, maxFailureScale)): + case <-time.After(cappedExponentialBackoff(failureWait, failures, maxFailureScale, + r.config().HeartbeatTimeout/2)): case <-stopCh: + return } } else { if failures > 0 { diff --git a/util.go b/util.go index 54e47ca04ba..17caab6999a 100644 --- a/util.go +++ b/util.go @@ -144,6 +144,20 @@ func backoff(base time.Duration, round, limit uint64) time.Duration { return base } +// cappedExponentialBackoff computes the exponential backoff with an adjustable +// cap on the max timeout. +func cappedExponentialBackoff(base time.Duration, round, limit uint64, cap time.Duration) time.Duration { + power := min(round, limit) + for power > 2 { + if base > cap { + return base + } + base *= 2 + power-- + } + return base +} + // Needed for sorting []uint64, used to determine commitment type uint64Slice []uint64 From bab8c54357be05b107211a08c0520d7c6d470a3a Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Wed, 9 Mar 2022 10:09:09 -0800 Subject: [PATCH 2/9] change timeout cap to heartbeatTimeout --- replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication.go b/replication.go index bfb14cfcae0..d9284fb9b4d 100644 --- a/replication.go +++ b/replication.go @@ -407,7 +407,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { failures++ select { case <-time.After(cappedExponentialBackoff(failureWait, failures, maxFailureScale, - r.config().HeartbeatTimeout/2)): + r.config().HeartbeatTimeout)): case <-stopCh: return } From 8c1dd507f56985a52ac700cea9b2631ab180783c Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Thu, 10 Mar 2022 09:58:10 -0800 Subject: [PATCH 3/9] test in progress --- raft_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ replication.go | 2 +- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/raft_test.go b/raft_test.go index 6af480eb14a..317bc540ab7 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2639,6 +2639,51 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { } } +// ROY +// Verifies that when a follower is taken out of rotation for heartbeat timeout +// a leader election does not occur. +func TestRaft_FollowerRemovalNoElection(t *testing.T) { + // Make a cluster + c := MakeCluster(3, t, nil) + + defer c.Close() + waitForLeader(c) + + // Wait until we have 2 followers + limit := time.Now().Add(c.longstopTimeout) + var followers []*Raft + for time.Now().Before(limit) && len(followers) != 2 { + c.WaitEvent(nil, c.conf.CommitTimeout) + followers = c.GetInState(Follower) + } + if len(followers) != 2 { + t.Fatalf("expected two followers: %v", followers) + } + // leaderTerm := c.Leader().getCurrentTerm() + // Disconnect one of the followers and wait for the heartbeat timeout + t.Logf("[INFO] Disconnecting %v", followers[0]) + c.Disconnect(followers[0].localAddr) + time.Sleep(2 * time.Second) + nodes := c.GetInState(Candidate) + fmt.Printf("leader is %+v\n", c.Leader()) + fmt.Printf("followers are %+v\n", followers) + fmt.Printf("candidates are %+v\n", nodes) + t.Fatalf("oops") + + // var newLead *Raft + // for time.Now().Before(limit) && newLead == nil { + // c.WaitEvent(nil, c.conf.CommitTimeout) + // leaders := c.GetInState(Leader) + // if len(leaders) == 1 { + // newLead = leaders[0] + // } + // } + // Ensure the term is greater + // if newLead.getCurrentTerm() <= leaderTerm { + // t.Fatalf("expected newer term! %d %d", newLead.getCurrentTerm(), leaderTerm) + // } +} + func TestRaft_VoteWithNoIDNoAddr(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) diff --git a/replication.go b/replication.go index d9284fb9b4d..d46fd2ca459 100644 --- a/replication.go +++ b/replication.go @@ -407,7 +407,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { failures++ select { case <-time.After(cappedExponentialBackoff(failureWait, failures, maxFailureScale, - r.config().HeartbeatTimeout)): + r.config().HeartbeatTimeout) / 2): case <-stopCh: return } From 790258c299763ac9e97f0c0af2d08c3b827a78ab Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Mon, 14 Mar 2022 13:31:48 -0700 Subject: [PATCH 4/9] added test stub used to check logs and see if a re-election occurs --- raft_test.go | 53 +++++++++++++++++++++++++++----------------------- replication.go | 4 ++-- util.go | 2 +- 3 files changed, 32 insertions(+), 27 deletions(-) diff --git a/raft_test.go b/raft_test.go index 317bc540ab7..f79db680d07 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2639,9 +2639,7 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { } } -// ROY -// Verifies that when a follower is taken out of rotation for heartbeat timeout -// a leader election does not occur. +// This test is currently a stub func TestRaft_FollowerRemovalNoElection(t *testing.T) { // Make a cluster c := MakeCluster(3, t, nil) @@ -2659,29 +2657,36 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { if len(followers) != 2 { t.Fatalf("expected two followers: %v", followers) } - // leaderTerm := c.Leader().getCurrentTerm() + // Disconnect one of the followers and wait for the heartbeat timeout - t.Logf("[INFO] Disconnecting %v", followers[0]) - c.Disconnect(followers[0].localAddr) + i := 0 + follower := c.rafts[i] + if follower == c.Leader() { + i = 1 + follower = c.rafts[i] + } + logs := follower.logs + t.Logf("[INFO] restarting %v", follower) + // Shutdown follower + if f := follower.Shutdown(); f.Error() != nil { + t.Fatalf("error shuting down follower: %v", f.Error()) + } + time.Sleep(3 * time.Second) + + _, trans := NewInmemTransport(follower.localAddr) + conf := follower.config() + n, err := NewRaft(&conf, &MockFSM{}, logs, follower.stable, follower.snapshots, trans) + if err != nil { + t.Fatalf("error restarting follower: %v", err) + } + c.rafts[i] = n + c.trans[i] = n.trans.(*InmemTransport) + c.fsms[i] = n.fsm.(*MockFSM) + c.FullyConnect() + // There should be no re-election during this sleep time.Sleep(2 * time.Second) - nodes := c.GetInState(Candidate) - fmt.Printf("leader is %+v\n", c.Leader()) - fmt.Printf("followers are %+v\n", followers) - fmt.Printf("candidates are %+v\n", nodes) - t.Fatalf("oops") - - // var newLead *Raft - // for time.Now().Before(limit) && newLead == nil { - // c.WaitEvent(nil, c.conf.CommitTimeout) - // leaders := c.GetInState(Leader) - // if len(leaders) == 1 { - // newLead = leaders[0] - // } - // } - // Ensure the term is greater - // if newLead.getCurrentTerm() <= leaderTerm { - // t.Fatalf("expected newer term! %d %d", newLead.getCurrentTerm(), leaderTerm) - // } + n.Shutdown() + t.Fatalf("exit") } func TestRaft_VoteWithNoIDNoAddr(t *testing.T) { diff --git a/replication.go b/replication.go index d46fd2ca459..0df90c834d2 100644 --- a/replication.go +++ b/replication.go @@ -406,8 +406,8 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()}) failures++ select { - case <-time.After(cappedExponentialBackoff(failureWait, failures, maxFailureScale, - r.config().HeartbeatTimeout) / 2): + // case <-time.After(backoff(failureWait, s.failures, maxFailureScale)): + case <-time.After(cappedExponentialBackoff(failureWait, s.failures, maxFailureScale, DefaultConfig().HeartbeatTimeout)): case <-stopCh: return } diff --git a/util.go b/util.go index 17caab6999a..a1b84395e00 100644 --- a/util.go +++ b/util.go @@ -150,7 +150,7 @@ func cappedExponentialBackoff(base time.Duration, round, limit uint64, cap time. power := min(round, limit) for power > 2 { if base > cap { - return base + return cap } base *= 2 power-- From 085d3730e5a3591077159054ee727113f0d70089 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Wed, 16 Mar 2022 11:35:21 -0700 Subject: [PATCH 5/9] modify exponential backoff to be capped at heartbeat timeout / 2 and add test --- raft_test.go | 16 +++++++++++++--- replication.go | 6 ++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/raft_test.go b/raft_test.go index f79db680d07..98cb2ce0f62 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2639,14 +2639,20 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { } } -// This test is currently a stub +// TestRaft_FollowerRemovalNoElection ensures that a leader election is not +// started when a standby is shut down and restarted. func TestRaft_FollowerRemovalNoElection(t *testing.T) { // Make a cluster - c := MakeCluster(3, t, nil) + inmemConf := inmemConfig(t) + inmemConf.HeartbeatTimeout = 500 * time.Millisecond + inmemConf.ElectionTimeout = 500 * time.Millisecond + c := MakeCluster(3, t, inmemConf) defer c.Close() waitForLeader(c) + leader := c.Leader() + // Wait until we have 2 followers limit := time.Now().Add(c.longstopTimeout) var followers []*Raft @@ -2685,8 +2691,12 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { c.FullyConnect() // There should be no re-election during this sleep time.Sleep(2 * time.Second) + + // Let things settle and make sure we recovered. + c.EnsureLeader(t, leader.localAddr) + c.EnsureSame(t) + c.EnsureSamePeers(t) n.Shutdown() - t.Fatalf("exit") } func TestRaft_VoteWithNoIDNoAddr(t *testing.T) { diff --git a/replication.go b/replication.go index 0df90c834d2..ddef6ebd3f6 100644 --- a/replication.go +++ b/replication.go @@ -402,12 +402,14 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { start := time.Now() if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil { - r.logger.Error("failed to heartbeat to", "peer", peer.Address, "error", err) + nextBackoffTime := cappedExponentialBackoff(failureWait, s.failures, maxFailureScale, r.config().HeartbeatTimeout/2) + r.logger.Error("failed to heartbeat to", "peer", peer.Address, "backoff time", + nextBackoffTime, "error", err) r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()}) failures++ select { // case <-time.After(backoff(failureWait, s.failures, maxFailureScale)): - case <-time.After(cappedExponentialBackoff(failureWait, s.failures, maxFailureScale, DefaultConfig().HeartbeatTimeout)): + case <-time.After(nextBackoffTime): case <-stopCh: return } From 2411b3198d4b2e42be110752c72089a02471d4c0 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Wed, 16 Mar 2022 11:42:39 -0700 Subject: [PATCH 6/9] remove comment --- replication.go | 1 - 1 file changed, 1 deletion(-) diff --git a/replication.go b/replication.go index ddef6ebd3f6..8b85d55a15f 100644 --- a/replication.go +++ b/replication.go @@ -408,7 +408,6 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()}) failures++ select { - // case <-time.After(backoff(failureWait, s.failures, maxFailureScale)): case <-time.After(nextBackoffTime): case <-stopCh: return From 47fe146789e24679e99aaa9000ccef3ed4e67271 Mon Sep 17 00:00:00 2001 From: Hridoy Roy Date: Thu, 17 Mar 2022 11:27:31 -0700 Subject: [PATCH 7/9] Update util.go Co-authored-by: Brian Kassouf --- util.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/util.go b/util.go index a1b84395e00..4859ee5d976 100644 --- a/util.go +++ b/util.go @@ -155,6 +155,9 @@ func cappedExponentialBackoff(base time.Duration, round, limit uint64, cap time. base *= 2 power-- } + if base > cap { + return cap + } return base } From 585ec5b44d080060f44a3b62595df3b863f01b33 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Thu, 17 Mar 2022 11:35:50 -0700 Subject: [PATCH 8/9] change s.failures to failures for heartbeat backoff --- replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replication.go b/replication.go index 8b85d55a15f..efe46e4e4f1 100644 --- a/replication.go +++ b/replication.go @@ -402,7 +402,7 @@ func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) { start := time.Now() if err := r.trans.AppendEntries(peer.ID, peer.Address, &req, &resp); err != nil { - nextBackoffTime := cappedExponentialBackoff(failureWait, s.failures, maxFailureScale, r.config().HeartbeatTimeout/2) + nextBackoffTime := cappedExponentialBackoff(failureWait, failures, maxFailureScale, r.config().HeartbeatTimeout/2) r.logger.Error("failed to heartbeat to", "peer", peer.Address, "backoff time", nextBackoffTime, "error", err) r.observe(FailedHeartbeatObservation{PeerID: peer.ID, LastContact: s.LastContact()}) From 81a7d27020de95a8408b95804666de7b56633841 Mon Sep 17 00:00:00 2001 From: HridoyRoy Date: Mon, 9 May 2022 11:48:49 -0700 Subject: [PATCH 9/9] make timeouts 100 milliseconds each and remove the time.Sleeps --- raft_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/raft_test.go b/raft_test.go index 98cb2ce0f62..4dd90e2adc5 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2644,8 +2644,8 @@ func TestRaft_VoteNotGranted_WhenNodeNotInCluster(t *testing.T) { func TestRaft_FollowerRemovalNoElection(t *testing.T) { // Make a cluster inmemConf := inmemConfig(t) - inmemConf.HeartbeatTimeout = 500 * time.Millisecond - inmemConf.ElectionTimeout = 500 * time.Millisecond + inmemConf.HeartbeatTimeout = 100 * time.Millisecond + inmemConf.ElectionTimeout = 100 * time.Millisecond c := MakeCluster(3, t, inmemConf) defer c.Close() @@ -2677,7 +2677,6 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { if f := follower.Shutdown(); f.Error() != nil { t.Fatalf("error shuting down follower: %v", f.Error()) } - time.Sleep(3 * time.Second) _, trans := NewInmemTransport(follower.localAddr) conf := follower.config() @@ -2690,7 +2689,7 @@ func TestRaft_FollowerRemovalNoElection(t *testing.T) { c.fsms[i] = n.fsm.(*MockFSM) c.FullyConnect() // There should be no re-election during this sleep - time.Sleep(2 * time.Second) + time.Sleep(250 * time.Millisecond) // Let things settle and make sure we recovered. c.EnsureLeader(t, leader.localAddr)