From 27ea5d36c46107510962d54468b539b37057b416 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 1 Apr 2016 14:48:07 -0700 Subject: [PATCH] raft: refactor inflight --- raft/progress.go | 29 +++++++++++++++++++---------- raft/progress_test.go | 11 ----------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/raft/progress.go b/raft/progress.go index 11f53409d4c..c5dbb655649 100644 --- a/raft/progress.go +++ b/raft/progress.go @@ -64,12 +64,17 @@ type Progress struct { RecentActive bool // inflights is a sliding window for the inflight messages. + // Each inflight message might contain more than one entries + // and has a size limit defined in raft config as MaxSizePerMsg. + // Thus inflight effectively limits both the number of inflight messages + // and the bandwidth each Progress can use. // When inflights is full, no more message should be sent. // When a leader sends out a message, the index of the last // entry should be added to inflights. The index MUST be added // into inflights in order. // When a leader receives a reply, the previous inflights should - // be freed by calling inflights.freeTo. + // be freed by calling inflights.freeTo with the index of the last + // received entry. ins *inflights } @@ -183,14 +188,13 @@ type inflights struct { // number of inflights in the buffer count int - // the size of the buffer - size int + // buffer contains the index of the last entry + // inside one message. buffer []uint64 } func newInflights(size int) *inflights { return &inflights{ - size: size, buffer: make([]uint64, size), } } @@ -201,8 +205,9 @@ func (in *inflights) add(inflight uint64) { panic("cannot add into a full inflights") } next := in.start + in.count - if next >= in.size { - next -= in.size + size := in.size() + if next >= size { + next -= size } in.buffer[next] = inflight in.count++ @@ -220,10 +225,10 @@ func (in *inflights) freeTo(to uint64) { if to < in.buffer[idx] { // found the first large inflight break } - + size := in.size() // increase index and maybe rotate - if idx++; idx >= in.size { - idx -= in.size + if idx++; idx >= size { + idx -= size } } // free i inflights and set new start index @@ -235,7 +240,11 @@ func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) } // full returns true if the inflights is full. func (in *inflights) full() bool { - return in.count == in.size + return in.count == len(in.buffer) +} + +func (in *inflights) size() int { + return len(in.buffer) } // resets frees all inflights. diff --git a/raft/progress_test.go b/raft/progress_test.go index 4cea14e875a..3d80d534995 100644 --- a/raft/progress_test.go +++ b/raft/progress_test.go @@ -22,7 +22,6 @@ import ( func TestInflightsAdd(t *testing.T) { // no rotating case in := &inflights{ - size: 10, buffer: make([]uint64, 10), } @@ -33,7 +32,6 @@ func TestInflightsAdd(t *testing.T) { wantIn := &inflights{ start: 0, count: 5, - size: 10, // ↓------------ buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, } @@ -49,7 +47,6 @@ func TestInflightsAdd(t *testing.T) { wantIn2 := &inflights{ start: 0, count: 10, - size: 10, // ↓--------------------------- buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, } @@ -61,7 +58,6 @@ func TestInflightsAdd(t *testing.T) { // rotating case in2 := &inflights{ start: 5, - size: 10, buffer: make([]uint64, 10), } @@ -72,7 +68,6 @@ func TestInflightsAdd(t *testing.T) { wantIn21 := &inflights{ start: 5, count: 5, - size: 10, // ↓------------ buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, } @@ -88,7 +83,6 @@ func TestInflightsAdd(t *testing.T) { wantIn22 := &inflights{ start: 5, count: 10, - size: 10, // -------------- ↓------------ buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, } @@ -110,7 +104,6 @@ func TestInflightFreeTo(t *testing.T) { wantIn := &inflights{ start: 5, count: 5, - size: 10, // ↓------------ buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, } @@ -124,7 +117,6 @@ func TestInflightFreeTo(t *testing.T) { wantIn2 := &inflights{ start: 9, count: 1, - size: 10, // ↓ buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, } @@ -143,7 +135,6 @@ func TestInflightFreeTo(t *testing.T) { wantIn3 := &inflights{ start: 3, count: 2, - size: 10, // ↓----- buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, } @@ -157,7 +148,6 @@ func TestInflightFreeTo(t *testing.T) { wantIn4 := &inflights{ start: 5, count: 0, - size: 10, // ↓ buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, } @@ -178,7 +168,6 @@ func TestInflightFreeFirstOne(t *testing.T) { wantIn := &inflights{ start: 1, count: 9, - size: 10, // ↓------------------------ buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, }