From 3ad363d070cece1284a1762f7515841de5305d7d Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 10:55:25 +0200 Subject: [PATCH 01/36] raft: always mark leader as RecentActive RecentActive is now initialized to true in `becomeLeader`. Both configuration changes and CheckQuorum make sure not to break this, so we now now that the leader is always RecentActive. Signed-off-by: Tobias Grieger --- raft/raft.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 6375abd2edc..4ce5af5f18d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -735,7 +735,11 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs.Progress[r.id].BecomeReplicate() + pr := r.prs.Progress[r.id] + pr.BecomeReplicate() + // The leader always has RecentActive == true; MsgCheckQuorum makes sure to + // preserve this. + pr.RecentActive = true // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -995,15 +999,6 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastHeartbeat() return nil case pb.MsgCheckQuorum: - // The leader should always see itself as active. As a precaution, handle - // the case in which the leader isn't in the configuration any more (for - // example if it just removed itself). - // - // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the - // leader steps down when removing itself. I might be missing something. - if pr := r.prs.Progress[r.id]; pr != nil { - pr.RecentActive = true - } if !r.prs.QuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) From 5e3314da42377b50d390f87c9c908130238d0fae Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 10:06:30 +0200 Subject: [PATCH 02/36] raft: add BenchmarkRawNode This is a speed-of-light benchmark that uses an in-memory single-voter RawNode to sequentially propose and process entries. As a bonus, it also measures the number of calls to the underlying Storage. Calls to the Storage are cheap since the benchmark is in- memory, but in a real-world implementation, especially one that doesn't cache results, additional calls to the Storage interface can translate to a heavy hit as they might involve additional I/O. Signed-off-by: Tobias Grieger --- raft/rawnode_test.go | 87 ++++++++++++++++++++++++++++++++++++++++++++ raft/storage.go | 12 ++++++ 2 files changed, 99 insertions(+) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 535152e14c2..413f232df4c 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -1105,3 +1105,90 @@ func TestRawNodeConsumeReady(t *testing.T) { t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) } } + +func BenchmarkRawNode(b *testing.B) { + b.Run("single-voter", func(b *testing.B) { + benchmarkRawNodeImpl(b, 1) + }) + b.Run("two-voters", func(b *testing.B) { + benchmarkRawNodeImpl(b, 1, 2) + }) +} + +func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) { + + const debug = false + + s := newTestMemoryStorage(withPeers(peers...)) + cfg := newTestConfig(1, 10, 1, s) + if !debug { + cfg.Logger = discardLogger // avoid distorting benchmark output + } + rn, err := NewRawNode(cfg) + if err != nil { + b.Fatal(err) + } + + run := make(chan struct{}, 1) + defer close(run) + + var numReady uint64 + stabilize := func() (applied uint64) { + for rn.HasReady() { + numReady++ + rd := rn.Ready() + if debug { + b.Log(DescribeReady(rd, nil)) + } + if n := len(rd.CommittedEntries); n > 0 { + applied = rd.CommittedEntries[n-1].Index + } + s.Append(rd.Entries) + for _, m := range rd.Messages { + if m.Type == pb.MsgVote { + resp := pb.Message{To: m.From, From: m.To, Term: m.Term, Type: pb.MsgVoteResp} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + if m.Type == pb.MsgApp { + idx := m.Index + if n := len(m.Entries); n > 0 { + idx = m.Entries[n-1].Index + } + resp := pb.Message{To: m.From, From: m.To, Type: pb.MsgAppResp, Term: m.Term, Index: idx} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + } + rn.Advance(rd) + } + return applied + } + + rn.Campaign() + stabilize() + + if debug { + b.N = 1 + } + + var applied uint64 + for i := 0; i < b.N; i++ { + if err := rn.Propose([]byte("foo")); err != nil { + b.Fatal(err) + } + applied = stabilize() + } + if applied < uint64(b.N) { + b.Fatalf("did not apply everything: %d < %d", applied, b.N) + } + b.ReportMetric(float64(s.callStats.firstIndex)/float64(b.N), "firstIndex/op") + b.ReportMetric(float64(s.callStats.lastIndex)/float64(b.N), "lastIndex/op") + b.ReportMetric(float64(s.callStats.term)/float64(b.N), "term/op") + b.ReportMetric(float64(numReady)/float64(b.N), "ready/op") + b.Logf("storage access stats: %+v", s.callStats) +} diff --git a/raft/storage.go b/raft/storage.go index 8b16d4fa24f..67ec16b13aa 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -71,6 +71,10 @@ type Storage interface { Snapshot() (pb.Snapshot, error) } +type inMemStorageCallStats struct { + initialState, firstIndex, lastIndex, entries, term, snapshot int +} + // MemoryStorage implements the Storage interface backed by an // in-memory array. type MemoryStorage struct { @@ -83,6 +87,8 @@ type MemoryStorage struct { snapshot pb.Snapshot // ents[i] has raft log position i+snapshot.Metadata.Index ents []pb.Entry + + callStats inMemStorageCallStats } // NewMemoryStorage creates an empty MemoryStorage. @@ -95,6 +101,7 @@ func NewMemoryStorage() *MemoryStorage { // InitialState implements the Storage interface. func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) { + ms.callStats.initialState++ return ms.hardState, ms.snapshot.Metadata.ConfState, nil } @@ -110,6 +117,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error { func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() + ms.callStats.entries++ offset := ms.ents[0].Index if lo <= offset { return nil, ErrCompacted @@ -130,6 +138,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { func (ms *MemoryStorage) Term(i uint64) (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.term++ offset := ms.ents[0].Index if i < offset { return 0, ErrCompacted @@ -144,6 +153,7 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) { func (ms *MemoryStorage) LastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.lastIndex++ return ms.lastIndex(), nil } @@ -155,6 +165,7 @@ func (ms *MemoryStorage) lastIndex() uint64 { func (ms *MemoryStorage) FirstIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.firstIndex++ return ms.firstIndex(), nil } @@ -166,6 +177,7 @@ func (ms *MemoryStorage) firstIndex() uint64 { func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { ms.Lock() defer ms.Unlock() + ms.callStats.snapshot++ return ms.snapshot, nil } From 21be9fa337964e472489c54e85da40aabf17e548 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 1 Sep 2022 17:05:41 +0200 Subject: [PATCH 03/36] raft: add single_node InteractionEnv test case Show-cases the current behavior and changes made in future commits for [^1]. The test demonstrates that a single-voter raft instance will emit an entry as committed while it still needs to be appended to the log. [^1]: https://github.com/etcd-io/etcd/issues/14370 Signed-off-by: Tobias Grieger --- raft/testdata/single_node.txt | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 raft/testdata/single_node.txt diff --git a/raft/testdata/single_node.txt b/raft/testdata/single_node.txt new file mode 100644 index 00000000000..112b32c0404 --- /dev/null +++ b/raft/testdata/single_node.txt @@ -0,0 +1,27 @@ +log-level info +---- +ok + +add-nodes 1 voters=(1) index=3 +---- +INFO 1 switched to configuration voters=(1) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1], term: 0, commit: 3, applied: 3, lastindex: 3, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 became leader at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + HardState Term:1 Vote:1 Commit:4 + Entries: + 1/4 EntryNormal "" + CommittedEntries: + 1/4 EntryNormal "" From 169f4c3cc786524f03ba90e1dced34651df915c4 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Thu, 1 Sep 2022 17:12:07 +0200 Subject: [PATCH 04/36] raft: don't emit unstable CommittedEntries See https://github.com/etcd-io/etcd/issues/14370. When run in a single-voter configuration, prior to this PR raft would emit `HardState`s that would emit a proposed `Entry` simultaneously in `CommittedEntries` and `Entries`. To be correct, this requires users of the raft library to enforce an ordering between appending to the log and notifying the client about `CommittedEntries` also present in `Entries`. This was easy to miss. Walk back this behavior to arrive at a simpler contract: what's emitted in `CommittedEntries` is truly committed, i.e. present in stable storage on a quorum of voters. This in turn pessimizes the single-voter case: rather than fully handling an `Entry` in just one `Ready`, now two are required, and in particular one has to do extra work to save on allocations. We accept this as a good tradeoff, since raft primarily serves multi-voter configurations. Signed-off-by: Tobias Grieger --- raft/raft.go | 28 ++++++++++++++++++++++++---- raft/testdata/single_node.txt | 5 ++++- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 4ce5af5f18d..cd4a604e91b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -572,6 +572,19 @@ func (r *raft) advance(rd Ready) { if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] + if r.id == r.lead { + // The leader needs to self-ack the entries just appended (since it doesn't + // send an MsgApp to itself). This is roughly equivalent to: + // + // r.prs.Progress[r.id].MaybeUpdate(e.Index) + // if r.maybeCommit() { + // r.bcastAppend() + // } + _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) + } + // NB: it's important for performance that this call happens after + // r.Step above on the leader. This is because r.Step can then use + // a fast-path for `r.raftLog.term()`. r.raftLog.stableTo(e.Index, e.Term) } if !IsEmptySnap(rd.Snapshot) { @@ -635,9 +648,6 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { } // use latest "last" index after truncate/append li = r.raftLog.append(es...) - r.prs.Progress[r.id].MaybeUpdate(li) - // Regardless of maybeCommit's return, our caller will call bcastAppend. - r.maybeCommit() return true } @@ -1099,6 +1109,9 @@ func stepLeader(r *raft, m pb.Message) error { } switch m.Type { case pb.MsgAppResp: + // NB: this code path is also hit from (*raft).advance, where the leader steps + // an MsgAppResp to acknowledge the appended entries in the last Ready. + pr.RecentActive = true if m.Reject { @@ -1267,7 +1280,9 @@ func stepLeader(r *raft, m pb.Message) error { // replicate, or when freeTo() covers multiple messages). If // we have more entries to send, send as many messages as we // can (without sending empty messages for the commit index) - for r.maybeSendAppend(m.From, false) { + if r.id != m.From { + for r.maybeSendAppend(m.From, false) { + } } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { @@ -1806,6 +1821,11 @@ func numOfPendingConf(ents []pb.Entry) int { } func releasePendingReadIndexMessages(r *raft) { + if len(r.pendingReadIndexMessages) == 0 { + // Fast path for the common case to avoid a call to storage.LastIndex() + // via committedEntryInCurrentTerm. + return + } if !r.committedEntryInCurrentTerm() { r.logger.Error("pending MsgReadIndex should be released only after first commit in current term") return diff --git a/raft/testdata/single_node.txt b/raft/testdata/single_node.txt index 112b32c0404..3b6e4f4c1dd 100644 --- a/raft/testdata/single_node.txt +++ b/raft/testdata/single_node.txt @@ -20,8 +20,11 @@ stabilize > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:3 Entries: 1/4 EntryNormal "" +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/4 EntryNormal "" From dad8208a4ddd46bcbab937d0c2cac5fedf5a3113 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 5 Sep 2022 17:29:07 +0200 Subject: [PATCH 05/36] raft: avoid panics during *node tests `StartNode` runs a naked goroutine, so it's impossible to test against it in a way that will reliably produce contained test failures when assertions are hit on the `(*node).run` goroutine. This commit introduces a harness that we can use in tests to wrap this goroutine and allow it to defer errors to `*testing.T`. Note that tests of `Node` still need to be architected carefully since it's easy to produce a deadlock in them should things not go exactly as planned. Signed-off-by: Tobias Grieger --- raft/node.go | 16 +++--- raft/node_test.go | 7 +-- raft/node_util_test.go | 109 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 11 deletions(-) create mode 100644 raft/node_util_test.go diff --git a/raft/node.go b/raft/node.go index d374b6c0c21..381000621ac 100644 --- a/raft/node.go +++ b/raft/node.go @@ -211,11 +211,7 @@ type Peer struct { Context []byte } -// StartNode returns a new Node given configuration and a list of raft peers. -// It appends a ConfChangeAddNode entry for each given peer to the initial log. -// -// Peers must not be zero length; call RestartNode in that case. -func StartNode(c *Config, peers []Peer) Node { +func setupNode(c *Config, peers []Peer) *node { if len(peers) == 0 { panic("no peers given; use RestartNode instead") } @@ -229,9 +225,17 @@ func StartNode(c *Config, peers []Peer) Node { } n := newNode(rn) + return &n +} +// StartNode returns a new Node given configuration and a list of raft peers. +// It appends a ConfChangeAddNode entry for each given peer to the initial log. +// +// Peers must not be zero length; call RestartNode in that case. +func StartNode(c *Config, peers []Peer) Node { + n := setupNode(c, peers) go n.run() - return &n + return n } // RestartNode is similar to StartNode but does not take a list of peers. diff --git a/raft/node_test.go b/raft/node_test.go index 2c2ff801809..b732174f1d3 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -740,9 +740,6 @@ func TestNodeRestartFromSnapshot(t *testing.T) { } func TestNodeAdvance(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - storage := NewMemoryStorage() c := &Config{ ID: 1, @@ -752,8 +749,8 @@ func TestNodeAdvance(t *testing.T) { MaxSizePerMsg: noLimit, MaxInflightMsgs: 256, } - n := StartNode(c, []Peer{{ID: 1}}) - defer n.Stop() + ctx, cancel, n := newNodeTestHarness(t, context.Background(), c, Peer{ID: 1}) + defer cancel() rd := <-n.Ready() storage.Append(rd.Entries) n.Advance() diff --git a/raft/node_util_test.go b/raft/node_util_test.go new file mode 100644 index 00000000000..fb3473d128c --- /dev/null +++ b/raft/node_util_test.go @@ -0,0 +1,109 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "context" + "fmt" + "testing" + "time" +) + +type nodeTestHarness struct { + *node + t *testing.T +} + +func (l *nodeTestHarness) Debug(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Debugf(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Error(v ...interface{}) { + l.t.Error(v...) +} + +func (l *nodeTestHarness) Errorf(format string, v ...interface{}) { + l.t.Errorf(format, v...) +} + +func (l *nodeTestHarness) Info(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Infof(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Warning(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Warningf(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Fatal(v ...interface{}) { + l.t.Error(v...) + panic(v) +} + +func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { + l.t.Errorf(format, v...) + panic(fmt.Sprintf(format, v...)) +} + +func (l *nodeTestHarness) Panic(v ...interface{}) { + l.t.Log(v...) + panic(v) +} + +func (l *nodeTestHarness) Panicf(format string, v ...interface{}) { + l.t.Errorf(format, v...) + panic(fmt.Sprintf(format, v...)) +} + +func newNodeTestHarness(t *testing.T, ctx context.Context, cfg *Config, peers ...Peer) (_ context.Context, cancel func(), _ *nodeTestHarness) { + // Wrap context in a 10s timeout to make tests more robust. Otherwise, + // it's likely that deadlock will occur unless Node behaves exactly as + // expected - when you expect a Ready and start waiting on the channel + // but no Ready ever shows up, for example. + ctx, cancel = context.WithTimeout(ctx, 10*time.Second) + var n *node + if len(peers) > 0 { + n = setupNode(cfg, peers) + } else { + rn, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } + nn := newNode(rn) + n = &nn + } + go func() { + defer func() { + if r := recover(); r != nil { + t.Error(r) + } + }() + defer cancel() + defer n.Stop() + n.run() + }() + return ctx, cancel, &nodeTestHarness{node: n, t: t} +} From 36860f863f4134e714351322f877a2100c72ee80 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 5 Sep 2022 17:43:19 +0200 Subject: [PATCH 06/36] TestLeaderAcknowledgeCommit Signed-off-by: Tobias Grieger --- raft/raft_paper_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 9577823d426..afa1f96c43b 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -473,9 +473,9 @@ func TestLeaderCommitEntry(t *testing.T) { // Reference: section 5.3 func TestLeaderAcknowledgeCommit(t *testing.T) { tests := []struct { - size int - acceptors map[uint64]bool - wack bool + size int + nonLeaderAcceptors map[uint64]bool + wack bool }{ {1, nil, true}, {3, nil, false}, @@ -496,8 +496,9 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { li := r.raftLog.lastIndex() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + tt.nonLeaderAcceptors[1] = true // leader always has the entry for _, m := range r.readMessages() { - if tt.acceptors[m.To] { + if tt.nonLeaderAcceptors[m.To] { r.Step(acceptAndReply(m)) } } From 1f39a8fe79541c24d21f727af9e0c9f67bf392f9 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 5 Sep 2022 18:02:19 +0200 Subject: [PATCH 07/36] raft: teach readyWithTimeout to log received `Ready`s Signed-off-by: Tobias Grieger --- raft/node_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index b732174f1d3..2103d24dfc8 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -35,6 +35,12 @@ import ( func readyWithTimeout(n Node) Ready { select { case rd := <-n.Ready(): + if nn, ok := n.(*nodeTestHarness); ok { + n = nn.node + } + if nn, ok := n.(*node); ok { + nn.rn.raft.logger.Infof("emitted ready: %s", DescribeReady(rd, nil)) + } return rd case <-time.After(time.Second): panic("timed out waiting for ready") From 931fec3b6d3afb88b76983aef3120b00c0af523a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 5 Sep 2022 18:02:49 +0200 Subject: [PATCH 08/36] TestCommitPagination Signed-off-by: Tobias Grieger --- raft/node_test.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 2103d24dfc8..dcb14d02858 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -914,15 +914,14 @@ func TestCommitPagination(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - rn, err := NewRawNode(cfg) - if err != nil { - t.Fatal(err) - } - n := newNode(rn) - go n.run() - n.Campaign(context.TODO()) + ctx, cancel, n := newNodeTestHarness(t, context.Background(), cfg) + defer cancel() + n.Campaign(ctx) - rd := readyWithTimeout(&n) + rd := readyWithTimeout(n) + s.Append(rd.Entries) + n.Advance() + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 1 { t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) } @@ -931,25 +930,32 @@ func TestCommitPagination(t *testing.T) { blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 3; i++ { - if err := n.Propose(context.TODO(), blob); err != nil { + if err := n.Propose(ctx, blob); err != nil { t.Fatal(err) } } + // First the three proposals have to be appended. + rd = readyWithTimeout(n) + if len(rd.Entries) != 3 { + t.Fatal("expected to see three entries") + } + s.Append(rd.Entries) + n.Advance() + // The 3 proposals will commit in two batches. - rd = readyWithTimeout(&n) + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 2 { t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries)) } s.Append(rd.Entries) n.Advance() - rd = readyWithTimeout(&n) + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 1 { t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries)) } s.Append(rd.Entries) n.Advance() - n.Stop() } type ignoreSizeHintMemStorage struct { From 87a9b80d7bc4bd45694ab45e63616d632dbd3534 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:08:20 +0200 Subject: [PATCH 09/36] TestNodeProposeAddDuplicateNode This needed to apply entries from CommittedEntries, not Entries. Previously the test got away with it because the two slices were equal. Now it was hanging because when it proposed the second conf change the first one hadn't applied yet, and so it got dropped, and the test would hang. Signed-off-by: Tobias Grieger --- raft/node_test.go | 49 +++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index dcb14d02858..ecce38287fb 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -358,30 +358,34 @@ func TestNodeProposeConfig(t *testing.T) { // not affect the later propose to add new node. func TestNodeProposeAddDuplicateNode(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) - rn := newTestRawNode(1, 10, 1, s) - n := newNode(rn) - go n.run() - n.Campaign(context.TODO()) - rdyEntries := make([]raftpb.Entry, 0) + cfg := newTestConfig(1, 10, 1, s) + ctx, cancel, n := newNodeTestHarness(t, context.Background(), cfg) + defer cancel() + n.Campaign(ctx) + allCommittedEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() - done := make(chan struct{}) - stop := make(chan struct{}) + goroutineStopped := make(chan struct{}) applyConfChan := make(chan struct{}) + rd := readyWithTimeout(n) + s.Append(rd.Entries) + n.Advance() + go func() { - defer close(done) + defer close(goroutineStopped) for { select { - case <-stop: + case <-ctx.Done(): return case <-ticker.C: n.Tick() case rd := <-n.Ready(): + t.Log(DescribeReady(rd, nil)) s.Append(rd.Entries) applied := false - for _, e := range rd.Entries { - rdyEntries = append(rdyEntries, e) + for _, e := range rd.CommittedEntries { + allCommittedEntries = append(allCommittedEntries, e) switch e.Type { case raftpb.EntryNormal: case raftpb.EntryConfChange: @@ -401,32 +405,31 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} ccdata1, _ := cc1.Marshal() - n.ProposeConfChange(context.TODO(), cc1) + n.ProposeConfChange(ctx, cc1) <-applyConfChan // try add the same node again - n.ProposeConfChange(context.TODO(), cc1) + n.ProposeConfChange(ctx, cc1) <-applyConfChan // the new node join should be ok cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} ccdata2, _ := cc2.Marshal() - n.ProposeConfChange(context.TODO(), cc2) + n.ProposeConfChange(ctx, cc2) <-applyConfChan - close(stop) - <-done + cancel() + <-goroutineStopped - if len(rdyEntries) != 4 { - t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries) + if len(allCommittedEntries) != 4 { + t.Errorf("len(entry) = %d, want %d, %v\n", len(allCommittedEntries), 4, allCommittedEntries) } - if !bytes.Equal(rdyEntries[1].Data, ccdata1) { - t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1) + if !bytes.Equal(allCommittedEntries[1].Data, ccdata1) { + t.Errorf("data = %v, want %v", allCommittedEntries[1].Data, ccdata1) } - if !bytes.Equal(rdyEntries[3].Data, ccdata2) { - t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2) + if !bytes.Equal(allCommittedEntries[3].Data, ccdata2) { + t.Errorf("data = %v, want %v", allCommittedEntries[3].Data, ccdata2) } - n.Stop() } // TestBlockProposal ensures that node will block proposal when it does not From 1a81b27bedbccfc54351c501350fa1eebe2688d5 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:11:15 +0200 Subject: [PATCH 10/36] TestNodePropose{,Config} This test now observes the `MsgAppResp` injected in `(*raft).advance`. Signed-off-by: Tobias Grieger --- raft/node_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index ecce38287fb..1d13de5ecd4 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -132,6 +132,10 @@ func TestNodeStepUnblock(t *testing.T) { func TestNodePropose(t *testing.T) { var msgs []raftpb.Message appendStep := func(r *raft, m raftpb.Message) error { + t.Log(DescribeMessage(m, nil)) + if m.Type == raftpb.MsgAppResp { + return nil // injected by (*raft).advance + } msgs = append(msgs, m) return nil } @@ -314,6 +318,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) { func TestNodeProposeConfig(t *testing.T) { var msgs []raftpb.Message appendStep := func(r *raft, m raftpb.Message) error { + if m.Type == raftpb.MsgAppResp { + return nil // injected by (*raft).advance + } msgs = append(msgs, m) return nil } From 873cdf3fa6cebf1e7e18ab6653481205ec50182b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:13:46 +0200 Subject: [PATCH 11/36] TestNodeProposeWaitDropped The test just needs to ignore the MsgAppResp. Signed-off-by: Tobias Grieger --- raft/node_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/raft/node_test.go b/raft/node_test.go index 1d13de5ecd4..224a4641412 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -479,6 +479,10 @@ func TestNodeProposeWaitDropped(t *testing.T) { t.Logf("dropping message: %v", m.String()) return ErrProposalDropped } + if m.Type == raftpb.MsgAppResp { + // This is produced by raft internally, see (*raft).advance. + return nil + } msgs = append(msgs, m) return nil } @@ -511,7 +515,7 @@ func TestNodeProposeWaitDropped(t *testing.T) { n.Stop() if len(msgs) != 0 { - t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0) } } From 14a76d755f71789c65aabdeae39f3e76ba441706 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:27:33 +0200 Subject: [PATCH 12/36] TestNodeStart This now sees the extra append-then-commit cycle. Signed-off-by: Tobias Grieger --- raft/node_test.go | 56 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 224a4641412..abb6a42b3f3 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -600,9 +600,6 @@ func TestReadyContainUpdates(t *testing.T) { // start with correct configuration change entries, and can accept and commit // proposals. func TestNodeStart(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} ccdata, err := cc.Marshal() if err != nil { @@ -620,11 +617,17 @@ func TestNodeStart(t *testing.T) { MustSync: true, }, { - HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}}, MustSync: true, }, + { + HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + Entries: nil, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + MustSync: false, + }, } storage := NewMemoryStorage() c := &Config{ @@ -636,27 +639,44 @@ func TestNodeStart(t *testing.T) { MaxInflightMsgs: 256, } n := StartNode(c, []Peer{{ID: 1}}) - defer n.Stop() - g := <-n.Ready() - if !reflect.DeepEqual(g, wants[0]) { - t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) - } else { - storage.Append(g.Entries) + ctx, cancel, n := newNodeTestHarness(t, context.Background(), c, Peer{ID: 1}) + defer cancel() + + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[0]) { + t.Fatalf("#1: rd = %+v,\n w %+v", rd, wants[0]) + } + storage.Append(rd.Entries) n.Advance() } if err := n.Campaign(ctx); err != nil { t.Fatal(err) } - rd := <-n.Ready() - storage.Append(rd.Entries) - n.Advance() + + { + rd := <-n.Ready() + storage.Append(rd.Entries) + n.Advance() + } n.Propose(ctx, []byte("foo")) - if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) { - t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1]) - } else { - storage.Append(g2.Entries) + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[1]) { + t.Errorf("#2: rd = %+v,\n w %+v", rd, wants[1]) + } + storage.Append(rd.Entries) + n.Advance() + } + + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[2]) { + t.Errorf("#3: rd = %+v,\n w %+v", rd, wants[2]) + } + storage.Append(rd.Entries) n.Advance() } From 0d9a6061c31ffae7563f7db5a9f891ca5924638f Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:28:40 +0200 Subject: [PATCH 13/36] TestNodeReadIndex Needs to ignore the injected MsgAppResp. Signed-off-by: Tobias Grieger --- raft/node_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/raft/node_test.go b/raft/node_test.go index abb6a42b3f3..4fcc001b1da 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -178,6 +178,10 @@ func TestNodePropose(t *testing.T) { func TestNodeReadIndex(t *testing.T) { var msgs []raftpb.Message appendStep := func(r *raft, m raftpb.Message) error { + if m.Type == raftpb.MsgAppResp { + // See (*raft).advance. + return nil + } msgs = append(msgs, m) return nil } From b2dba1c86c970d91eba8177570ff686d471fad3a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 09:35:13 +0200 Subject: [PATCH 14/36] TestNodeAdvance Switched this to baking the conf changes into the initial state to have fewer cycles to walk through in the test. Signed-off-by: Tobias Grieger --- raft/node_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 4fcc001b1da..93747515478 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -784,7 +784,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) { } func TestNodeAdvance(t *testing.T) { - storage := NewMemoryStorage() + storage := newTestMemoryStorage(withPeers(1)) c := &Config{ ID: 1, ElectionTick: 10, @@ -793,21 +793,17 @@ func TestNodeAdvance(t *testing.T) { MaxSizePerMsg: noLimit, MaxInflightMsgs: 256, } - ctx, cancel, n := newNodeTestHarness(t, context.Background(), c, Peer{ID: 1}) + ctx, cancel, n := newNodeTestHarness(t, context.Background(), c) defer cancel() - rd := <-n.Ready() - storage.Append(rd.Entries) - n.Advance() n.Campaign(ctx) - <-n.Ready() + rd := readyWithTimeout(n) + // Commit empty entry. + storage.Append(rd.Entries) + n.Advance() n.Propose(ctx, []byte("foo")) - select { - case rd = <-n.Ready(): - t.Fatalf("unexpected Ready before Advance: %+v", rd) - case <-time.After(time.Millisecond): - } + rd = readyWithTimeout(n) storage.Append(rd.Entries) n.Advance() select { From f10579d3b5901d70dec60b2502a2c9abfcdc9303 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 10:08:43 +0200 Subject: [PATCH 15/36] TestLeaderAcknowledgeCommit This needed to call `(*raft).advance` so that the leader would self-ack the entries. Signed-off-by: Tobias Grieger --- raft/raft_paper_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index afa1f96c43b..54b38b4388c 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -496,8 +496,10 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { li := r.raftLog.lastIndex() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - tt.nonLeaderAcceptors[1] = true // leader always has the entry - for _, m := range r.readMessages() { + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) // simulate having appended entry on leader + for _, m := range rd.Messages { if tt.nonLeaderAcceptors[m.To] { r.Step(acceptAndReply(m)) } From bd46776f039979d8f7d1c88f3d5fd701d734696d Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 10:15:21 +0200 Subject: [PATCH 16/36] Commit + apply all in nextEnts This fixes essentially all tests using this, since now they don't have to do anything special about the extra cycle introduced for single nodes. Signed-off-by: Tobias Grieger --- raft/raft_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 945611a50e5..dbd1eb01902 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -29,13 +29,15 @@ import ( // nextEnts returns the appliable entries and updates the applied index func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { - // Transfer all unstable entries to "stable" storage. - s.Append(r.raftLog.unstableEntries()) - r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) - - ents = r.raftLog.nextEnts() - r.raftLog.appliedTo(r.raftLog.committed) - return ents + for { + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) + if len(rd.Entries)+len(rd.CommittedEntries) == 0 { + return ents + } + ents = append(ents, rd.CommittedEntries...) + } } func mustAppendEntry(r *raft, ents ...pb.Entry) { From 9ff144ef75d1e92438bde03e001c830133bdbf94 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 10:21:13 +0200 Subject: [PATCH 17/36] TestProgressLeader This was expecting the progress of the leader to be updated as a result of MsgProp but it is now happening in `(*raft).advance`. Signed-off-by: Tobias Grieger --- raft/raft_test.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index dbd1eb01902..9375aa75b5a 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -59,21 +59,33 @@ func (r *raft) readMessages() []pb.Message { } func TestProgressLeader(t *testing.T) { - r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) + s := newTestMemoryStorage(withPeers(1, 2)) + r := newTestRaft(1, 5, 1, s) r.becomeCandidate() r.becomeLeader() r.prs.Progress[2].BecomeReplicate() - // Send proposals to r1. The first 5 entries should be appended to the log. + // Send proposals to r1. The first 5 entries should be queued in the unstable log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { - t.Errorf("unexpected progress %v", pr) - } if err := r.Step(propMsg); err != nil { t.Fatalf("proposal resulted in error: %v", err) } } + if m := r.prs.Progress[1].Match; m != 0 { + t.Fatalf("expected zero match, got %d", m) + } + rd := newReady(r, &SoftState{}, pb.HardState{}) + if len(rd.Entries) != 6 || len(rd.Entries[0].Data) > 0 || string(rd.Entries[5].Data) != "foo" { + t.Fatalf("unexpected Entries: %s", DescribeReady(rd, nil)) + } + r.advance(rd) + if m := r.prs.Progress[1].Match; m != 6 { + t.Fatalf("unexpected Match %d", m) + } + if m := r.prs.Progress[1].Next; m != 7 { + t.Fatalf("unexpected Next %d", m) + } } // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. From 7060d75527c44979939e190edb7c884d94254488 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 10:27:18 +0200 Subject: [PATCH 18/36] TestLeaderOnlyCommitsLogFromCurrentTerm Leader only acks to itself on `(*raft).advance` so we have to make this test a bit more like the real thing. Signed-off-by: Tobias Grieger --- raft/raft_paper_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 54b38b4388c..84c451cdf2d 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -894,6 +894,9 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index}) + rd := newReady(r, &SoftState{}, pb.HardState{}) + storage.Append(rd.Entries) + r.advance(rd) if r.raftLog.committed != tt.wcommit { t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit) } From 99adcaa299b6b25772efd70109b95fbb01609a8f Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 11:37:15 +0200 Subject: [PATCH 19/36] TestLearnerLogReplication Needed to `(*raft).advance` on `n1` so that it would actually commit the entry. Signed-off-by: Tobias Grieger --- raft/raft_test.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 9375aa75b5a..2dd371a6b45 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -677,10 +677,12 @@ func TestLogReplication(t *testing.T) { // TestLearnerLogReplication tests that a learner can receive entries from the leader. func TestLearnerLogReplication(t *testing.T) { - n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) + s1 := newTestMemoryStorage(withPeers(1), withLearners(2)) + n1 := newTestLearnerRaft(1, 10, 1, s1) n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) nt := newNetwork(n1, n2) + nt.t = t n1.becomeFollower(1, None) n2.becomeFollower(1, None) @@ -700,12 +702,23 @@ func TestLearnerLogReplication(t *testing.T) { t.Error("peer 2 state: not learner, want yes") } - nextCommitted := n1.raftLog.committed + 1 - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + nextCommitted := uint64(2) + { + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + rd := newReady(n1, &SoftState{}, pb.HardState{}) + nt.send(rd.Messages...) + s1.Append(rd.Entries) + n1.advance(rd) + } if n1.raftLog.committed != nextCommitted { t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed) } + { + rd := newReady(n1, &SoftState{}, pb.HardState{}) + nt.send(rd.Messages...) + } + if n1.raftLog.committed != n2.raftLog.committed { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } From d6f3e88a52b0ed70739cd92e7da2f1b4f73ce981 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 11:44:23 +0200 Subject: [PATCH 20/36] TestSingleNodeCommit Signed-off-by: Tobias Grieger --- raft/raft_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 2dd371a6b45..34e0b66073b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -730,11 +730,18 @@ func TestLearnerLogReplication(t *testing.T) { } func TestSingleNodeCommit(t *testing.T) { - tt := newNetwork(nil) + s := newTestMemoryStorage(withPeers(1)) + cfg := newTestConfig(1, 10, 1, s) + r := newRaft(cfg) + tt := newNetwork(r) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) + sm := tt.peers[1].(*raft) if sm.raftLog.committed != 3 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) From 15abe294e78def1fab1833e75f76d48769192182 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 11:54:21 +0200 Subject: [PATCH 21/36] TestDueling{Pre,}Candidates Signed-off-by: Tobias Grieger --- raft/raft_test.go | 76 ++++++++++++++++++++--------------------------- 1 file changed, 33 insertions(+), 43 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 34e0b66073b..17070f8f1d1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -826,9 +826,12 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + s1 := newTestMemoryStorage(withPeers(1, 2, 3)) + s2 := newTestMemoryStorage(withPeers(1, 2, 3)) + s3 := newTestMemoryStorage(withPeers(1, 2, 3)) + a := newTestRaft(1, 10, 1, s1) + b := newTestRaft(2, 10, 1, s2) + c := newTestRaft(3, 10, 1, s3) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -854,21 +857,19 @@ func TestDuelingCandidates(t *testing.T) { // we expect it to disrupt the leader 1 since it has a higher term // 3 will be follower again since both 1 and 2 rejects its vote request since 3 does not have a long enough log nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) - - wlog := &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, - committed: 1, - unstable: unstable{offset: 2}, + if sm.state != StateFollower { + t.Errorf("state = %s, want %s", sm.state, StateFollower) } + tests := []struct { - sm *raft - state StateType - term uint64 - raftLog *raftLog + sm *raft + state StateType + term uint64 + lastIndex uint64 }{ - {a, StateFollower, 2, wlog}, - {b, StateFollower, 2, wlog}, - {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, + {a, StateFollower, 2, 1}, + {b, StateFollower, 2, 1}, + {c, StateFollower, 2, 0}, } for i, tt := range tests { @@ -878,14 +879,8 @@ func TestDuelingCandidates(t *testing.T) { if g := tt.sm.Term; g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } - base := ltoa(tt.raftLog) - if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { - l := ltoa(sm.raftLog) - if g := diffu(base, l); g != "" { - t.Errorf("#%d: diff:\n%s", i, g) - } - } else { - t.Logf("#%d: empty log", i) + if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act { + t.Errorf("#%d: last index exp = %d, act = %d", i, exp, act) } } } @@ -902,6 +897,7 @@ func TestDuelingPreCandidates(t *testing.T) { c := newRaft(cfgC) nt := newNetwork(a, b, c) + nt.t = t nt.cut(1, 3) nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) @@ -925,20 +921,15 @@ func TestDuelingPreCandidates(t *testing.T) { // With PreVote, it does not disrupt the leader. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) - wlog := &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, - committed: 1, - unstable: unstable{offset: 2}, - } tests := []struct { - sm *raft - state StateType - term uint64 - raftLog *raftLog + sm *raft + state StateType + term uint64 + lastIndex uint64 }{ - {a, StateLeader, 1, wlog}, - {b, StateFollower, 1, wlog}, - {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)}, + {a, StateLeader, 1, 1}, + {b, StateFollower, 1, 1}, + {c, StateFollower, 1, 0}, } for i, tt := range tests { @@ -948,14 +939,8 @@ func TestDuelingPreCandidates(t *testing.T) { if g := tt.sm.Term; g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } - base := ltoa(tt.raftLog) - if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { - l := ltoa(sm.raftLog) - if g := diffu(base, l); g != "" { - t.Errorf("#%d: diff:\n%s", i, g) - } - } else { - t.Logf("#%d: empty log", i) + if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act { + t.Errorf("#%d: last index is %d, exp %d", i, act, exp) } } } @@ -4664,6 +4649,8 @@ func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft { } type network struct { + t *testing.T // optional + peers map[uint64]stateMachine storage map[uint64]*MemoryStorage dropm map[connem]float64 @@ -4747,6 +4734,9 @@ func (nw *network) send(msgs ...pb.Message) { for len(msgs) > 0 { m := msgs[0] p := nw.peers[m.To] + if nw.t != nil { + nw.t.Log(DescribeMessage(m, nil)) + } p.Step(m) msgs = append(msgs[1:], nw.filter(p.readMessages())...) } From ff837f3a0b47a7c13c881de563ed4425350c585e Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 12:22:37 +0200 Subject: [PATCH 22/36] TestProposal Don't check on `committed` but `lastIndex` instead. Signed-off-by: Tobias Grieger --- raft/diff_test.go | 2 +- raft/raft_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/diff_test.go b/raft/diff_test.go index 3114a1f0017..6030527941f 100644 --- a/raft/diff_test.go +++ b/raft/diff_test.go @@ -55,7 +55,7 @@ func mustTemp(pre, body string) string { } func ltoa(l *raftLog) string { - s := fmt.Sprintf("committed: %d\n", l.committed) + s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex()) s += fmt.Sprintf("applied: %d\n", l.applied) for i, e := range l.allEntries() { s += fmt.Sprintf("#%d: %+v\n", i, e) diff --git a/raft/raft_test.go b/raft/raft_test.go index 17070f8f1d1..c4d005d7c0e 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1077,6 +1077,7 @@ func TestProposal(t *testing.T) { // promote 1 to become leader send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) + r := tt.network.peers[1].(*raft) wantLog := newLog(NewMemoryStorage(), raftLogger) if tt.success { @@ -1084,8 +1085,8 @@ func TestProposal(t *testing.T) { storage: &MemoryStorage{ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, }, - unstable: unstable{offset: 3}, - committed: 2} + unstable: unstable{offset: 3}, + } } base := ltoa(wantLog) for i, p := range tt.peers { @@ -1098,8 +1099,7 @@ func TestProposal(t *testing.T) { t.Logf("#%d: peer %d empty log", j, i) } } - sm := tt.network.peers[1].(*raft) - if g := sm.Term; g != 1 { + if g := r.Term; g != 1 { t.Errorf("#%d: term = %d, want %d", j, g, 1) } } From b462fd15c2c3a0b27ab28d268d66311b381f738b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 12:25:52 +0200 Subject: [PATCH 23/36] TestMsgAppRespWaitReset Signed-off-by: Tobias Grieger --- raft/raft_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index c4d005d7c0e..923f99084da 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1424,14 +1424,14 @@ func TestRaftFreesReadOnlyMem(t *testing.T) { // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { - sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + s := newTestMemoryStorage(withPeers(1, 2, 3)) + sm := newTestRaft(1, 5, 1, s) sm.becomeCandidate() sm.becomeLeader() - // The new leader has just emitted a new Term 4 entry; consume those messages - // from the outgoing queue. - sm.bcastAppend() - sm.readMessages() + // Run n1 which includes sending a message like the below + // one to n2, but also appending to its own log. + nextEnts(sm, s) // Node 2 acks the first entry, making it committed. sm.Step(pb.Message{ From 182e1a371d253c22360eb49497d94d959bb72a02 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 12:54:52 +0200 Subject: [PATCH 24/36] TestReadOnlyWithLearner Signed-off-by: Tobias Grieger --- raft/raft_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 923f99084da..3b99bf7ae3d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -2247,7 +2247,8 @@ func TestReadOnlyOptionSafe(t *testing.T) { } func TestReadOnlyWithLearner(t *testing.T) { - a := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) + s := newTestMemoryStorage(withPeers(1), withLearners(2)) + a := newTestLearnerRaft(1, 10, 1, s) b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) nt := newNetwork(a, b) @@ -2277,6 +2278,7 @@ func TestReadOnlyWithLearner(t *testing.T) { for i, tt := range tests { for j := 0; j < tt.proposals; j++ { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + nextEnts(a, s) // append the entries on the leader } nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) From fbe4d400865472ad85d7e3e5a3a1297818ba0992 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 12:57:44 +0200 Subject: [PATCH 25/36] TestLeaderTransferIgnoreProposal Signed-off-by: Tobias Grieger --- raft/raft_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/raft/raft_test.go b/raft/raft_test.go index 3b99bf7ae3d..e45533d8d03 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -3655,13 +3655,17 @@ func TestLeaderTransferTimeout(t *testing.T) { } func TestLeaderTransferIgnoreProposal(t *testing.T) { - nt := newNetwork(nil, nil, nil) + s := newTestMemoryStorage(withPeers(1, 2, 3)) + r := newTestRaft(1, 10, 1, s) + nt := newNetwork(r, nil, nil) nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) nt.isolate(3) lead := nt.peers[1].(*raft) + nextEnts(r, s) // handle empty entry + // Transfer leadership to isolated node to let transfer pending, then send proposal. nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) if lead.leadTransferee != 3 { From 79bf3b0df4eb4d2545b3c566ec0c1f8ae47f2b76 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 13:04:27 +0200 Subject: [PATCH 26/36] TestRawNodeJointAutoLeave This now needs an additional Ready cycle to apply the previous conf change, so the finalizing conf change does too. Signed-off-by: Tobias Grieger --- raft/rawnode_test.go | 211 ++++++++++++++++++++++--------------------- 1 file changed, 107 insertions(+), 104 deletions(-) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 413f232df4c..567227f6130 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -388,122 +388,125 @@ func TestRawNodeJointAutoLeave(t *testing.T) { } exp2Cs := pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}} - t.Run("", func(t *testing.T) { - s := newTestMemoryStorage(withPeers(1)) - rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s)) - if err != nil { - t.Fatal(err) - } + s := newTestMemoryStorage(withPeers(1)) + rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s)) + if err != nil { + t.Fatal(err) + } - rawNode.Campaign() - proposed := false - var ( - lastIndex uint64 - ccdata []byte - ) - // Propose the ConfChange, wait until it applies, save the resulting - // ConfState. - var cs *pb.ConfState - for cs == nil { - rd := rawNode.Ready() - s.Append(rd.Entries) - for _, ent := range rd.CommittedEntries { - var cc pb.ConfChangeI - if ent.Type == pb.EntryConfChangeV2 { - var ccc pb.ConfChangeV2 - if err = ccc.Unmarshal(ent.Data); err != nil { - t.Fatal(err) - } - cc = &ccc - } - if cc != nil { - // Force it step down. - rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1}) - cs = rawNode.ApplyConfChange(cc) - } - } - rawNode.Advance(rd) - // Once we are the leader, propose a command and a ConfChange. - if !proposed && rd.SoftState.Lead == rawNode.raft.id { - if err = rawNode.Propose([]byte("somedata")); err != nil { - t.Fatal(err) - } - ccdata, err = testCc.Marshal() - if err != nil { + rawNode.Campaign() + proposed := false + var ( + lastIndex uint64 + ccdata []byte + ) + // Propose the ConfChange, wait until it applies, save the resulting + // ConfState. + var cs *pb.ConfState + for cs == nil { + rd := rawNode.Ready() + s.Append(rd.Entries) + for _, ent := range rd.CommittedEntries { + var cc pb.ConfChangeI + if ent.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err = ccc.Unmarshal(ent.Data); err != nil { t.Fatal(err) } - rawNode.ProposeConfChange(testCc) - proposed = true + cc = &ccc + } + if cc != nil { + // Force it step down. + rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1}) + cs = rawNode.ApplyConfChange(cc) } } - - // Check that the last index is exactly the conf change we put in, - // down to the bits. Note that this comes from the Storage, which - // will not reflect any unstable entries that we'll only be presented - // with in the next Ready. - lastIndex, err = s.LastIndex() - if err != nil { - t.Fatal(err) + rawNode.Advance(rd) + // Once we are the leader, propose a command and a ConfChange. + if !proposed && rd.SoftState.Lead == rawNode.raft.id { + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + ccdata, err = testCc.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(testCc) + proposed = true } + } - entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) - if err != nil { - t.Fatal(err) - } - if len(entries) != 2 { - t.Fatalf("len(entries) = %d, want %d", len(entries), 2) - } - if !bytes.Equal(entries[0].Data, []byte("somedata")) { - t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) - } - if entries[1].Type != pb.EntryConfChangeV2 { - t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2) - } - if !bytes.Equal(entries[1].Data, ccdata) { - t.Errorf("data = %v, want %v", entries[1].Data, ccdata) - } + // Check that the last index is exactly the conf change we put in, + // down to the bits. Note that this comes from the Storage, which + // will not reflect any unstable entries that we'll only be presented + // with in the next Ready. + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } - if !reflect.DeepEqual(&expCs, cs) { - t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs) - } + entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) + if err != nil { + t.Fatal(err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 2) + } + if !bytes.Equal(entries[0].Data, []byte("somedata")) { + t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) + } + if entries[1].Type != pb.EntryConfChangeV2 { + t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2) + } + if !bytes.Equal(entries[1].Data, ccdata) { + t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + } - if rawNode.raft.pendingConfIndex != 0 { - t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex) - } + if !reflect.DeepEqual(&expCs, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs) + } - // Move the RawNode along. It should not leave joint because it's follower. - rd := rawNode.readyWithoutAccept() - // Check that the right ConfChange comes out. - if len(rd.Entries) != 0 { - t.Fatalf("expected zero entry, got %+v", rd) - } + if rawNode.raft.pendingConfIndex != 0 { + t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex) + } - // Make it leader again. It should leave joint automatically after moving apply index. - rawNode.Campaign() - rd = rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - rd = rawNode.Ready() - s.Append(rd.Entries) + // Move the RawNode along. It should not leave joint because it's follower. + rd := rawNode.readyWithoutAccept() + // Check that the right ConfChange comes out. + if len(rd.Entries) != 0 { + t.Fatalf("expected zero entry, got %+v", rd) + } - // Check that the right ConfChange comes out. - if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { - t.Fatalf("expected exactly one more entry, got %+v", rd) - } - var cc pb.ConfChangeV2 - if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) { - t.Fatalf("expected zero ConfChangeV2, got %+v", cc) - } - // Lie and pretend the ConfChange applied. It won't do so because now - // we require the joint quorum and we're only running one node. - cs = rawNode.ApplyConfChange(cc) - if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) { - t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) - } - }) + // Make it leader again. It should leave joint automatically after moving apply index. + rawNode.Campaign() + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + rawNode.Advance(rd) + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + rawNode.Advance(rd) + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + // Check that the right ConfChange comes out. + if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { + t.Fatalf("expected exactly one more entry, got %+v", rd) + } + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) { + t.Fatalf("expected zero ConfChangeV2, got %+v", cc) + } + // Lie and pretend the ConfChange applied. It won't do so because now + // we require the joint quorum and we're only running one node. + cs = rawNode.ApplyConfChange(cc) + if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } } // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should From 02efe5135d5ab8d2b991f0e984278e28e90b15a6 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 13:10:23 +0200 Subject: [PATCH 27/36] TestRawNodeStart Now also sees the extra Ready cycle. Signed-off-by: Tobias Grieger --- raft/rawnode_test.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 567227f6130..790365894af 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -659,18 +659,16 @@ func TestRawNodeReadIndex(t *testing.T) { // requires the application to bootstrap the state, i.e. it does not accept peers // and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { + entries := []pb.Entry{ + {Term: 1, Index: 2, Data: nil}, // empty entry + {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry + } want := Ready{ - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, - Entries: []pb.Entry{ - {Term: 1, Index: 2, Data: nil}, // empty entry - {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry - }, - CommittedEntries: []pb.Entry{ - {Term: 1, Index: 2, Data: nil}, // empty entry - {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry - }, - MustSync: true, + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, + Entries: nil, // emitted & checked in intermediate Ready cycle + CommittedEntries: entries, + MustSync: false, // since we're only applying, not appending } storage := NewMemoryStorage() @@ -750,9 +748,24 @@ func TestRawNodeStart(t *testing.T) { t.Fatal("expected a Ready") } rd := rawNode.Ready() + if !reflect.DeepEqual(entries, rd.Entries) { + t.Fatalf("expected to see entries\n%s, not\n%s", DescribeEntries(entries, nil), DescribeEntries(rd.Entries, nil)) + } storage.Append(rd.Entries) rawNode.Advance(rd) + if !rawNode.HasReady() { + t.Fatal("expected a Ready") + } + rd = rawNode.Ready() + if len(rd.Entries) != 0 { + t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil)) + } + if rd.MustSync { + t.Fatalf("should not need to sync") + } + rawNode.Advance(rd) + rd.SoftState, want.SoftState = nil, nil if !reflect.DeepEqual(rd, want) { From f7b0a6ad3366212ea51a9a5fefcd63e001bce1ff Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 13:17:26 +0200 Subject: [PATCH 28/36] TestRawNodeBoundedLogGrowthWithPartition Signed-off-by: Tobias Grieger --- raft/rawnode_test.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 790365894af..979b6fa8ce1 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -968,6 +968,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { data := []byte("testdata") testEntry := pb.Entry{Data: data} maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) + t.Log("maxEntrySize", maxEntrySize) s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) @@ -976,20 +977,16 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - // Become the leader. + // Become the leader and apply empty entry. rawNode.Campaign() for { - rd = rawNode.Ready() + rd := rawNode.Ready() s.Append(rd.Entries) - if rd.SoftState.Lead == rawNode.raft.id { - rawNode.Advance(rd) + rawNode.Advance(rd) + if len(rd.CommittedEntries) > 0 { break } - rawNode.Advance(rd) } // Simulate a network partition while we make our proposals by never @@ -1011,12 +1008,25 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { // Recover from the partition. The uncommitted tail of the Raft log should // disappear as entries are committed. + rd := rawNode.Ready() + if len(rd.Entries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.Entries)) + } + s.Append(rd.Entries) + rawNode.Advance(rd) + + // Entries are appended, but not applied. + checkUncommitted(maxEntrySize) + rd = rawNode.Ready() + if len(rd.Entries) != 0 { + t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil)) + } if len(rd.CommittedEntries) != maxEntries { t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) } - s.Append(rd.Entries) rawNode.Advance(rd) + checkUncommitted(0) } From f7dcb9ec2acbc3d5cf914e8a46400b9ccdb2517b Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 13:48:52 +0200 Subject: [PATCH 29/36] TestInteraction Reviewed the diff in detail. The changes here were benign, just the extra raft cycle. Signed-off-by: Tobias Grieger --- raft/testdata/confchange_v1_add_single.txt | 5 ++++- raft/testdata/confchange_v2_add_double_auto.txt | 17 +++++++++++------ .../confchange_v2_add_double_implicit.txt | 5 ++++- raft/testdata/confchange_v2_add_single_auto.txt | 5 ++++- .../confchange_v2_add_single_explicit.txt | 5 ++++- raft/testdata/snapshot_succeed_via_app_resp.txt | 6 +++--- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/raft/testdata/confchange_v1_add_single.txt b/raft/testdata/confchange_v1_add_single.txt index d9cc1a7b1c6..cd07af47944 100644 --- a/raft/testdata/confchange_v1_add_single.txt +++ b/raft/testdata/confchange_v1_add_single.txt @@ -35,10 +35,13 @@ stabilize > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChange v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChange v2 diff --git a/raft/testdata/confchange_v2_add_double_auto.txt b/raft/testdata/confchange_v2_add_double_auto.txt index 0a5e205bf0d..2419083f0e9 100644 --- a/raft/testdata/confchange_v2_add_double_auto.txt +++ b/raft/testdata/confchange_v2_add_double_auto.txt @@ -31,19 +31,24 @@ INFO 3 switched to configuration voters=() INFO 3 became follower at term 0 INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] -# n1 immediately gets to commit & apply the conf change using only itself. We see that -# it starts transitioning out of that joint configuration (though we will only see that -# proposal in the next ready handling loop, when it is emitted). We also see that this -# is using joint consensus, which it has to since we're carrying out two additions at -# once. +# Process n1 once, so that it can append the entry. process-ready 1 ---- Ready MustSync=true: Lead:1 State:StateLeader -HardState Term:1 Vote:1 Commit:4 +HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 v3 + +# Now n1 applies the conf change. We see that it starts transitioning out of that joint +# configuration (though we will only see that proposal in the next ready handling +# loop, when it is emitted). We also see that this is using joint consensus, which +# it has to since we're carrying out two additions at once. +process-ready 1 +---- +Ready MustSync=false: +HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 v3 diff --git a/raft/testdata/confchange_v2_add_double_implicit.txt b/raft/testdata/confchange_v2_add_double_implicit.txt index a93eb81cb52..45dfc5099b9 100644 --- a/raft/testdata/confchange_v2_add_double_implicit.txt +++ b/raft/testdata/confchange_v2_add_double_implicit.txt @@ -38,10 +38,13 @@ stabilize 1 2 > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/confchange_v2_add_single_auto.txt b/raft/testdata/confchange_v2_add_single_auto.txt index 47c7f10b8e8..7ee3ab6c33c 100644 --- a/raft/testdata/confchange_v2_add_single_auto.txt +++ b/raft/testdata/confchange_v2_add_single_auto.txt @@ -36,10 +36,13 @@ stabilize > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/confchange_v2_add_single_explicit.txt b/raft/testdata/confchange_v2_add_single_explicit.txt index dd4a4f65467..b4e6e3a83cd 100644 --- a/raft/testdata/confchange_v2_add_single_explicit.txt +++ b/raft/testdata/confchange_v2_add_single_explicit.txt @@ -36,10 +36,13 @@ stabilize 1 2 > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/snapshot_succeed_via_app_resp.txt b/raft/testdata/snapshot_succeed_via_app_resp.txt index 96ded532cd7..dbbd5ce11d9 100644 --- a/raft/testdata/snapshot_succeed_via_app_resp.txt +++ b/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -41,7 +41,7 @@ ok status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateProbe match=0 next=11 paused inactive @@ -95,7 +95,7 @@ stabilize 1 status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateSnapshot match=0 next=11 paused pendingSnap=11 @@ -132,7 +132,7 @@ stabilize 1 status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateReplicate match=11 next=12 From f62b9d5e1961fd085ca04a28381204bc4eaa9cb3 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 6 Sep 2022 17:26:13 +0200 Subject: [PATCH 30/36] remove TestNodeReadIndex This is tested directly at the level of `RawNode` in `TestRawNodeReadIndex`. `*node` is a thin wrapper around `RawNode` so this is sufficient. The reason to remove the test is that it now incurs data races since it's not possible to adjust the `readStates` and `step` fields while the node is running, and there is no primitive to synchronize with its goroutine. This could all be fixed but it's not worth it. Signed-off-by: Tobias Grieger --- raft/node_test.go | 53 ----------------------------------------------- 1 file changed, 53 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 93747515478..ebb6d53f460 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -173,59 +173,6 @@ func TestNodePropose(t *testing.T) { } } -// TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft. -// It also ensures that ReadState can be read out through ready chan. -func TestNodeReadIndex(t *testing.T) { - var msgs []raftpb.Message - appendStep := func(r *raft, m raftpb.Message) error { - if m.Type == raftpb.MsgAppResp { - // See (*raft).advance. - return nil - } - msgs = append(msgs, m) - return nil - } - wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - - s := newTestMemoryStorage(withPeers(1)) - rn := newTestRawNode(1, 10, 1, s) - n := newNode(rn) - r := rn.raft - r.readStates = wrs - - go n.run() - n.Campaign(context.TODO()) - for { - rd := <-n.Ready() - if !reflect.DeepEqual(rd.ReadStates, wrs) { - t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs) - } - - s.Append(rd.Entries) - - if rd.SoftState.Lead == r.id { - n.Advance() - break - } - n.Advance() - } - - r.step = appendStep - wrequestCtx := []byte("somedata2") - n.ReadIndex(context.TODO(), wrequestCtx) - n.Stop() - - if len(msgs) != 1 { - t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) - } - if msgs[0].Type != raftpb.MsgReadIndex { - t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex) - } - if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) { - t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx) - } -} - // TestDisableProposalForwarding ensures that proposals are not forwarded to // the leader when DisableProposalForwarding is true. func TestDisableProposalForwarding(t *testing.T) { From 894e5cb685465407f30bde4c723152b4d53c4456 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 08:59:36 +0200 Subject: [PATCH 31/36] move ctx param to the front to appease linter Signed-off-by: Tobias Grieger --- raft/node_test.go | 8 ++++---- raft/node_util_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index ebb6d53f460..5cb729eb4fa 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -317,7 +317,7 @@ func TestNodeProposeConfig(t *testing.T) { func TestNodeProposeAddDuplicateNode(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) - ctx, cancel, n := newNodeTestHarness(t, context.Background(), cfg) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg) defer cancel() n.Campaign(ctx) allCommittedEntries := make([]raftpb.Entry, 0) @@ -590,7 +590,7 @@ func TestNodeStart(t *testing.T) { MaxInflightMsgs: 256, } n := StartNode(c, []Peer{{ID: 1}}) - ctx, cancel, n := newNodeTestHarness(t, context.Background(), c, Peer{ID: 1}) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, c, Peer{ID: 1}) defer cancel() { @@ -740,7 +740,7 @@ func TestNodeAdvance(t *testing.T) { MaxSizePerMsg: noLimit, MaxInflightMsgs: 256, } - ctx, cancel, n := newNodeTestHarness(t, context.Background(), c) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, c) defer cancel() n.Campaign(ctx) @@ -895,7 +895,7 @@ func TestCommitPagination(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - ctx, cancel, n := newNodeTestHarness(t, context.Background(), cfg) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg) defer cancel() n.Campaign(ctx) diff --git a/raft/node_util_test.go b/raft/node_util_test.go index fb3473d128c..d344295ad72 100644 --- a/raft/node_util_test.go +++ b/raft/node_util_test.go @@ -78,7 +78,7 @@ func (l *nodeTestHarness) Panicf(format string, v ...interface{}) { panic(fmt.Sprintf(format, v...)) } -func newNodeTestHarness(t *testing.T, ctx context.Context, cfg *Config, peers ...Peer) (_ context.Context, cancel func(), _ *nodeTestHarness) { +func newNodeTestHarness(ctx context.Context, t *testing.T, cfg *Config, peers ...Peer) (_ context.Context, cancel func(), _ *nodeTestHarness) { // Wrap context in a 10s timeout to make tests more robust. Otherwise, // it's likely that deadlock will occur unless Node behaves exactly as // expected - when you expect a Ready and start waiting on the channel From 67c35228935a62dcf2763fc3bff98454c1264d1d Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 10:16:43 +0200 Subject: [PATCH 32/36] raft: directly update leader in advance This makes the alternative option of implementing the leader's self-ack of entry append the default. Signed-off-by: Tobias Grieger --- raft/raft.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index cd4a604e91b..405bc4fe8a8 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -576,11 +576,11 @@ func (r *raft) advance(rd Ready) { // The leader needs to self-ack the entries just appended (since it doesn't // send an MsgApp to itself). This is roughly equivalent to: // - // r.prs.Progress[r.id].MaybeUpdate(e.Index) - // if r.maybeCommit() { - // r.bcastAppend() - // } - _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) + r.prs.Progress[r.id].MaybeUpdate(e.Index) + if r.maybeCommit() { + r.bcastAppend() + } + // _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) } // NB: it's important for performance that this call happens after // r.Step above on the leader. This is because r.Step can then use From 3c3e30a30e7636876dd12e8a079b205e4dcad3ba Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 11:09:00 +0200 Subject: [PATCH 33/36] Revert "raft: directly update leader in advance" This reverts commit d73a986e4edb15ef9dbfc994f1cbf5e96694d877, which was added only for benchmarking purposes. Signed-off-by: Tobias Grieger --- raft/raft.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 405bc4fe8a8..cd4a604e91b 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -576,11 +576,11 @@ func (r *raft) advance(rd Ready) { // The leader needs to self-ack the entries just appended (since it doesn't // send an MsgApp to itself). This is roughly equivalent to: // - r.prs.Progress[r.id].MaybeUpdate(e.Index) - if r.maybeCommit() { - r.bcastAppend() - } - // _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) + // r.prs.Progress[r.id].MaybeUpdate(e.Index) + // if r.maybeCommit() { + // r.bcastAppend() + // } + _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) } // NB: it's important for performance that this call happens after // r.Step above on the leader. This is because r.Step can then use From 304e26003830c1257ee495d7da33decfa32dba86 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 7 Sep 2022 15:13:02 +0200 Subject: [PATCH 34/36] raft: benchmark results MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` for sha in :/^Revert :/BenchmarkRawNode :/^raft:.directly; do git checkout raft-single-voter && git checkout $(git log -n 1 '--pretty=format:%H' $sha) && f=$(git log -1 --pretty=%s | sed -E 's/[^A-Za-z0-9]+/_/g').txt && go test -run - -count 10 -bench BenchmarkRawNode -benchmem -benchtime=100000x . > $f; done; git checkout raft-single-voter ``` The two possible solutions (directly updating progress and calling maybeCommit in `(*raft).advance` vs calling `r.Step`) are identical. In fact, we've gotten a tiny bit better with the `.Step` solution in terms of not calling `firstIndex` as much, in the common case of not being a single voter. ``` $ benchstat raft_directly_update_leader_in_advance.txt Revert_raft_directly_update_leader_in_advance_.txt name old time/op new time/op delta RawNode/single-voter-10 482ns ± 2% 742ns ± 1% +54.02% (p=0.000 n=9+9) RawNode/two-voters-10 1.29µs ± 1% 1.31µs ± 2% +1.70% (p=0.000 n=9+10) name old firstIndex/op new firstIndex/op delta RawNode/single-voter-10 4.00 ± 0% 5.00 ± 0% +25.00% (p=0.000 n=10+10) RawNode/two-voters-10 10.0 ± 0% 9.0 ± 0% -10.00% (p=0.000 n=10+10) name old lastIndex/op new lastIndex/op delta RawNode/single-voter-10 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.000 n=10+10) RawNode/two-voters-10 2.00 ± 0% 2.00 ± 0% ~ (all equal) name old ready/op new ready/op delta RawNode/single-voter-10 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.000 n=10+10) RawNode/two-voters-10 2.00 ± 0% 2.00 ± 0% ~ (all equal) name old term/op new term/op delta RawNode/single-voter-10 0.00 ± 0% 0.00 ± 0% ~ (all equal) RawNode/two-voters-10 1.00 ± 0% 1.00 ± 0% ~ (all equal) name old alloc/op new alloc/op delta RawNode/single-voter-10 372B ± 0% 388B ± 0% +4.30% (p=0.000 n=10+10) RawNode/two-voters-10 964B ± 0% 964B ± 0% ~ (all equal) name old allocs/op new allocs/op delta RawNode/single-voter-10 4.00 ± 0% 5.00 ± 0% +25.00% (p=0.000 n=10+10) RawNode/two-voters-10 7.00 ± 0% 7.00 ± 0% ~ (all equal) ``` We then compare the `.Step` solution against the previous "status quo" that prematurely emitted uncommitted entries for command application below. Importantly, we don't regress in the case of multiple peers. We actually gain slightly in terms of `lastIndex` calls, but run a bit more code; acceptable. In the single-voter case, since we now need two Ready handling cycles per op instead of one, we see additional calls to lastIndex and firstIndex as well as slightly increased allocations. These are expected and trade-offs we're willing to make to avoid correctness problems. Note that the benchmark intentionally forces full processing of each individual entries, so some of the new overhead would likely amortize on a singleton voter seeing high throughput as multiple proposals could share the Ready cycles. ``` $ benchstat raft_add_BenchmarkRawNode.txt Revert_raft_directly_update_leader_in_advance_.txt name old time/op new time/op delta RawNode/single-voter-10 482ns ± 2% 742ns ± 1% +54.02% (p=0.000 n=9+9) RawNode/two-voters-10 1.29µs ± 1% 1.31µs ± 2% +1.70% (p=0.000 n=9+10) name old firstIndex/op new firstIndex/op delta RawNode/single-voter-10 4.00 ± 0% 5.00 ± 0% +25.00% (p=0.000 n=10+10) RawNode/two-voters-10 10.0 ± 0% 9.0 ± 0% -10.00% (p=0.000 n=10+10) name old lastIndex/op new lastIndex/op delta RawNode/single-voter-10 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.000 n=10+10) RawNode/two-voters-10 2.00 ± 0% 2.00 ± 0% ~ (all equal) name old ready/op new ready/op delta RawNode/single-voter-10 1.00 ± 0% 2.00 ± 0% +100.00% (p=0.000 n=10+10) RawNode/two-voters-10 2.00 ± 0% 2.00 ± 0% ~ (all equal) name old term/op new term/op delta RawNode/single-voter-10 0.00 ± 0% 0.00 ± 0% ~ (all equal) RawNode/two-voters-10 1.00 ± 0% 1.00 ± 0% ~ (all equal) name old alloc/op new alloc/op delta RawNode/single-voter-10 372B ± 0% 388B ± 0% +4.30% (p=0.000 n=10+10) RawNode/two-voters-10 964B ± 0% 964B ± 0% ~ (all equal) name old allocs/op new allocs/op delta RawNode/single-voter-10 4.00 ± 0% 5.00 ± 0% +25.00% (p=0.000 n=10+10) RawNode/two-voters-10 7.00 ± 0% 7.00 ± 0% ~ (all equal) ``` `tools/benchmark put`: ``` Summary[main]: | Summary[this PR]: Total: 284.4443 secs. | Total: 288.1100 secs. Slowest: 0.1626 secs. | Slowest: 0.1456 secs. Fastest: 0.0027 secs. | Fastest: 0.0018 secs. Average: 0.0284 secs. | Average: 0.0288 secs. Stddev: 0.0178 secs. | Stddev: 0.0182 secs. Requests/sec: 35.1563 | Requests/sec: 34.7090 [=0.98727681809x main] Response time histogram: | Response time histogram: 0.0027 [1] | | 0.0018 [1] | 0.0187 [137] | | 0.0162 [34] | 0.0347 [7895] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎ | 0.0305 [7938] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎ 0.0507 [86] | | 0.0449 [103] | 0.0667 [1328] |∎∎∎∎∎∎ | 0.0593 [1056] |∎∎∎∎∎ 0.0827 [480] |∎∎ | 0.0737 [420] |∎∎ 0.0987 [45] | | 0.0881 [370] |∎ 0.1147 [18] | | 0.1025 [48] | 0.1306 [7] | | 0.1168 [19] | 0.1466 [2] | | 0.1312 [6] | 0.1626 [1] | | 0.1456 [5] | Latency distribution: | Latency distribution: 10% in 0.0195 secs. | 10% in 0.0194 secs. 25% in 0.0198 secs. | 25% in 0.0198 secs. 50% in 0.0201 secs. | 50% in 0.0201 secs. 75% in 0.0210 secs. | 75% in 0.0214 secs. 90% in 0.0585 secs. | 90% in 0.0589 secs. 95% in 0.0727 secs. | 95% in 0.0731 secs. 99% in 0.0762 secs. | 99% in 0.0788 secs. 99.9% in 0.1244 secs. | 99.9% in 0.1240 secs. ``` Signed-off-by: Tobias Grieger From 9ad36eecabcdafd1da37d1d25472b2265d2cf1ae Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 19 Sep 2022 10:32:52 +0200 Subject: [PATCH 35/36] fixup! address comments Signed-off-by: Tobias Grieger --- raft/node_util_test.go | 3 ++- raft/raft.go | 2 +- raft/rawnode_test.go | 48 ++++++++++++++++++++++++++-------------- raft/tracker/progress.go | 3 +-- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/raft/node_util_test.go b/raft/node_util_test.go index d344295ad72..5093cba6bf9 100644 --- a/raft/node_util_test.go +++ b/raft/node_util_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 The etcd Authors +// Copyright 2022 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -105,5 +105,6 @@ func newNodeTestHarness(ctx context.Context, t *testing.T, cfg *Config, peers .. defer n.Stop() n.run() }() + t.Cleanup(n.Stop) return ctx, cancel, &nodeTestHarness{node: n, t: t} } diff --git a/raft/raft.go b/raft/raft.go index cd4a604e91b..258b5cd84eb 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -647,7 +647,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) + r.raftLog.append(es...) return true } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 979b6fa8ce1..e209b3277b3 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -884,17 +884,17 @@ func TestRawNodeStatus(t *testing.T) { // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the // Raft group would forget to apply entries: // -// - node learns that index 11 is committed -// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already -// exceeds maxBytes), which isn't noticed internally by Raft -// - Commit index gets bumped to 10 -// - the node persists the HardState, but crashes before applying the entries -// - upon restart, the storage returns the same entries, but `slice` takes a -// different code path and removes the last entry. -// - Raft does not emit a HardState, but when the app calls Advance(), it bumps -// its internal applied index cursor to 10 (when it should be 9) -// - the next Ready asks the app to apply index 11 (omitting index 10), losing a -// write. +// - node learns that index 11 is committed +// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already +// exceeds maxBytes), which isn't noticed internally by Raft +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a +// different code path and removes the last entry. +// - Raft does not emit a HardState, but when the app calls Advance(), it bumps +// its internal applied index cursor to 10 (when it should be 9) +// - the next Ready asks the app to apply index 11 (omitting index 10), losing a +// write. func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { s := &ignoreSizeHintMemStorage{ MemoryStorage: newTestMemoryStorage(withPeers(1)), @@ -1133,12 +1133,26 @@ func TestRawNodeConsumeReady(t *testing.T) { } func BenchmarkRawNode(b *testing.B) { - b.Run("single-voter", func(b *testing.B) { - benchmarkRawNodeImpl(b, 1) - }) - b.Run("two-voters", func(b *testing.B) { - benchmarkRawNodeImpl(b, 1, 2) - }) + cases := []struct { + name string + peers []uint64 + }{ + { + name: "single-voter", + peers: []uint64{1}, + }, + { + name: "two-voters", + peers: []uint64{1, 2}, + }, + // You can easily add more cases here. + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + benchmarkRawNodeImpl(b, tc.peers...) + }) + } } func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) { diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index a36e5261ac7..e37e4b63ff7 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -52,8 +52,7 @@ type Progress struct { // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. - // - // TODO(tbg): the leader should always have this set to true. + // This is always true on the leader. RecentActive bool // ProbeSent is used while this follower is in StateProbe. When ProbeSent is From d56676c9b32107953a6c910e000c2a959732e064 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 19 Sep 2022 10:48:31 +0200 Subject: [PATCH 36/36] raft: benchmark results for ./benchmark put MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I ran this PR against its main merge-base twice (on my 2021 Mac M1 pro), and in both cases this PR was slightly faster, using the benchmark invocation from [^1]. 2819.6 vs 2808.4 2873.1 vs 2835 Full output below. ---- Script: ``` killall etcd rm -rf default.etcd scripts/build.sh nohup ./bin/etcd --quota-backend-bytes=4300000000 & sleep 10 f=bench-$(git log -1 --pretty=%s | sed -E 's/[^A-Za-z0-9]+/_/g').txt go run ./tools/benchmark txn-put --endpoints="http://127.0.0.1:2379" --clients=200 --conns=200 --key-space-size=4000000000 --key-size=128 --val-size=10240 --total=200000 --rate=40000 | tee "${f}" ``` PR: ``` Summary: Total: 70.9320 secs. Slowest: 0.3003 secs. Fastest: 0.0044 secs. Average: 0.0707 secs. Stddev: 0.0437 secs. Requests/sec: 2819.6030 (second run: 2873.0935) Response time histogram: 0.0044 [1] | 0.0340 [2877] | 0.0636 [119485] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎ 0.0932 [17436] |∎∎∎∎∎ 0.1228 [27364] |∎∎∎∎∎∎∎∎∎ 0.1524 [20349] |∎∎∎∎∎∎ 0.1820 [10214] |∎∎∎ 0.2116 [1248] | 0.2412 [564] | 0.2707 [318] | 0.3003 [144] | Latency distribution: 10% in 0.0368 secs. 25% in 0.0381 secs. 50% in 0.0416 secs. 75% in 0.0998 secs. 90% in 0.1375 secs. 95% in 0.1571 secs. 99% in 0.1850 secs. 99.9% in 0.2650 secs. ``` main: ``` Summary: Total: 71.2152 secs. Slowest: 0.6926 secs. Fastest: 0.0040 secs. Average: 0.0710 secs. Stddev: 0.0461 secs. Requests/sec: 2808.3903 (second run: 2834.98) Response time histogram: 0.0040 [1] | 0.0728 [125816] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎ 0.1417 [59127] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎ 0.2105 [13476] |∎∎∎∎ 0.2794 [1125] | 0.3483 [137] | 0.4171 [93] | 0.4860 [193] | 0.5549 [4] | 0.6237 [16] | 0.6926 [12] | Latency distribution: 10% in 0.0367 secs. 25% in 0.0379 secs. 50% in 0.0417 secs. 75% in 0.0993 secs. 90% in 0.1367 secs. 95% in 0.1567 secs. 99% in 0.1957 secs. 99.9% in 0.4361 secs. ``` [^1]: https://github.com/etcd-io/etcd/pull/14394#issuecomment-1229606410 Signed-off-by: Tobias Grieger