Skip to content

Commit

Permalink
base delay on peer count
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jan 30, 2025
1 parent bee11dc commit 8a27e39
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 111 deletions.
17 changes: 9 additions & 8 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/filecoin-project/go-clock"
"github.com/ipfs/boxo/bitswap/client/internal/messagequeue/ravg"
bswl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmsg "github.com/ipfs/boxo/bitswap/message"
pb "github.com/ipfs/boxo/bitswap/message/pb"
Expand Down Expand Up @@ -49,6 +49,8 @@ const (
sendTimeout = 30 * time.Second
)

var peerCount atomic.Int64

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
Expand Down Expand Up @@ -455,7 +457,8 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
const runRebroadcastsInterval = rebroadcastInterval / 2

avg := ravg.New[int](10)
peerCount.Add(1)
defer peerCount.Add(-1)

defer mq.onShutdown()

Expand Down Expand Up @@ -486,12 +489,10 @@ func (mq *MessageQueue) runQueue() {
mq.events <- messageQueued
}

avg.Put(mq.pendingWorkCount())
mean := avg.Mean()
delay := time.Duration(mean) * time.Millisecond / 8
delay = min(delay, maxSendMessageDelay)
delay = max(delay, minSendMessageDelay)
log.Errorw("Setting send delay", "delay", delay.String(), "avgCount", mean)
peers := peerCount.Load()
delay := time.Duration(peers) * time.Millisecond / 8
delay = max(minSendMessageDelay, min(maxSendMessageDelay, delay))
log.Errorw("Setting send delay", "delay", delay.String(), "peerCount", peers)

mq.sendMessage()
hasWorkChan = nil
Expand Down
12 changes: 12 additions & 0 deletions bitswap/client/internal/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
wantBlocks := random.Cids(10)

messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
Expand All @@ -233,6 +234,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
wantBlocks := random.Cids(10)

messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddWants(wantBlocks[:8], wantHaves[:8])
messageQueue.AddWants(wantBlocks[3:], wantHaves[3:])
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)
Expand All @@ -258,6 +260,7 @@ func TestSendingMessagesPriority(t *testing.T) {
wantBlocks := append(wantBlocks1, wantBlocks2...)

messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddWants(wantBlocks1, wantHaves1)
messageQueue.AddWants(wantBlocks2, wantHaves2)
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)
Expand Down Expand Up @@ -323,6 +326,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
cancels := []cid.Cid{wantBlocks[0], wantHaves[0]}

messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddWants(wantBlocks, wantHaves)
messageQueue.AddCancels(cancels)
messages := collectMessages(ctx, t, messagesSent, collectTimeout)
Expand Down Expand Up @@ -373,6 +377,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
wantHaves := cids[1:]

messageQueue.Startup()
defer messageQueue.Shutdown()

// Add 1 want-block and 2 want-haves
messageQueue.AddWants(wantBlocks, wantHaves)
Expand Down Expand Up @@ -422,6 +427,7 @@ func TestWantlistRebroadcast(t *testing.T) {

// Add some broadcast want-haves
messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddBroadcastWantHaves(bcstwh)
clock.Add(maxSendMessageDelay)
expectEvent(t, events, messageQueued)
Expand Down Expand Up @@ -520,6 +526,7 @@ func TestSendingLargeMessages(t *testing.T) {
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil)

messageQueue.Startup()
defer messageQueue.Shutdown()
messageQueue.AddWants(wantBlocks, []cid.Cid{})
messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout)

Expand All @@ -544,6 +551,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {

messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
messageQueue.Startup()
defer messageQueue.Shutdown()

// If the remote peer doesn't support HAVE / DONT_HAVE messages
// - want-blocks should be sent normally
Expand Down Expand Up @@ -599,6 +607,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil)
messageQueue.Startup()
defer messageQueue.Shutdown()

wbs := random.Cids(10)
messageQueue.AddWants(wbs, nil)
Expand Down Expand Up @@ -632,6 +641,7 @@ func TestResponseReceived(t *testing.T) {
events := make(chan messageEvent)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock, events)
messageQueue.Startup()
defer messageQueue.Shutdown()

cids := random.Cids(10)

Expand Down Expand Up @@ -680,6 +690,7 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil)
messageQueue.Startup()
defer messageQueue.Shutdown()

cids := random.Cids(2)

Expand Down Expand Up @@ -727,6 +738,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) {
events := make(chan messageEvent)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm, clock, events)
messageQueue.Startup()
defer messageQueue.Shutdown()

cids := random.Cids(4)

Expand Down
56 changes: 0 additions & 56 deletions bitswap/client/internal/messagequeue/ravg/ravg.go

This file was deleted.

47 changes: 0 additions & 47 deletions bitswap/client/internal/messagequeue/ravg/ravg_test.go

This file was deleted.

0 comments on commit 8a27e39

Please sign in to comment.