Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: refactor inflight #4931

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ type Progress struct {
RecentActive bool

// inflights is a sliding window for the inflight messages.
// Each inflight message might contain more than one entries
// and has a size limit defined in raft config as MaxSizePerMsg.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A size limit in number of bytes or number of entries? I'm again confused about the purpose of this whole thing. Are the number of bytes tracked anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry... So a message contains one or multiple raft entries. Each message has a max size.

Say leader sends 8 entries with 1MB per entries. For each message, the max size is 4MB. With flow control, two messages with entries [1,2,3,4] and [5,6,7,8] will be sent. We will have [4, 8] in inflights.

Once follower replies with index 8, then we clear 4 and 8 from inflights together.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am considering to just make this a direct bytes based control. But it requires some changes from the upper level rafthttp, which now has a message based buffer (N number of messages).

// Thus inflight effectively limits both the number of inflight messages
// and the bandwidth each Progress can use.
// When inflights is full, no more message should be sent.
// When a leader sends out a message, the index of the last
// entry should be added to inflights. The index MUST be added
// into inflights in order.
// When a leader receives a reply, the previous inflights should
// be freed by calling inflights.freeTo.
// be freed by calling inflights.freeTo with the index of the last
// received entry.
ins *inflights
}

Expand Down Expand Up @@ -183,14 +188,13 @@ type inflights struct {
// number of inflights in the buffer
count int

// the size of the buffer
size int
// buffer contains the index of the last entry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to word

// buffer contains the last entry indexes of each message.

// inside one message.
buffer []uint64
}

func newInflights(size int) *inflights {
return &inflights{
size: size,
buffer: make([]uint64, size),
}
}
Expand All @@ -201,8 +205,9 @@ func (in *inflights) add(inflight uint64) {
panic("cannot add into a full inflights")
}
next := in.start + in.count
if next >= in.size {
next -= in.size
size := in.size()
if next >= size {
next -= size
}
in.buffer[next] = inflight
in.count++
Expand All @@ -220,10 +225,10 @@ func (in *inflights) freeTo(to uint64) {
if to < in.buffer[idx] { // found the first large inflight
break
}

size := in.size()
// increase index and maybe rotate
if idx++; idx >= in.size {
idx -= in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
Expand All @@ -235,7 +240,11 @@ func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }

// full returns true if the inflights is full.
func (in *inflights) full() bool {
return in.count == in.size
return in.count == len(in.buffer)
}

func (in *inflights) size() int {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well inline this.

return len(in.buffer)
}

// resets frees all inflights.
Expand Down
11 changes: 0 additions & 11 deletions raft/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &inflights{
size: 10,
buffer: make([]uint64, 10),
}

Expand All @@ -33,7 +32,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn := &inflights{
start: 0,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
}
Expand All @@ -49,7 +47,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn2 := &inflights{
start: 0,
count: 10,
size: 10,
// ↓---------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -61,7 +58,6 @@ func TestInflightsAdd(t *testing.T) {
// rotating case
in2 := &inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
}

Expand All @@ -72,7 +68,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn21 := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
}
Expand All @@ -88,7 +83,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn22 := &inflights{
start: 5,
count: 10,
size: 10,
// -------------- ↓------------
buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
}
Expand All @@ -110,7 +104,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -124,7 +117,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn2 := &inflights{
start: 9,
count: 1,
size: 10,
// ↓
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -143,7 +135,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn3 := &inflights{
start: 3,
count: 2,
size: 10,
// ↓-----
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
}
Expand All @@ -157,7 +148,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn4 := &inflights{
start: 5,
count: 0,
size: 10,
// ↓
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
}
Expand All @@ -178,7 +168,6 @@ func TestInflightFreeFirstOne(t *testing.T) {
wantIn := &inflights{
start: 1,
count: 9,
size: 10,
// ↓------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand Down