fix: bitswap lock contention under high load (#817)
* Use dynamically adjustable message send scheduling to avoid the runaway condition; the send delay is based on peer count (a sketch follows below).
* Send large message chunks immediately until the pending size is no longer larger than the cutoff.
* Do not check the pending count when it is already known that work is ready.
* Make the per-peer send delay configurable.
* Limit the maximum delay to 1 second.
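For illustration, here is a minimal sketch of the scheduling rule described above, using the constants this commit adds to messagequeue.go (defaultPerPeerDelay, minSendMessageDelay, maxSendMessageDelay); the clampedSendDelay helper and the main function are hypothetical and not part of the diff.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Constants as introduced by this commit in messagequeue.go.
const (
	defaultPerPeerDelay = time.Millisecond / 8
	minSendMessageDelay = 20 * time.Millisecond
	maxSendMessageDelay = time.Second
)

// peerCount mirrors the package-level counter that each MessageQueue
// increments while its runQueue loop is running (one queue per peer).
var peerCount atomic.Int64

// clampedSendDelay is a hypothetical helper showing how the next send is
// scheduled: the delay grows with the number of connected peers but is
// clamped to [minSendMessageDelay, maxSendMessageDelay], so a queue never
// waits more than one second before sending accumulated wants/cancels.
func clampedSendDelay(perPeerDelay time.Duration) time.Duration {
	delay := time.Duration(peerCount.Load()) * perPeerDelay
	return max(minSendMessageDelay, min(maxSendMessageDelay, delay))
}

func main() {
	for _, peers := range []int64{10, 1_000, 100_000} {
		peerCount.Store(peers)
		fmt.Printf("%7d peers -> send delay %v\n", peers, clampedSendDelay(defaultPerPeerDelay))
	}
}
```

With the default of 0.125ms per peer, 1,000 peers yield a 125ms batching window, while very large swarms are capped at one second. Independently of this delay, a queue whose pending work count reaches sendMessageCutoff (256 wants/cancels) sends immediately.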
gammazero authored Jan 31, 2025
1 parent 62512fb commit 0a0b298
Showing 5 changed files with 140 additions and 87 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,9 @@ The following emojis are used to highlight certain changes:

### Fixed

`bitswap/client`: Fix runaway goroutine creation under high load. Under high load, goroutines are created faster than they can complete, and the more goroutines pile up, the slower they complete. This creates a positive feedback cycle that ends in OOM. The fix dynamically adjusts message send scheduling to avoid the runaway condition. [#817](https://github.com/ipfs/boxo/pull/817)


### Security


15 changes: 14 additions & 1 deletion bitswap/client/client.go
@@ -76,6 +76,15 @@ func WithDontHaveTimeoutConfig(cfg *bsmq.DontHaveTimeoutConfig) Option {
}
}

// WithPerPeerSendDelay determines how long to wait, based on the number of
// peers, for wants to accumulate before sending a bitswap message to peers. A
// value of 0 uses the bitswap messagequeue default.
func WithPerPeerSendDelay(delay time.Duration) Option {
return func(bs *Client) {
bs.perPeerSendDelay = delay
}
}
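As a usage sketch (editorial, not part of the diff), an application constructing the bitswap client can pass the new option alongside whatever arguments it already gives to client.New; ctx, network, providerFinder, and bstore below are placeholders for that existing setup.

```go
// Hypothetical caller-side configuration; only WithPerPeerSendDelay is new
// in this commit. A value of 0 keeps the messagequeue default
// (time.Millisecond / 8 per peer).
bs := client.New(ctx, network, providerFinder, bstore,
	client.WithPerPeerSendDelay(250*time.Microsecond),
)
_ = bs // use the client as usual
```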

// Configures the Client to use the given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is the original intent).
@@ -178,7 +187,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ro
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig))
return bsmq.New(ctx, p, network, onDontHaveTimeout,
bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig),
bsmq.WithPerPeerSendDelay(bs.perPeerSendDelay))
}
bs.dontHaveTimeoutConfig = nil

@@ -292,6 +303,8 @@ type Client struct {

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool

perPeerSendDelay time.Duration
}

type counters struct {
160 changes: 84 additions & 76 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -4,6 +4,7 @@ import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/filecoin-project/go-clock"
@@ -42,14 +43,16 @@ const (
sendErrorBackoff = 100 * time.Millisecond
// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
sendMessageCutoff = 256
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 20 * time.Millisecond
sendTimeout = 30 * time.Second
// wait this long before sending next message
sendTimeout = 30 * time.Second

defaultPerPeerDelay = time.Millisecond / 8
maxSendMessageDelay = time.Second
minSendMessageDelay = 20 * time.Millisecond
)

var peerCount atomic.Int64

// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
@@ -81,7 +84,7 @@ type MessageQueue struct {
maxValidLatency time.Duration

// Signals that there are outgoing wants / cancels ready to be processed
outgoingWork chan time.Time
outgoingWork chan struct{}

// Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer
responses chan []cid.Cid
@@ -105,6 +108,8 @@ type MessageQueue struct {

// Used to track things that happen asynchronously -- used only in test
events chan<- messageEvent

perPeerDelay time.Duration
}

// recallWantlist keeps a list of pending wants and a list of sent wants
@@ -232,7 +237,8 @@ type DontHaveTimeoutManager interface {
}

type optsConfig struct {
dhtConfig *DontHaveTimeoutConfig
dhtConfig *DontHaveTimeoutConfig
perPeerDelay time.Duration
}

type option func(*optsConfig)
@@ -243,11 +249,22 @@ func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option {
}
}

func WithPerPeerSendDelay(perPeerDelay time.Duration) option {
return func(cfg *optsConfig) {
if perPeerDelay == 0 {
perPeerDelay = defaultPerPeerDelay
}
cfg.perPeerDelay = perPeerDelay
}
}

// New creates a new MessageQueue.
//
// If onDontHaveTimeout is nil, then the dontHaveTimeoutMrg is disabled.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue {
var opts optsConfig
opts := optsConfig{
perPeerDelay: defaultPerPeerDelay,
}
for _, o := range options {
o(&opts)
}
@@ -261,7 +278,9 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
}
dhTimeoutMgr = newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, opts.dhtConfig)
}
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil)
mq := newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil)
mq.perPeerDelay = opts.perPeerDelay
return mq
}

type messageEvent int
@@ -298,7 +317,7 @@ func newMessageQueue(
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
outgoingWork: make(chan struct{}, 1),
responses: make(chan []cid.Cid, 8),
rebroadcastNow: make(chan struct{}),
sendErrorBackoff: sendErrorBackoff,
@@ -377,9 +396,9 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
mq.dhTimeoutMgr.CancelPending(cancelKs)
}

mq.wllock.Lock()
var workReady bool

workReady := false
mq.wllock.Lock()

// Remove keys from broadcast and peer wants, and add to cancels
for _, c := range cancelKs {
@@ -456,6 +475,9 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
const runRebroadcastsInterval = rebroadcastInterval / 2

peerCount.Add(1)
defer peerCount.Add(-1)

defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
@@ -466,10 +488,12 @@
<-scheduleWork.C
}

perPeerDelay := mq.perPeerDelay
hasWorkChan := mq.outgoingWork

rebroadcastTimer := mq.clock.Timer(runRebroadcastsInterval)
defer rebroadcastTimer.Stop()

var workScheduled time.Time
for {
select {
case now := <-rebroadcastTimer.C:
@@ -479,37 +503,20 @@
case <-mq.rebroadcastNow:
mq.rebroadcastWantlist(mq.clock.Now(), 0)

case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
// We send the time on the channel so we accurately
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
// Need to drain the timer if Stop() returns false
<-scheduleWork.C
case <-hasWorkChan:
if mq.events != nil {
mq.events <- messageQueued
}

// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCutoff ||
mq.clock.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
if mq.events != nil {
mq.events <- messageQueued
}
}
mq.sendMessage()
hasWorkChan = nil

delay := time.Duration(peerCount.Load()) * perPeerDelay
delay = max(minSendMessageDelay, min(maxSendMessageDelay, delay))
scheduleWork.Reset(delay)

case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
hasWorkChan = mq.outgoingWork

case res := <-mq.responses:
// We received a response from the peer, calculate latency
@@ -538,17 +545,11 @@ func (mq *MessageQueue) rebroadcastWantlist(now time.Time, interval time.Duratio

func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- mq.clock.Now():
case mq.outgoingWork <- struct{}{}:
default:
}
}

func (mq *MessageQueue) sendIfReady() {
if mq.hasPendingWork() {
mq.sendMessage()
}
}

func (mq *MessageQueue) sendMessage() {
sender, err := mq.initializeSender()
if err != nil {
@@ -565,38 +566,50 @@
mq.dhTimeoutMgr.Start()
}

// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
supportsHave := mq.sender.SupportsHave()

// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)

if message.Empty() {
return
}
var wantlist []bsmsg.Entry

wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)
for {
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(supportsHave)
if message.Empty() {
return
}

if err := sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}
wantlist = message.FillWantlist(wantlist)
mq.logOutgoingMessage(wantlist)

// Record sent time so as to calculate message latency
onSent()
if err = sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}

// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)
// Record sent time so as to calculate message latency
onSent()

// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)

// If the message was too big and only a subset of wants could be sent,
// send more if the work count is above the cutoff. Otherwise,
// schedule sending the rest of the wants in the next iteration of the
// event loop.
pendingWork := mq.pendingWorkCount()
if pendingWork < sendMessageCutoff {
if pendingWork > 0 {
mq.signalWorkReady()
}
return
}

mq.msg.Reset(false)
}
}

@@ -721,11 +734,6 @@ func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
}
}

// Whether there is work to be processed
func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}

// The amount of work that is waiting to be processed
func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()