Skip to content

Commit

Permalink
raft: refactor inflight
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Apr 1, 2016
1 parent 5d431b4 commit 27ea5d3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
29 changes: 19 additions & 10 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ type Progress struct {
RecentActive bool

// inflights is a sliding window for the inflight messages.
// Each inflight message might contain more than one entries
// and has a size limit defined in raft config as MaxSizePerMsg.
// Thus inflight effectively limits both the number of inflight messages
// and the bandwidth each Progress can use.
// When inflights is full, no more message should be sent.
// When a leader sends out a message, the index of the last
// entry should be added to inflights. The index MUST be added
// into inflights in order.
// When a leader receives a reply, the previous inflights should
// be freed by calling inflights.freeTo.
// be freed by calling inflights.freeTo with the index of the last
// received entry.
ins *inflights
}

Expand Down Expand Up @@ -183,14 +188,13 @@ type inflights struct {
// number of inflights in the buffer
count int

// the size of the buffer
size int
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
}

func newInflights(size int) *inflights {
return &inflights{
size: size,
buffer: make([]uint64, size),
}
}
Expand All @@ -201,8 +205,9 @@ func (in *inflights) add(inflight uint64) {
panic("cannot add into a full inflights")
}
next := in.start + in.count
if next >= in.size {
next -= in.size
size := in.size()
if next >= size {
next -= size
}
in.buffer[next] = inflight
in.count++
Expand All @@ -220,10 +225,10 @@ func (in *inflights) freeTo(to uint64) {
if to < in.buffer[idx] { // found the first large inflight
break
}

size := in.size()
// increase index and maybe rotate
if idx++; idx >= in.size {
idx -= in.size
if idx++; idx >= size {
idx -= size
}
}
// free i inflights and set new start index
Expand All @@ -235,7 +240,11 @@ func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }

// full returns true if the inflights is full.
func (in *inflights) full() bool {
return in.count == in.size
return in.count == len(in.buffer)
}

func (in *inflights) size() int {
return len(in.buffer)
}

// resets frees all inflights.
Expand Down
11 changes: 0 additions & 11 deletions raft/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &inflights{
size: 10,
buffer: make([]uint64, 10),
}

Expand All @@ -33,7 +32,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn := &inflights{
start: 0,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
}
Expand All @@ -49,7 +47,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn2 := &inflights{
start: 0,
count: 10,
size: 10,
// ↓---------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -61,7 +58,6 @@ func TestInflightsAdd(t *testing.T) {
// rotating case
in2 := &inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
}

Expand All @@ -72,7 +68,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn21 := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
}
Expand All @@ -88,7 +83,6 @@ func TestInflightsAdd(t *testing.T) {
wantIn22 := &inflights{
start: 5,
count: 10,
size: 10,
// -------------- ↓------------
buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
}
Expand All @@ -110,7 +104,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn := &inflights{
start: 5,
count: 5,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -124,7 +117,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn2 := &inflights{
start: 9,
count: 1,
size: 10,
// ↓
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand All @@ -143,7 +135,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn3 := &inflights{
start: 3,
count: 2,
size: 10,
// ↓-----
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
}
Expand All @@ -157,7 +148,6 @@ func TestInflightFreeTo(t *testing.T) {
wantIn4 := &inflights{
start: 5,
count: 0,
size: 10,
// ↓
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
}
Expand All @@ -178,7 +168,6 @@ func TestInflightFreeFirstOne(t *testing.T) {
wantIn := &inflights{
start: 1,
count: 9,
size: 10,
// ↓------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
}
Expand Down

0 comments on commit 27ea5d3

Please sign in to comment.