diff --git a/raft/diff_test.go b/raft/diff_test.go index 3114a1f0017..6030527941f 100644 --- a/raft/diff_test.go +++ b/raft/diff_test.go @@ -55,7 +55,7 @@ func mustTemp(pre, body string) string { } func ltoa(l *raftLog) string { - s := fmt.Sprintf("committed: %d\n", l.committed) + s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex()) s += fmt.Sprintf("applied: %d\n", l.applied) for i, e := range l.allEntries() { s += fmt.Sprintf("#%d: %+v\n", i, e) diff --git a/raft/node.go b/raft/node.go index d374b6c0c21..381000621ac 100644 --- a/raft/node.go +++ b/raft/node.go @@ -211,11 +211,7 @@ type Peer struct { Context []byte } -// StartNode returns a new Node given configuration and a list of raft peers. -// It appends a ConfChangeAddNode entry for each given peer to the initial log. -// -// Peers must not be zero length; call RestartNode in that case. -func StartNode(c *Config, peers []Peer) Node { +func setupNode(c *Config, peers []Peer) *node { if len(peers) == 0 { panic("no peers given; use RestartNode instead") } @@ -229,9 +225,17 @@ func StartNode(c *Config, peers []Peer) Node { } n := newNode(rn) + return &n +} +// StartNode returns a new Node given configuration and a list of raft peers. +// It appends a ConfChangeAddNode entry for each given peer to the initial log. +// +// Peers must not be zero length; call RestartNode in that case. +func StartNode(c *Config, peers []Peer) Node { + n := setupNode(c, peers) go n.run() - return &n + return n } // RestartNode is similar to StartNode but does not take a list of peers. diff --git a/raft/node_test.go b/raft/node_test.go index 2c2ff801809..5cb729eb4fa 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -35,6 +35,12 @@ import ( func readyWithTimeout(n Node) Ready { select { case rd := <-n.Ready(): + if nn, ok := n.(*nodeTestHarness); ok { + n = nn.node + } + if nn, ok := n.(*node); ok { + nn.rn.raft.logger.Infof("emitted ready: %s", DescribeReady(rd, nil)) + } return rd case <-time.After(time.Second): panic("timed out waiting for ready") @@ -126,6 +132,10 @@ func TestNodeStepUnblock(t *testing.T) { func TestNodePropose(t *testing.T) { var msgs []raftpb.Message appendStep := func(r *raft, m raftpb.Message) error { + t.Log(DescribeMessage(m, nil)) + if m.Type == raftpb.MsgAppResp { + return nil // injected by (*raft).advance + } msgs = append(msgs, m) return nil } @@ -163,55 +173,6 @@ func TestNodePropose(t *testing.T) { } } -// TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft. -// It also ensures that ReadState can be read out through ready chan. -func TestNodeReadIndex(t *testing.T) { - var msgs []raftpb.Message - appendStep := func(r *raft, m raftpb.Message) error { - msgs = append(msgs, m) - return nil - } - wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} - - s := newTestMemoryStorage(withPeers(1)) - rn := newTestRawNode(1, 10, 1, s) - n := newNode(rn) - r := rn.raft - r.readStates = wrs - - go n.run() - n.Campaign(context.TODO()) - for { - rd := <-n.Ready() - if !reflect.DeepEqual(rd.ReadStates, wrs) { - t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs) - } - - s.Append(rd.Entries) - - if rd.SoftState.Lead == r.id { - n.Advance() - break - } - n.Advance() - } - - r.step = appendStep - wrequestCtx := []byte("somedata2") - n.ReadIndex(context.TODO(), wrequestCtx) - n.Stop() - - if len(msgs) != 1 { - t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) - } - if msgs[0].Type != raftpb.MsgReadIndex { - t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex) - } - if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) { - t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx) - } -} - // TestDisableProposalForwarding ensures that proposals are not forwarded to // the leader when DisableProposalForwarding is true. func TestDisableProposalForwarding(t *testing.T) { @@ -308,6 +269,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) { func TestNodeProposeConfig(t *testing.T) { var msgs []raftpb.Message appendStep := func(r *raft, m raftpb.Message) error { + if m.Type == raftpb.MsgAppResp { + return nil // injected by (*raft).advance + } msgs = append(msgs, m) return nil } @@ -352,30 +316,34 @@ func TestNodeProposeConfig(t *testing.T) { // not affect the later propose to add new node. func TestNodeProposeAddDuplicateNode(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) - rn := newTestRawNode(1, 10, 1, s) - n := newNode(rn) - go n.run() - n.Campaign(context.TODO()) - rdyEntries := make([]raftpb.Entry, 0) + cfg := newTestConfig(1, 10, 1, s) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg) + defer cancel() + n.Campaign(ctx) + allCommittedEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() - done := make(chan struct{}) - stop := make(chan struct{}) + goroutineStopped := make(chan struct{}) applyConfChan := make(chan struct{}) + rd := readyWithTimeout(n) + s.Append(rd.Entries) + n.Advance() + go func() { - defer close(done) + defer close(goroutineStopped) for { select { - case <-stop: + case <-ctx.Done(): return case <-ticker.C: n.Tick() case rd := <-n.Ready(): + t.Log(DescribeReady(rd, nil)) s.Append(rd.Entries) applied := false - for _, e := range rd.Entries { - rdyEntries = append(rdyEntries, e) + for _, e := range rd.CommittedEntries { + allCommittedEntries = append(allCommittedEntries, e) switch e.Type { case raftpb.EntryNormal: case raftpb.EntryConfChange: @@ -395,32 +363,31 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} ccdata1, _ := cc1.Marshal() - n.ProposeConfChange(context.TODO(), cc1) + n.ProposeConfChange(ctx, cc1) <-applyConfChan // try add the same node again - n.ProposeConfChange(context.TODO(), cc1) + n.ProposeConfChange(ctx, cc1) <-applyConfChan // the new node join should be ok cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2} ccdata2, _ := cc2.Marshal() - n.ProposeConfChange(context.TODO(), cc2) + n.ProposeConfChange(ctx, cc2) <-applyConfChan - close(stop) - <-done + cancel() + <-goroutineStopped - if len(rdyEntries) != 4 { - t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries) + if len(allCommittedEntries) != 4 { + t.Errorf("len(entry) = %d, want %d, %v\n", len(allCommittedEntries), 4, allCommittedEntries) } - if !bytes.Equal(rdyEntries[1].Data, ccdata1) { - t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1) + if !bytes.Equal(allCommittedEntries[1].Data, ccdata1) { + t.Errorf("data = %v, want %v", allCommittedEntries[1].Data, ccdata1) } - if !bytes.Equal(rdyEntries[3].Data, ccdata2) { - t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2) + if !bytes.Equal(allCommittedEntries[3].Data, ccdata2) { + t.Errorf("data = %v, want %v", allCommittedEntries[3].Data, ccdata2) } - n.Stop() } // TestBlockProposal ensures that node will block proposal when it does not @@ -463,6 +430,10 @@ func TestNodeProposeWaitDropped(t *testing.T) { t.Logf("dropping message: %v", m.String()) return ErrProposalDropped } + if m.Type == raftpb.MsgAppResp { + // This is produced by raft internally, see (*raft).advance. + return nil + } msgs = append(msgs, m) return nil } @@ -495,7 +466,7 @@ func TestNodeProposeWaitDropped(t *testing.T) { n.Stop() if len(msgs) != 0 { - t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1) + t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0) } } @@ -580,9 +551,6 @@ func TestReadyContainUpdates(t *testing.T) { // start with correct configuration change entries, and can accept and commit // proposals. func TestNodeStart(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} ccdata, err := cc.Marshal() if err != nil { @@ -600,11 +568,17 @@ func TestNodeStart(t *testing.T) { MustSync: true, }, { - HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}}, MustSync: true, }, + { + HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, + Entries: nil, + CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, + MustSync: false, + }, } storage := NewMemoryStorage() c := &Config{ @@ -616,27 +590,44 @@ func TestNodeStart(t *testing.T) { MaxInflightMsgs: 256, } n := StartNode(c, []Peer{{ID: 1}}) - defer n.Stop() - g := <-n.Ready() - if !reflect.DeepEqual(g, wants[0]) { - t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) - } else { - storage.Append(g.Entries) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, c, Peer{ID: 1}) + defer cancel() + + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[0]) { + t.Fatalf("#1: rd = %+v,\n w %+v", rd, wants[0]) + } + storage.Append(rd.Entries) n.Advance() } if err := n.Campaign(ctx); err != nil { t.Fatal(err) } - rd := <-n.Ready() - storage.Append(rd.Entries) - n.Advance() + + { + rd := <-n.Ready() + storage.Append(rd.Entries) + n.Advance() + } n.Propose(ctx, []byte("foo")) - if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) { - t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1]) - } else { - storage.Append(g2.Entries) + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[1]) { + t.Errorf("#2: rd = %+v,\n w %+v", rd, wants[1]) + } + storage.Append(rd.Entries) + n.Advance() + } + + { + rd := <-n.Ready() + if !reflect.DeepEqual(rd, wants[2]) { + t.Errorf("#3: rd = %+v,\n w %+v", rd, wants[2]) + } + storage.Append(rd.Entries) n.Advance() } @@ -740,10 +731,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) { } func TestNodeAdvance(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - storage := NewMemoryStorage() + storage := newTestMemoryStorage(withPeers(1)) c := &Config{ ID: 1, ElectionTick: 10, @@ -752,21 +740,17 @@ func TestNodeAdvance(t *testing.T) { MaxSizePerMsg: noLimit, MaxInflightMsgs: 256, } - n := StartNode(c, []Peer{{ID: 1}}) - defer n.Stop() - rd := <-n.Ready() - storage.Append(rd.Entries) - n.Advance() + ctx, cancel, n := newNodeTestHarness(context.Background(), t, c) + defer cancel() n.Campaign(ctx) - <-n.Ready() + rd := readyWithTimeout(n) + // Commit empty entry. + storage.Append(rd.Entries) + n.Advance() n.Propose(ctx, []byte("foo")) - select { - case rd = <-n.Ready(): - t.Fatalf("unexpected Ready before Advance: %+v", rd) - case <-time.After(time.Millisecond): - } + rd = readyWithTimeout(n) storage.Append(rd.Entries) n.Advance() select { @@ -911,15 +895,14 @@ func TestCommitPagination(t *testing.T) { s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) cfg.MaxCommittedSizePerReady = 2048 - rn, err := NewRawNode(cfg) - if err != nil { - t.Fatal(err) - } - n := newNode(rn) - go n.run() - n.Campaign(context.TODO()) + ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg) + defer cancel() + n.Campaign(ctx) - rd := readyWithTimeout(&n) + rd := readyWithTimeout(n) + s.Append(rd.Entries) + n.Advance() + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 1 { t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) } @@ -928,25 +911,32 @@ func TestCommitPagination(t *testing.T) { blob := []byte(strings.Repeat("a", 1000)) for i := 0; i < 3; i++ { - if err := n.Propose(context.TODO(), blob); err != nil { + if err := n.Propose(ctx, blob); err != nil { t.Fatal(err) } } + // First the three proposals have to be appended. + rd = readyWithTimeout(n) + if len(rd.Entries) != 3 { + t.Fatal("expected to see three entries") + } + s.Append(rd.Entries) + n.Advance() + // The 3 proposals will commit in two batches. - rd = readyWithTimeout(&n) + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 2 { t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries)) } s.Append(rd.Entries) n.Advance() - rd = readyWithTimeout(&n) + rd = readyWithTimeout(n) if len(rd.CommittedEntries) != 1 { t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries)) } s.Append(rd.Entries) n.Advance() - n.Stop() } type ignoreSizeHintMemStorage struct { diff --git a/raft/node_util_test.go b/raft/node_util_test.go new file mode 100644 index 00000000000..5093cba6bf9 --- /dev/null +++ b/raft/node_util_test.go @@ -0,0 +1,110 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "context" + "fmt" + "testing" + "time" +) + +type nodeTestHarness struct { + *node + t *testing.T +} + +func (l *nodeTestHarness) Debug(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Debugf(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Error(v ...interface{}) { + l.t.Error(v...) +} + +func (l *nodeTestHarness) Errorf(format string, v ...interface{}) { + l.t.Errorf(format, v...) +} + +func (l *nodeTestHarness) Info(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Infof(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Warning(v ...interface{}) { + l.t.Log(v...) +} + +func (l *nodeTestHarness) Warningf(format string, v ...interface{}) { + l.t.Logf(format, v...) +} + +func (l *nodeTestHarness) Fatal(v ...interface{}) { + l.t.Error(v...) + panic(v) +} + +func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { + l.t.Errorf(format, v...) + panic(fmt.Sprintf(format, v...)) +} + +func (l *nodeTestHarness) Panic(v ...interface{}) { + l.t.Log(v...) + panic(v) +} + +func (l *nodeTestHarness) Panicf(format string, v ...interface{}) { + l.t.Errorf(format, v...) + panic(fmt.Sprintf(format, v...)) +} + +func newNodeTestHarness(ctx context.Context, t *testing.T, cfg *Config, peers ...Peer) (_ context.Context, cancel func(), _ *nodeTestHarness) { + // Wrap context in a 10s timeout to make tests more robust. Otherwise, + // it's likely that deadlock will occur unless Node behaves exactly as + // expected - when you expect a Ready and start waiting on the channel + // but no Ready ever shows up, for example. + ctx, cancel = context.WithTimeout(ctx, 10*time.Second) + var n *node + if len(peers) > 0 { + n = setupNode(cfg, peers) + } else { + rn, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } + nn := newNode(rn) + n = &nn + } + go func() { + defer func() { + if r := recover(); r != nil { + t.Error(r) + } + }() + defer cancel() + defer n.Stop() + n.run() + }() + t.Cleanup(n.Stop) + return ctx, cancel, &nodeTestHarness{node: n, t: t} +} diff --git a/raft/raft.go b/raft/raft.go index 8ffa6792863..52b4192c8c5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -572,6 +572,19 @@ func (r *raft) advance(rd Ready) { if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] + if r.id == r.lead { + // The leader needs to self-ack the entries just appended (since it doesn't + // send an MsgApp to itself). This is roughly equivalent to: + // + // r.prs.Progress[r.id].MaybeUpdate(e.Index) + // if r.maybeCommit() { + // r.bcastAppend() + // } + _ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index}) + } + // NB: it's important for performance that this call happens after + // r.Step above on the leader. This is because r.Step can then use + // a fast-path for `r.raftLog.term()`. r.raftLog.stableTo(e.Index, e.Term) } if !IsEmptySnap(rd.Snapshot) { @@ -634,10 +647,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) { return false } // use latest "last" index after truncate/append - li = r.raftLog.append(es...) - r.prs.Progress[r.id].MaybeUpdate(li) - // Regardless of maybeCommit's return, our caller will call bcastAppend. - r.maybeCommit() + r.raftLog.append(es...) return true } @@ -735,7 +745,11 @@ func (r *raft) becomeLeader() { // (perhaps after having received a snapshot as a result). The leader is // trivially in this state. Note that r.reset() has initialized this // progress with the last index already. - r.prs.Progress[r.id].BecomeReplicate() + pr := r.prs.Progress[r.id] + pr.BecomeReplicate() + // The leader always has RecentActive == true; MsgCheckQuorum makes sure to + // preserve this. + pr.RecentActive = true // Conservatively set the pendingConfIndex to the last index in the // log. There may or may not be a pending config change, but it's @@ -995,15 +1009,6 @@ func stepLeader(r *raft, m pb.Message) error { r.bcastHeartbeat() return nil case pb.MsgCheckQuorum: - // The leader should always see itself as active. As a precaution, handle - // the case in which the leader isn't in the configuration any more (for - // example if it just removed itself). - // - // TODO(tbg): I added a TODO in removeNode, it doesn't seem that the - // leader steps down when removing itself. I might be missing something. - if pr := r.prs.Progress[r.id]; pr != nil { - pr.RecentActive = true - } if !r.prs.QuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) @@ -1104,6 +1109,9 @@ func stepLeader(r *raft, m pb.Message) error { } switch m.Type { case pb.MsgAppResp: + // NB: this code path is also hit from (*raft).advance, where the leader steps + // an MsgAppResp to acknowledge the appended entries in the last Ready. + pr.RecentActive = true if m.Reject { @@ -1272,7 +1280,9 @@ func stepLeader(r *raft, m pb.Message) error { // replicate, or when freeTo() covers multiple messages). If // we have more entries to send, send as many messages as we // can (without sending empty messages for the commit index) - for r.maybeSendAppend(m.From, false) { + if r.id != m.From { + for r.maybeSendAppend(m.From, false) { + } } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { @@ -1811,6 +1821,11 @@ func numOfPendingConf(ents []pb.Entry) int { } func releasePendingReadIndexMessages(r *raft) { + if len(r.pendingReadIndexMessages) == 0 { + // Fast path for the common case to avoid a call to storage.LastIndex() + // via committedEntryInCurrentTerm. + return + } if !r.committedEntryInCurrentTerm() { r.logger.Error("pending MsgReadIndex should be released only after first commit in current term") return diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 9577823d426..84c451cdf2d 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -473,9 +473,9 @@ func TestLeaderCommitEntry(t *testing.T) { // Reference: section 5.3 func TestLeaderAcknowledgeCommit(t *testing.T) { tests := []struct { - size int - acceptors map[uint64]bool - wack bool + size int + nonLeaderAcceptors map[uint64]bool + wack bool }{ {1, nil, true}, {3, nil, false}, @@ -496,8 +496,11 @@ func TestLeaderAcknowledgeCommit(t *testing.T) { li := r.raftLog.lastIndex() r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) - for _, m := range r.readMessages() { - if tt.acceptors[m.To] { + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) // simulate having appended entry on leader + for _, m := range rd.Messages { + if tt.nonLeaderAcceptors[m.To] { r.Step(acceptAndReply(m)) } } @@ -891,6 +894,9 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index}) + rd := newReady(r, &SoftState{}, pb.HardState{}) + storage.Append(rd.Entries) + r.advance(rd) if r.raftLog.committed != tt.wcommit { t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit) } diff --git a/raft/raft_test.go b/raft/raft_test.go index 945611a50e5..e45533d8d03 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -29,13 +29,15 @@ import ( // nextEnts returns the appliable entries and updates the applied index func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { - // Transfer all unstable entries to "stable" storage. - s.Append(r.raftLog.unstableEntries()) - r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) - - ents = r.raftLog.nextEnts() - r.raftLog.appliedTo(r.raftLog.committed) - return ents + for { + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) + if len(rd.Entries)+len(rd.CommittedEntries) == 0 { + return ents + } + ents = append(ents, rd.CommittedEntries...) + } } func mustAppendEntry(r *raft, ents ...pb.Entry) { @@ -57,21 +59,33 @@ func (r *raft) readMessages() []pb.Message { } func TestProgressLeader(t *testing.T) { - r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) + s := newTestMemoryStorage(withPeers(1, 2)) + r := newTestRaft(1, 5, 1, s) r.becomeCandidate() r.becomeLeader() r.prs.Progress[2].BecomeReplicate() - // Send proposals to r1. The first 5 entries should be appended to the log. + // Send proposals to r1. The first 5 entries should be queued in the unstable log. propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}} for i := 0; i < 5; i++ { - if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 { - t.Errorf("unexpected progress %v", pr) - } if err := r.Step(propMsg); err != nil { t.Fatalf("proposal resulted in error: %v", err) } } + if m := r.prs.Progress[1].Match; m != 0 { + t.Fatalf("expected zero match, got %d", m) + } + rd := newReady(r, &SoftState{}, pb.HardState{}) + if len(rd.Entries) != 6 || len(rd.Entries[0].Data) > 0 || string(rd.Entries[5].Data) != "foo" { + t.Fatalf("unexpected Entries: %s", DescribeReady(rd, nil)) + } + r.advance(rd) + if m := r.prs.Progress[1].Match; m != 6 { + t.Fatalf("unexpected Match %d", m) + } + if m := r.prs.Progress[1].Next; m != 7 { + t.Fatalf("unexpected Next %d", m) + } } // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response. @@ -663,10 +677,12 @@ func TestLogReplication(t *testing.T) { // TestLearnerLogReplication tests that a learner can receive entries from the leader. func TestLearnerLogReplication(t *testing.T) { - n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) + s1 := newTestMemoryStorage(withPeers(1), withLearners(2)) + n1 := newTestLearnerRaft(1, 10, 1, s1) n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) nt := newNetwork(n1, n2) + nt.t = t n1.becomeFollower(1, None) n2.becomeFollower(1, None) @@ -686,12 +702,23 @@ func TestLearnerLogReplication(t *testing.T) { t.Error("peer 2 state: not learner, want yes") } - nextCommitted := n1.raftLog.committed + 1 - nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + nextCommitted := uint64(2) + { + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + rd := newReady(n1, &SoftState{}, pb.HardState{}) + nt.send(rd.Messages...) + s1.Append(rd.Entries) + n1.advance(rd) + } if n1.raftLog.committed != nextCommitted { t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed) } + { + rd := newReady(n1, &SoftState{}, pb.HardState{}) + nt.send(rd.Messages...) + } + if n1.raftLog.committed != n2.raftLog.committed { t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed) } @@ -703,11 +730,18 @@ func TestLearnerLogReplication(t *testing.T) { } func TestSingleNodeCommit(t *testing.T) { - tt := newNetwork(nil) + s := newTestMemoryStorage(withPeers(1)) + cfg := newTestConfig(1, 10, 1, s) + r := newRaft(cfg) + tt := newNetwork(r) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}}) + rd := newReady(r, &SoftState{}, pb.HardState{}) + s.Append(rd.Entries) + r.advance(rd) + sm := tt.peers[1].(*raft) if sm.raftLog.committed != 3 { t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3) @@ -792,9 +826,12 @@ func TestCommitWithoutNewTermEntry(t *testing.T) { } func TestDuelingCandidates(t *testing.T) { - a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + s1 := newTestMemoryStorage(withPeers(1, 2, 3)) + s2 := newTestMemoryStorage(withPeers(1, 2, 3)) + s3 := newTestMemoryStorage(withPeers(1, 2, 3)) + a := newTestRaft(1, 10, 1, s1) + b := newTestRaft(2, 10, 1, s2) + c := newTestRaft(3, 10, 1, s3) nt := newNetwork(a, b, c) nt.cut(1, 3) @@ -820,21 +857,19 @@ func TestDuelingCandidates(t *testing.T) { // we expect it to disrupt the leader 1 since it has a higher term // 3 will be follower again since both 1 and 2 rejects its vote request since 3 does not have a long enough log nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) - - wlog := &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, - committed: 1, - unstable: unstable{offset: 2}, + if sm.state != StateFollower { + t.Errorf("state = %s, want %s", sm.state, StateFollower) } + tests := []struct { - sm *raft - state StateType - term uint64 - raftLog *raftLog + sm *raft + state StateType + term uint64 + lastIndex uint64 }{ - {a, StateFollower, 2, wlog}, - {b, StateFollower, 2, wlog}, - {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, + {a, StateFollower, 2, 1}, + {b, StateFollower, 2, 1}, + {c, StateFollower, 2, 0}, } for i, tt := range tests { @@ -844,14 +879,8 @@ func TestDuelingCandidates(t *testing.T) { if g := tt.sm.Term; g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } - base := ltoa(tt.raftLog) - if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { - l := ltoa(sm.raftLog) - if g := diffu(base, l); g != "" { - t.Errorf("#%d: diff:\n%s", i, g) - } - } else { - t.Logf("#%d: empty log", i) + if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act { + t.Errorf("#%d: last index exp = %d, act = %d", i, exp, act) } } } @@ -868,6 +897,7 @@ func TestDuelingPreCandidates(t *testing.T) { c := newRaft(cfgC) nt := newNetwork(a, b, c) + nt.t = t nt.cut(1, 3) nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) @@ -891,20 +921,15 @@ func TestDuelingPreCandidates(t *testing.T) { // With PreVote, it does not disrupt the leader. nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup}) - wlog := &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}}, - committed: 1, - unstable: unstable{offset: 2}, - } tests := []struct { - sm *raft - state StateType - term uint64 - raftLog *raftLog + sm *raft + state StateType + term uint64 + lastIndex uint64 }{ - {a, StateLeader, 1, wlog}, - {b, StateFollower, 1, wlog}, - {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)}, + {a, StateLeader, 1, 1}, + {b, StateFollower, 1, 1}, + {c, StateFollower, 1, 0}, } for i, tt := range tests { @@ -914,14 +939,8 @@ func TestDuelingPreCandidates(t *testing.T) { if g := tt.sm.Term; g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } - base := ltoa(tt.raftLog) - if sm, ok := nt.peers[1+uint64(i)].(*raft); ok { - l := ltoa(sm.raftLog) - if g := diffu(base, l); g != "" { - t.Errorf("#%d: diff:\n%s", i, g) - } - } else { - t.Logf("#%d: empty log", i) + if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act { + t.Errorf("#%d: last index is %d, exp %d", i, act, exp) } } } @@ -1058,6 +1077,7 @@ func TestProposal(t *testing.T) { // promote 1 to become leader send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) + r := tt.network.peers[1].(*raft) wantLog := newLog(NewMemoryStorage(), raftLogger) if tt.success { @@ -1065,8 +1085,8 @@ func TestProposal(t *testing.T) { storage: &MemoryStorage{ ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, }, - unstable: unstable{offset: 3}, - committed: 2} + unstable: unstable{offset: 3}, + } } base := ltoa(wantLog) for i, p := range tt.peers { @@ -1079,8 +1099,7 @@ func TestProposal(t *testing.T) { t.Logf("#%d: peer %d empty log", j, i) } } - sm := tt.network.peers[1].(*raft) - if g := sm.Term; g != 1 { + if g := r.Term; g != 1 { t.Errorf("#%d: term = %d, want %d", j, g, 1) } } @@ -1405,14 +1424,14 @@ func TestRaftFreesReadOnlyMem(t *testing.T) { // TestMsgAppRespWaitReset verifies the resume behavior of a leader // MsgAppResp. func TestMsgAppRespWaitReset(t *testing.T) { - sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3))) + s := newTestMemoryStorage(withPeers(1, 2, 3)) + sm := newTestRaft(1, 5, 1, s) sm.becomeCandidate() sm.becomeLeader() - // The new leader has just emitted a new Term 4 entry; consume those messages - // from the outgoing queue. - sm.bcastAppend() - sm.readMessages() + // Run n1 which includes sending a message like the below + // one to n2, but also appending to its own log. + nextEnts(sm, s) // Node 2 acks the first entry, making it committed. sm.Step(pb.Message{ @@ -2228,7 +2247,8 @@ func TestReadOnlyOptionSafe(t *testing.T) { } func TestReadOnlyWithLearner(t *testing.T) { - a := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) + s := newTestMemoryStorage(withPeers(1), withLearners(2)) + a := newTestLearnerRaft(1, 10, 1, s) b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2))) nt := newNetwork(a, b) @@ -2258,6 +2278,7 @@ func TestReadOnlyWithLearner(t *testing.T) { for i, tt := range tests { for j := 0; j < tt.proposals; j++ { nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + nextEnts(a, s) // append the entries on the leader } nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}}) @@ -3634,13 +3655,17 @@ func TestLeaderTransferTimeout(t *testing.T) { } func TestLeaderTransferIgnoreProposal(t *testing.T) { - nt := newNetwork(nil, nil, nil) + s := newTestMemoryStorage(withPeers(1, 2, 3)) + r := newTestRaft(1, 10, 1, s) + nt := newNetwork(r, nil, nil) nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) nt.isolate(3) lead := nt.peers[1].(*raft) + nextEnts(r, s) // handle empty entry + // Transfer leadership to isolated node to let transfer pending, then send proposal. nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader}) if lead.leadTransferee != 3 { @@ -4630,6 +4655,8 @@ func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft { } type network struct { + t *testing.T // optional + peers map[uint64]stateMachine storage map[uint64]*MemoryStorage dropm map[connem]float64 @@ -4713,6 +4740,9 @@ func (nw *network) send(msgs ...pb.Message) { for len(msgs) > 0 { m := msgs[0] p := nw.peers[m.To] + if nw.t != nil { + nw.t.Log(DescribeMessage(m, nil)) + } p.Step(m) msgs = append(msgs[1:], nw.filter(p.readMessages())...) } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 535152e14c2..e209b3277b3 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -388,122 +388,125 @@ func TestRawNodeJointAutoLeave(t *testing.T) { } exp2Cs := pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}} - t.Run("", func(t *testing.T) { - s := newTestMemoryStorage(withPeers(1)) - rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s)) - if err != nil { - t.Fatal(err) - } + s := newTestMemoryStorage(withPeers(1)) + rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s)) + if err != nil { + t.Fatal(err) + } - rawNode.Campaign() - proposed := false - var ( - lastIndex uint64 - ccdata []byte - ) - // Propose the ConfChange, wait until it applies, save the resulting - // ConfState. - var cs *pb.ConfState - for cs == nil { - rd := rawNode.Ready() - s.Append(rd.Entries) - for _, ent := range rd.CommittedEntries { - var cc pb.ConfChangeI - if ent.Type == pb.EntryConfChangeV2 { - var ccc pb.ConfChangeV2 - if err = ccc.Unmarshal(ent.Data); err != nil { - t.Fatal(err) - } - cc = &ccc - } - if cc != nil { - // Force it step down. - rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1}) - cs = rawNode.ApplyConfChange(cc) - } - } - rawNode.Advance(rd) - // Once we are the leader, propose a command and a ConfChange. - if !proposed && rd.SoftState.Lead == rawNode.raft.id { - if err = rawNode.Propose([]byte("somedata")); err != nil { - t.Fatal(err) - } - ccdata, err = testCc.Marshal() - if err != nil { + rawNode.Campaign() + proposed := false + var ( + lastIndex uint64 + ccdata []byte + ) + // Propose the ConfChange, wait until it applies, save the resulting + // ConfState. + var cs *pb.ConfState + for cs == nil { + rd := rawNode.Ready() + s.Append(rd.Entries) + for _, ent := range rd.CommittedEntries { + var cc pb.ConfChangeI + if ent.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err = ccc.Unmarshal(ent.Data); err != nil { t.Fatal(err) } - rawNode.ProposeConfChange(testCc) - proposed = true + cc = &ccc + } + if cc != nil { + // Force it step down. + rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1}) + cs = rawNode.ApplyConfChange(cc) } } - - // Check that the last index is exactly the conf change we put in, - // down to the bits. Note that this comes from the Storage, which - // will not reflect any unstable entries that we'll only be presented - // with in the next Ready. - lastIndex, err = s.LastIndex() - if err != nil { - t.Fatal(err) + rawNode.Advance(rd) + // Once we are the leader, propose a command and a ConfChange. + if !proposed && rd.SoftState.Lead == rawNode.raft.id { + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } + ccdata, err = testCc.Marshal() + if err != nil { + t.Fatal(err) + } + rawNode.ProposeConfChange(testCc) + proposed = true } + } - entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) - if err != nil { - t.Fatal(err) - } - if len(entries) != 2 { - t.Fatalf("len(entries) = %d, want %d", len(entries), 2) - } - if !bytes.Equal(entries[0].Data, []byte("somedata")) { - t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) - } - if entries[1].Type != pb.EntryConfChangeV2 { - t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2) - } - if !bytes.Equal(entries[1].Data, ccdata) { - t.Errorf("data = %v, want %v", entries[1].Data, ccdata) - } + // Check that the last index is exactly the conf change we put in, + // down to the bits. Note that this comes from the Storage, which + // will not reflect any unstable entries that we'll only be presented + // with in the next Ready. + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } - if !reflect.DeepEqual(&expCs, cs) { - t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs) - } + entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit) + if err != nil { + t.Fatal(err) + } + if len(entries) != 2 { + t.Fatalf("len(entries) = %d, want %d", len(entries), 2) + } + if !bytes.Equal(entries[0].Data, []byte("somedata")) { + t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata")) + } + if entries[1].Type != pb.EntryConfChangeV2 { + t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2) + } + if !bytes.Equal(entries[1].Data, ccdata) { + t.Errorf("data = %v, want %v", entries[1].Data, ccdata) + } - if rawNode.raft.pendingConfIndex != 0 { - t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex) - } + if !reflect.DeepEqual(&expCs, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs) + } - // Move the RawNode along. It should not leave joint because it's follower. - rd := rawNode.readyWithoutAccept() - // Check that the right ConfChange comes out. - if len(rd.Entries) != 0 { - t.Fatalf("expected zero entry, got %+v", rd) - } + if rawNode.raft.pendingConfIndex != 0 { + t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex) + } - // Make it leader again. It should leave joint automatically after moving apply index. - rawNode.Campaign() - rd = rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - rd = rawNode.Ready() - s.Append(rd.Entries) + // Move the RawNode along. It should not leave joint because it's follower. + rd := rawNode.readyWithoutAccept() + // Check that the right ConfChange comes out. + if len(rd.Entries) != 0 { + t.Fatalf("expected zero entry, got %+v", rd) + } - // Check that the right ConfChange comes out. - if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { - t.Fatalf("expected exactly one more entry, got %+v", rd) - } - var cc pb.ConfChangeV2 - if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) { - t.Fatalf("expected zero ConfChangeV2, got %+v", cc) - } - // Lie and pretend the ConfChange applied. It won't do so because now - // we require the joint quorum and we're only running one node. - cs = rawNode.ApplyConfChange(cc) - if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) { - t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) - } - }) + // Make it leader again. It should leave joint automatically after moving apply index. + rawNode.Campaign() + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + rawNode.Advance(rd) + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + rawNode.Advance(rd) + rd = rawNode.Ready() + t.Log(DescribeReady(rd, nil)) + s.Append(rd.Entries) + // Check that the right ConfChange comes out. + if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 { + t.Fatalf("expected exactly one more entry, got %+v", rd) + } + var cc pb.ConfChangeV2 + if err := cc.Unmarshal(rd.Entries[0].Data); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) { + t.Fatalf("expected zero ConfChangeV2, got %+v", cc) + } + // Lie and pretend the ConfChange applied. It won't do so because now + // we require the joint quorum and we're only running one node. + cs = rawNode.ApplyConfChange(cc) + if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) { + t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs) + } } // TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should @@ -656,18 +659,16 @@ func TestRawNodeReadIndex(t *testing.T) { // requires the application to bootstrap the state, i.e. it does not accept peers // and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { + entries := []pb.Entry{ + {Term: 1, Index: 2, Data: nil}, // empty entry + {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry + } want := Ready{ - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, - Entries: []pb.Entry{ - {Term: 1, Index: 2, Data: nil}, // empty entry - {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry - }, - CommittedEntries: []pb.Entry{ - {Term: 1, Index: 2, Data: nil}, // empty entry - {Term: 1, Index: 3, Data: []byte("foo")}, // empty entry - }, - MustSync: true, + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1}, + Entries: nil, // emitted & checked in intermediate Ready cycle + CommittedEntries: entries, + MustSync: false, // since we're only applying, not appending } storage := NewMemoryStorage() @@ -747,9 +748,24 @@ func TestRawNodeStart(t *testing.T) { t.Fatal("expected a Ready") } rd := rawNode.Ready() + if !reflect.DeepEqual(entries, rd.Entries) { + t.Fatalf("expected to see entries\n%s, not\n%s", DescribeEntries(entries, nil), DescribeEntries(rd.Entries, nil)) + } storage.Append(rd.Entries) rawNode.Advance(rd) + if !rawNode.HasReady() { + t.Fatal("expected a Ready") + } + rd = rawNode.Ready() + if len(rd.Entries) != 0 { + t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil)) + } + if rd.MustSync { + t.Fatalf("should not need to sync") + } + rawNode.Advance(rd) + rd.SoftState, want.SoftState = nil, nil if !reflect.DeepEqual(rd, want) { @@ -868,17 +884,17 @@ func TestRawNodeStatus(t *testing.T) { // TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the // Raft group would forget to apply entries: // -// - node learns that index 11 is committed -// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already -// exceeds maxBytes), which isn't noticed internally by Raft -// - Commit index gets bumped to 10 -// - the node persists the HardState, but crashes before applying the entries -// - upon restart, the storage returns the same entries, but `slice` takes a -// different code path and removes the last entry. -// - Raft does not emit a HardState, but when the app calls Advance(), it bumps -// its internal applied index cursor to 10 (when it should be 9) -// - the next Ready asks the app to apply index 11 (omitting index 10), losing a -// write. +// - node learns that index 11 is committed +// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already +// exceeds maxBytes), which isn't noticed internally by Raft +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a +// different code path and removes the last entry. +// - Raft does not emit a HardState, but when the app calls Advance(), it bumps +// its internal applied index cursor to 10 (when it should be 9) +// - the next Ready asks the app to apply index 11 (omitting index 10), losing a +// write. func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { s := &ignoreSizeHintMemStorage{ MemoryStorage: newTestMemoryStorage(withPeers(1)), @@ -952,6 +968,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { data := []byte("testdata") testEntry := pb.Entry{Data: data} maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) + t.Log("maxEntrySize", maxEntrySize) s := newTestMemoryStorage(withPeers(1)) cfg := newTestConfig(1, 10, 1, s) @@ -960,20 +977,16 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - // Become the leader. + // Become the leader and apply empty entry. rawNode.Campaign() for { - rd = rawNode.Ready() + rd := rawNode.Ready() s.Append(rd.Entries) - if rd.SoftState.Lead == rawNode.raft.id { - rawNode.Advance(rd) + rawNode.Advance(rd) + if len(rd.CommittedEntries) > 0 { break } - rawNode.Advance(rd) } // Simulate a network partition while we make our proposals by never @@ -995,12 +1008,25 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { // Recover from the partition. The uncommitted tail of the Raft log should // disappear as entries are committed. + rd := rawNode.Ready() + if len(rd.Entries) != maxEntries { + t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.Entries)) + } + s.Append(rd.Entries) + rawNode.Advance(rd) + + // Entries are appended, but not applied. + checkUncommitted(maxEntrySize) + rd = rawNode.Ready() + if len(rd.Entries) != 0 { + t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil)) + } if len(rd.CommittedEntries) != maxEntries { t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries)) } - s.Append(rd.Entries) rawNode.Advance(rd) + checkUncommitted(0) } @@ -1105,3 +1131,104 @@ func TestRawNodeConsumeReady(t *testing.T) { t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) } } + +func BenchmarkRawNode(b *testing.B) { + cases := []struct { + name string + peers []uint64 + }{ + { + name: "single-voter", + peers: []uint64{1}, + }, + { + name: "two-voters", + peers: []uint64{1, 2}, + }, + // You can easily add more cases here. + } + + for _, tc := range cases { + b.Run(tc.name, func(b *testing.B) { + benchmarkRawNodeImpl(b, tc.peers...) + }) + } +} + +func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) { + + const debug = false + + s := newTestMemoryStorage(withPeers(peers...)) + cfg := newTestConfig(1, 10, 1, s) + if !debug { + cfg.Logger = discardLogger // avoid distorting benchmark output + } + rn, err := NewRawNode(cfg) + if err != nil { + b.Fatal(err) + } + + run := make(chan struct{}, 1) + defer close(run) + + var numReady uint64 + stabilize := func() (applied uint64) { + for rn.HasReady() { + numReady++ + rd := rn.Ready() + if debug { + b.Log(DescribeReady(rd, nil)) + } + if n := len(rd.CommittedEntries); n > 0 { + applied = rd.CommittedEntries[n-1].Index + } + s.Append(rd.Entries) + for _, m := range rd.Messages { + if m.Type == pb.MsgVote { + resp := pb.Message{To: m.From, From: m.To, Term: m.Term, Type: pb.MsgVoteResp} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + if m.Type == pb.MsgApp { + idx := m.Index + if n := len(m.Entries); n > 0 { + idx = m.Entries[n-1].Index + } + resp := pb.Message{To: m.From, From: m.To, Type: pb.MsgAppResp, Term: m.Term, Index: idx} + if debug { + b.Log(DescribeMessage(resp, nil)) + } + rn.Step(resp) + } + } + rn.Advance(rd) + } + return applied + } + + rn.Campaign() + stabilize() + + if debug { + b.N = 1 + } + + var applied uint64 + for i := 0; i < b.N; i++ { + if err := rn.Propose([]byte("foo")); err != nil { + b.Fatal(err) + } + applied = stabilize() + } + if applied < uint64(b.N) { + b.Fatalf("did not apply everything: %d < %d", applied, b.N) + } + b.ReportMetric(float64(s.callStats.firstIndex)/float64(b.N), "firstIndex/op") + b.ReportMetric(float64(s.callStats.lastIndex)/float64(b.N), "lastIndex/op") + b.ReportMetric(float64(s.callStats.term)/float64(b.N), "term/op") + b.ReportMetric(float64(numReady)/float64(b.N), "ready/op") + b.Logf("storage access stats: %+v", s.callStats) +} diff --git a/raft/storage.go b/raft/storage.go index 8b16d4fa24f..67ec16b13aa 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -71,6 +71,10 @@ type Storage interface { Snapshot() (pb.Snapshot, error) } +type inMemStorageCallStats struct { + initialState, firstIndex, lastIndex, entries, term, snapshot int +} + // MemoryStorage implements the Storage interface backed by an // in-memory array. type MemoryStorage struct { @@ -83,6 +87,8 @@ type MemoryStorage struct { snapshot pb.Snapshot // ents[i] has raft log position i+snapshot.Metadata.Index ents []pb.Entry + + callStats inMemStorageCallStats } // NewMemoryStorage creates an empty MemoryStorage. @@ -95,6 +101,7 @@ func NewMemoryStorage() *MemoryStorage { // InitialState implements the Storage interface. func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) { + ms.callStats.initialState++ return ms.hardState, ms.snapshot.Metadata.ConfState, nil } @@ -110,6 +117,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error { func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() + ms.callStats.entries++ offset := ms.ents[0].Index if lo <= offset { return nil, ErrCompacted @@ -130,6 +138,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { func (ms *MemoryStorage) Term(i uint64) (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.term++ offset := ms.ents[0].Index if i < offset { return 0, ErrCompacted @@ -144,6 +153,7 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) { func (ms *MemoryStorage) LastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.lastIndex++ return ms.lastIndex(), nil } @@ -155,6 +165,7 @@ func (ms *MemoryStorage) lastIndex() uint64 { func (ms *MemoryStorage) FirstIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + ms.callStats.firstIndex++ return ms.firstIndex(), nil } @@ -166,6 +177,7 @@ func (ms *MemoryStorage) firstIndex() uint64 { func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { ms.Lock() defer ms.Unlock() + ms.callStats.snapshot++ return ms.snapshot, nil } diff --git a/raft/testdata/confchange_v1_add_single.txt b/raft/testdata/confchange_v1_add_single.txt index d9cc1a7b1c6..cd07af47944 100644 --- a/raft/testdata/confchange_v1_add_single.txt +++ b/raft/testdata/confchange_v1_add_single.txt @@ -35,10 +35,13 @@ stabilize > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChange v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChange v2 diff --git a/raft/testdata/confchange_v2_add_double_auto.txt b/raft/testdata/confchange_v2_add_double_auto.txt index 0a5e205bf0d..2419083f0e9 100644 --- a/raft/testdata/confchange_v2_add_double_auto.txt +++ b/raft/testdata/confchange_v2_add_double_auto.txt @@ -31,19 +31,24 @@ INFO 3 switched to configuration voters=() INFO 3 became follower at term 0 INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0] -# n1 immediately gets to commit & apply the conf change using only itself. We see that -# it starts transitioning out of that joint configuration (though we will only see that -# proposal in the next ready handling loop, when it is emitted). We also see that this -# is using joint consensus, which it has to since we're carrying out two additions at -# once. +# Process n1 once, so that it can append the entry. process-ready 1 ---- Ready MustSync=true: Lead:1 State:StateLeader -HardState Term:1 Vote:1 Commit:4 +HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 v3 + +# Now n1 applies the conf change. We see that it starts transitioning out of that joint +# configuration (though we will only see that proposal in the next ready handling +# loop, when it is emitted). We also see that this is using joint consensus, which +# it has to since we're carrying out two additions at once. +process-ready 1 +---- +Ready MustSync=false: +HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 v3 diff --git a/raft/testdata/confchange_v2_add_double_implicit.txt b/raft/testdata/confchange_v2_add_double_implicit.txt index a93eb81cb52..45dfc5099b9 100644 --- a/raft/testdata/confchange_v2_add_double_implicit.txt +++ b/raft/testdata/confchange_v2_add_double_implicit.txt @@ -38,10 +38,13 @@ stabilize 1 2 > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/confchange_v2_add_single_auto.txt b/raft/testdata/confchange_v2_add_single_auto.txt index 47c7f10b8e8..7ee3ab6c33c 100644 --- a/raft/testdata/confchange_v2_add_single_auto.txt +++ b/raft/testdata/confchange_v2_add_single_auto.txt @@ -36,10 +36,13 @@ stabilize > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/confchange_v2_add_single_explicit.txt b/raft/testdata/confchange_v2_add_single_explicit.txt index dd4a4f65467..b4e6e3a83cd 100644 --- a/raft/testdata/confchange_v2_add_single_explicit.txt +++ b/raft/testdata/confchange_v2_add_single_explicit.txt @@ -36,10 +36,13 @@ stabilize 1 2 > 1 handling Ready Ready MustSync=true: Lead:1 State:StateLeader - HardState Term:1 Vote:1 Commit:4 + HardState Term:1 Vote:1 Commit:2 Entries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 CommittedEntries: 1/3 EntryNormal "" 1/4 EntryConfChangeV2 v2 diff --git a/raft/testdata/single_node.txt b/raft/testdata/single_node.txt new file mode 100644 index 00000000000..3b6e4f4c1dd --- /dev/null +++ b/raft/testdata/single_node.txt @@ -0,0 +1,30 @@ +log-level info +---- +ok + +add-nodes 1 voters=(1) index=3 +---- +INFO 1 switched to configuration voters=(1) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1], term: 0, commit: 3, applied: 3, lastindex: 3, lastterm: 1] + +campaign 1 +---- +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 received MsgVoteResp from 1 at term 1 +INFO 1 became leader at term 1 + +stabilize +---- +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + HardState Term:1 Vote:1 Commit:3 + Entries: + 1/4 EntryNormal "" +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:4 + CommittedEntries: + 1/4 EntryNormal "" diff --git a/raft/testdata/snapshot_succeed_via_app_resp.txt b/raft/testdata/snapshot_succeed_via_app_resp.txt index 96ded532cd7..dbbd5ce11d9 100644 --- a/raft/testdata/snapshot_succeed_via_app_resp.txt +++ b/raft/testdata/snapshot_succeed_via_app_resp.txt @@ -41,7 +41,7 @@ ok status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateProbe match=0 next=11 paused inactive @@ -95,7 +95,7 @@ stabilize 1 status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateSnapshot match=0 next=11 paused pendingSnap=11 @@ -132,7 +132,7 @@ stabilize 1 status 1 ---- -1: StateReplicate match=11 next=12 inactive +1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 3: StateReplicate match=11 next=12 diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index a36e5261ac7..e37e4b63ff7 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -52,8 +52,7 @@ type Progress struct { // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. - // - // TODO(tbg): the leader should always have this set to true. + // This is always true on the leader. RecentActive bool // ProbeSent is used while this follower is in StateProbe. When ProbeSent is