Skip to content

Commit

Permalink
auto-adjust experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jan 30, 2025
1 parent 80a149b commit bee11dc
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 11 deletions.
18 changes: 15 additions & 3 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/filecoin-project/go-clock"
"github.com/ipfs/boxo/bitswap/client/internal/messagequeue/ravg"
bswl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
Expand Down Expand Up @@ -43,8 +44,9 @@ const (
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
sendMessageCutoff = 256
// wait this long before sending next message
sendMessageDelay = 20 * time.Millisecond
sendTimeout = 30 * time.Second
minSendMessageDelay = 20 * time.Millisecond
maxSendMessageDelay = 200 * time.Millisecond
sendTimeout = 30 * time.Second
)

// MessageNetwork is any network that can connect peers and generate a message
Expand Down Expand Up @@ -453,6 +455,8 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
const runRebroadcastsInterval = rebroadcastInterval / 2

avg := ravg.New[int](10)

defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
Expand Down Expand Up @@ -481,9 +485,17 @@ func (mq *MessageQueue) runQueue() {
if mq.events != nil {
mq.events <- messageQueued
}

avg.Put(mq.pendingWorkCount())
mean := avg.Mean()
delay := time.Duration(mean) * time.Millisecond / 8
delay = min(delay, maxSendMessageDelay)
delay = max(delay, minSendMessageDelay)
log.Errorw("Setting send delay", "delay", delay.String(), "avgCount", mean)

mq.sendMessage()
hasWorkChan = nil
scheduleWork.Reset(sendMessageDelay)
scheduleWork.Reset(delay)

case <-scheduleWork.C:
hasWorkChan = mq.outgoingWork
Expand Down
16 changes: 8 additions & 8 deletions bitswap/client/internal/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// Add some broadcast want-haves
messageQueue.Startup()
messageQueue.AddBroadcastWantHaves(bcstwh)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
message := <-messagesSent
expectEvent(t, events, messageFinishedSending)
Expand All @@ -444,7 +444,7 @@ func TestWantlistRebroadcast(t *testing.T) {

// Send out some regular wants and collect them
messageQueue.AddWants(wantBlocks, wantHaves)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
clock.Add(10 * time.Millisecond)
message = <-messagesSent
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// Cancel some of the wants
cancels := append([]cid.Cid{bcstwh[0]}, wantHaves[0], wantBlocks[0])
messageQueue.AddCancels(cancels)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
clock.Add(10 * time.Millisecond)
message = <-messagesSent
Expand Down Expand Up @@ -637,7 +637,7 @@ func TestResponseReceived(t *testing.T) {

// Add some wants
messageQueue.AddWants(cids[:5], nil)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
<-messagesSent
expectEvent(t, events, messageFinishedSending)
Expand All @@ -647,7 +647,7 @@ func TestResponseReceived(t *testing.T) {

// Add some wants and wait another 10ms
messageQueue.AddWants(cids[5:8], nil)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
clock.Add(10 * time.Millisecond)
<-messagesSent
Expand All @@ -664,7 +664,7 @@ func TestResponseReceived(t *testing.T) {
}
// Elapsed time should be between when the first want was sent and the
// response received (about 20ms)
if upds[0] != sendMessageDelay+20*time.Millisecond {
if upds[0] != maxSendMessageDelay+20*time.Millisecond {
t.Fatalf("expected latency to be time since oldest message sent, was %s", upds[0].String())
}
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) {

// Add some wants and wait 20ms
messageQueue.AddWants(cids[:2], nil)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
<-messagesSent
expectEvent(t, events, messageFinishedSending)
Expand All @@ -742,7 +742,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) {
// Add some more wants and wait long enough that the first wants will be
// outside the maximum valid latency, but the second wants will be inside
messageQueue.AddWants(cids[2:], nil)
clock.Add(sendMessageDelay)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
<-messagesSent
expectEvent(t, events, messageFinishedSending)
Expand Down
56 changes: 56 additions & 0 deletions bitswap/client/internal/messagequeue/ravg/ravg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ravg

import (
"golang.org/x/exp/constraints"
)

type Number interface {
constraints.Integer | constraints.Float
}

type RAvg[T Number] struct {
samples []T
next int
full bool
}

func New[T Number](size int) *RAvg[T] {
return &RAvg[T]{
samples: make([]T, size),
}
}

func (r *RAvg[T]) Len() int {
return len(r.samples)
}

func (r *RAvg[T]) Put(sample T) {
r.samples[r.next] = sample
r.next++
if r.next == len(r.samples) {
r.next = 0
r.full = true
}
}

func (r *RAvg[T]) Mean() T {
size, sum := r.mean()
return sum / T(size)
}

func (r *RAvg[T]) FMean() float64 {
size, sum := r.mean()
return float64(sum) / float64(size)
}

func (r *RAvg[T]) mean() (int, T) {
size := len(r.samples)
if !r.full {
size = r.next
}
var sum T
for i := 0; i < size; i++ {
sum += r.samples[i]
}
return size, sum
}
47 changes: 47 additions & 0 deletions bitswap/client/internal/messagequeue/ravg/ravg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ravg

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestRAvgInt(t *testing.T) {
avg := New[int](10)
require.Equal(t, 10, avg.Len())

avg.Put(1)
avg.Put(2)
avg.Put(3)

require.Equal(t, 2, avg.Mean())

for i := 0; i < 10; i++ {
for j := 1; j <= avg.Len(); j++ {
avg.Put(j)
}
}
require.Equal(t, 5, avg.Mean())

require.Equal(t, 5.5, avg.FMean())
}

func TestRAvgFloat(t *testing.T) {
avg := New[float64](10)
require.Equal(t, 10, avg.Len())

avg.Put(1)
avg.Put(2)
avg.Put(3)

require.Equal(t, 2.0, avg.Mean())

for i := 0; i < 10; i++ {
for j := 1; j <= avg.Len(); j++ {
avg.Put(float64(j))
}
}
require.Equal(t, 5.5, avg.Mean())

require.Equal(t, 5.5, avg.FMean())
}

0 comments on commit bee11dc

Please sign in to comment.