From 8a27e3978309642ac977bb5e7841dff66abffc36 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 29 Jan 2025 22:04:31 -1000 Subject: [PATCH] base delay on peer count --- .../internal/messagequeue/messagequeue.go | 17 +++--- .../messagequeue/messagequeue_test.go | 12 ++++ .../client/internal/messagequeue/ravg/ravg.go | 56 ------------------- .../internal/messagequeue/ravg/ravg_test.go | 47 ---------------- 4 files changed, 21 insertions(+), 111 deletions(-) delete mode 100644 bitswap/client/internal/messagequeue/ravg/ravg.go delete mode 100644 bitswap/client/internal/messagequeue/ravg/ravg_test.go diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 88a115e54..1bc76235d 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -4,10 +4,10 @@ import ( "context" "math" "sync" + "sync/atomic" "time" "github.com/filecoin-project/go-clock" - "github.com/ipfs/boxo/bitswap/client/internal/messagequeue/ravg" bswl "github.com/ipfs/boxo/bitswap/client/wantlist" bsmsg "github.com/ipfs/boxo/bitswap/message" pb "github.com/ipfs/boxo/bitswap/message/pb" @@ -49,6 +49,8 @@ const ( sendTimeout = 30 * time.Second ) +var peerCount atomic.Int64 + // MessageNetwork is any network that can connect peers and generate a message // sender. type MessageNetwork interface { @@ -455,7 +457,8 @@ func (mq *MessageQueue) onShutdown() { func (mq *MessageQueue) runQueue() { const runRebroadcastsInterval = rebroadcastInterval / 2 - avg := ravg.New[int](10) + peerCount.Add(1) + defer peerCount.Add(-1) defer mq.onShutdown() @@ -486,12 +489,10 @@ func (mq *MessageQueue) runQueue() { mq.events <- messageQueued } - avg.Put(mq.pendingWorkCount()) - mean := avg.Mean() - delay := time.Duration(mean) * time.Millisecond / 8 - delay = min(delay, maxSendMessageDelay) - delay = max(delay, minSendMessageDelay) - log.Errorw("Setting send delay", "delay", delay.String(), "avgCount", mean) + peers := peerCount.Load() + delay := time.Duration(peers) * time.Millisecond / 8 + delay = max(minSendMessageDelay, min(maxSendMessageDelay, delay)) + log.Errorw("Setting send delay", "delay", delay.String(), "peerCount", peers) mq.sendMessage() hasWorkChan = nil diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 44c902212..5d4a2915f 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -212,6 +212,7 @@ func TestSendingMessagesDeduped(t *testing.T) { wantBlocks := random.Cids(10) messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddWants(wantBlocks, wantHaves) messages := collectMessages(ctx, t, messagesSent, collectTimeout) @@ -233,6 +234,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { wantBlocks := random.Cids(10) messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddWants(wantBlocks[:8], wantHaves[:8]) messageQueue.AddWants(wantBlocks[3:], wantHaves[3:]) messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) @@ -258,6 +260,7 @@ func TestSendingMessagesPriority(t *testing.T) { wantBlocks := append(wantBlocks1, wantBlocks2...) messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddWants(wantBlocks1, wantHaves1) messageQueue.AddWants(wantBlocks2, wantHaves2) messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) @@ -323,6 +326,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { cancels := []cid.Cid{wantBlocks[0], wantHaves[0]} messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddWants(wantBlocks, wantHaves) messageQueue.AddCancels(cancels) messages := collectMessages(ctx, t, messagesSent, collectTimeout) @@ -373,6 +377,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { wantHaves := cids[1:] messageQueue.Startup() + defer messageQueue.Shutdown() // Add 1 want-block and 2 want-haves messageQueue.AddWants(wantBlocks, wantHaves) @@ -422,6 +427,7 @@ func TestWantlistRebroadcast(t *testing.T) { // Add some broadcast want-haves messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddBroadcastWantHaves(bcstwh) clock.Add(maxSendMessageDelay) expectEvent(t, events, messageQueued) @@ -520,6 +526,7 @@ func TestSendingLargeMessages(t *testing.T) { messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil) messageQueue.Startup() + defer messageQueue.Shutdown() messageQueue.AddWants(wantBlocks, []cid.Cid{}) messages := collectMessages(ctx, t, messagesSent, 5*collectTimeout) @@ -544,6 +551,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) messageQueue.Startup() + defer messageQueue.Shutdown() // If the remote peer doesn't support HAVE / DONT_HAVE messages // - want-blocks should be sent normally @@ -599,6 +607,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { dhtm := &fakeDontHaveTimeoutMgr{} messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil) messageQueue.Startup() + defer messageQueue.Shutdown() wbs := random.Cids(10) messageQueue.AddWants(wbs, nil) @@ -632,6 +641,7 @@ func TestResponseReceived(t *testing.T) { events := make(chan messageEvent) messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock, events) messageQueue.Startup() + defer messageQueue.Shutdown() cids := random.Cids(10) @@ -680,6 +690,7 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { dhtm := &fakeDontHaveTimeoutMgr{} messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm, clock.New(), nil) messageQueue.Startup() + defer messageQueue.Shutdown() cids := random.Cids(2) @@ -727,6 +738,7 @@ func TestResponseReceivedDiscardsOutliers(t *testing.T) { events := make(chan messageEvent) messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm, clock, events) messageQueue.Startup() + defer messageQueue.Shutdown() cids := random.Cids(4) diff --git a/bitswap/client/internal/messagequeue/ravg/ravg.go b/bitswap/client/internal/messagequeue/ravg/ravg.go deleted file mode 100644 index c1e4f26a5..000000000 --- a/bitswap/client/internal/messagequeue/ravg/ravg.go +++ /dev/null @@ -1,56 +0,0 @@ -package ravg - -import ( - "golang.org/x/exp/constraints" -) - -type Number interface { - constraints.Integer | constraints.Float -} - -type RAvg[T Number] struct { - samples []T - next int - full bool -} - -func New[T Number](size int) *RAvg[T] { - return &RAvg[T]{ - samples: make([]T, size), - } -} - -func (r *RAvg[T]) Len() int { - return len(r.samples) -} - -func (r *RAvg[T]) Put(sample T) { - r.samples[r.next] = sample - r.next++ - if r.next == len(r.samples) { - r.next = 0 - r.full = true - } -} - -func (r *RAvg[T]) Mean() T { - size, sum := r.mean() - return sum / T(size) -} - -func (r *RAvg[T]) FMean() float64 { - size, sum := r.mean() - return float64(sum) / float64(size) -} - -func (r *RAvg[T]) mean() (int, T) { - size := len(r.samples) - if !r.full { - size = r.next - } - var sum T - for i := 0; i < size; i++ { - sum += r.samples[i] - } - return size, sum -} diff --git a/bitswap/client/internal/messagequeue/ravg/ravg_test.go b/bitswap/client/internal/messagequeue/ravg/ravg_test.go deleted file mode 100644 index 17654a58f..000000000 --- a/bitswap/client/internal/messagequeue/ravg/ravg_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package ravg - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestRAvgInt(t *testing.T) { - avg := New[int](10) - require.Equal(t, 10, avg.Len()) - - avg.Put(1) - avg.Put(2) - avg.Put(3) - - require.Equal(t, 2, avg.Mean()) - - for i := 0; i < 10; i++ { - for j := 1; j <= avg.Len(); j++ { - avg.Put(j) - } - } - require.Equal(t, 5, avg.Mean()) - - require.Equal(t, 5.5, avg.FMean()) -} - -func TestRAvgFloat(t *testing.T) { - avg := New[float64](10) - require.Equal(t, 10, avg.Len()) - - avg.Put(1) - avg.Put(2) - avg.Put(3) - - require.Equal(t, 2.0, avg.Mean()) - - for i := 0; i < 10; i++ { - for j := 1; j <= avg.Len(); j++ { - avg.Put(float64(j)) - } - } - require.Equal(t, 5.5, avg.Mean()) - - require.Equal(t, 5.5, avg.FMean()) -}