Skip to content

Commit

Permalink
raft: add MaxInflightBytes to Config
Browse files Browse the repository at this point in the history
This commit introduces the max inflight bytes setting at the Config level, and
tests that raft flow control honours it.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Oct 27, 2022
1 parent 86aec34 commit 53e5398
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 33 deletions.
7 changes: 5 additions & 2 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ type Config struct {
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// MaxInflightBytes limits the number of in-flight bytes in append messages.
// Complements MaxInflightMsgs. Ignored if zero.
MaxInflightBytes uint64

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -332,7 +335,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -482,7 +485,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.OptimisticUpdate(last)
pr.Inflights.Add(last, 0) // TODO: set bytes to sum(m.Entries[].Size())
pr.Inflights.Add(last, payloadsSize(m.Entries))
case tracker.StateProbe:
pr.ProbeSent = true
default:
Expand Down
68 changes: 37 additions & 31 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestProgressFlowControl(t *testing.T) {
cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
cfg.MaxInflightMsgs = 3
cfg.MaxSizePerMsg = 2048
cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg.
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
Expand All @@ -136,7 +137,12 @@ func TestProgressFlowControl(t *testing.T) {
// While node 2 is in probe state, propose a bunch of entries.
r.prs.Progress[2].BecomeProbe()
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 10; i++ {
large := []byte(strings.Repeat("b", 5000))
for i := 0; i < 22; i++ {
blob := blob
if i >= 10 && i < 16 { // Temporarily send large messages.
blob = large
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
}

Expand All @@ -154,40 +160,40 @@ func TestProgressFlowControl(t *testing.T) {
t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
}

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 3 {
t.Fatalf("expected 3 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
ackAndVerify := func(index uint64, expEntries ...int) uint64 {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index})
ms := r.readMessages()
if got, want := len(ms), len(expEntries); got != want {
t.Fatalf("expected %d messages, got %d", want, got)
}
if len(m.Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
for i, m := range ms {
if got, want := m.Type, pb.MsgApp; got != want {
t.Errorf("%d: expected MsgApp, got %s", i, got)
}
if got, want := len(m.Entries), expEntries[i]; got != want {
t.Errorf("%d: expected %d entries, got %d", i, want, got)
}
}
}

// Ack all three of those messages together and get the last two
// messages (containing three entries).
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 2 {
t.Fatalf("expected 2 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
last := ms[len(ms)-1].Entries
if len(last) == 0 {
return index
}
return last[len(last)-1].Index
}
if len(ms[0].Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
}
if len(ms[1].Entries) != 1 {
t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
}

// When this append is acked, we change to replicate state and can
// send multiple messages at once.
index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2)
// Ack all three of those messages together and get another 3 messages. The
// third message contains a single large entry, in contrast to 2 before.
index = ackAndVerify(index, 2, 1, 1)
// All subsequent messages contain one large entry, and we cap at 2 messages
// because it overflows MaxInflightBytes.
index = ackAndVerify(index, 1, 1)
index = ackAndVerify(index, 1, 1)
// Start getting small messages again.
index = ackAndVerify(index, 1, 2, 2)
ackAndVerify(index, 2)
}

func TestUncommittedEntryLimit(t *testing.T) {
Expand Down

0 comments on commit 53e5398

Please sign in to comment.