Skip to content

Commit

Permalink
raft: Add a test for MaxSizePerMsg feature
Browse files Browse the repository at this point in the history
Ensure that this limit is respected when generating MsgApp messages.
  • Loading branch information
bdarnell committed Aug 6, 2018
1 parent c5bef4f commit bc14dee
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 0 deletions.
44 changes: 44 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,47 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
close(stop)
<-done
}

func TestAppendPagination(t *testing.T) {
const maxSizePerMsg = 2048
n := newNetworkWithConfig(func(c *Config) {
c.MaxSizePerMsg = maxSizePerMsg
}, nil, nil, nil)

seenFullMessage := false
// Inspect all messages to see that we never exceed the limit, but
// we do see messages of larger than half the limit.
n.msgHook = func(m raftpb.Message) bool {
if m.Type == raftpb.MsgApp {
size := 0
for _, e := range m.Entries {
size += len(e.Data)
}
if size > maxSizePerMsg {
t.Errorf("sent MsgApp that is too large: %d bytes", size)
}
if size > maxSizePerMsg/2 {
seenFullMessage = true
}
}
return true
}

n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})

// Partition the network while we make our proposals. This forces
// the entries to be batched into larger messages.
n.isolate(1)
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 5; i++ {
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
}
n.recover()

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
if !seenFullMessage {
t.Error("didn't see any messages more than half the max size; something is wrong with this test")
}
}
9 changes: 9 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3986,6 +3986,10 @@ type network struct {
storage map[uint64]*MemoryStorage
dropm map[connem]float64
ignorem map[pb.MessageType]bool

// msgHook is called for each message sent. It may inspect the
// message and return true to send it or false to drop it.
msgHook func(pb.Message) bool
}

// newNetwork initializes a network from peers.
Expand Down Expand Up @@ -4104,6 +4108,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
continue
}
}
if nw.msgHook != nil {
if !nw.msgHook(m) {
continue
}
}
mm = append(mm, m)
}
return mm
Expand Down

0 comments on commit bc14dee

Please sign in to comment.