Skip to content

Commit

Permalink
fix(bitswap): filter interests from received messages (#822)
Browse files Browse the repository at this point in the history
Fixresource leak by adding a CID to BlockResponseManager only if there a session interested in it.

---------

Co-authored-by: Andrew Gillis <[email protected]>
  • Loading branch information
guillaumemichel and gammazero authored Jan 31, 2025
1 parent b15619e commit 1ca1b50
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ The following emojis are used to highlight certain changes:

### Fixed

`bitswap/client`: Fix runaway goroutine creation under high load. Under high load conditions, goroutines are created faster than they can complete and the more goroutines creates the slower them complete. This creates a positive feedback cycle that ends in OOM. The fix dynamically adjusts message send scheduling to avoid the runaway condition. [#817](https://github.com/ipfs/boxo/pull/817)

- `bitswap/client`: Fix runaway goroutine creation under high load. Under high load conditions, goroutines are created faster than they can complete and the more goroutines creates the slower them complete. This creates a positive feedback cycle that ends in OOM. The fix dynamically adjusts message send scheduling to avoid the runaway condition. [#817](https://github.com/ipfs/boxo/pull/817)
- `bitswap/cllient`: Fix resource leak caused by recording the presence of blocks that no session cares about. [#822](https://github.com/ipfs/boxo/pull/822)

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ func (sim *SessionInterestManager) FilterSessionInterested(ses uint64, ksets ...
// When bitswap receives blocks it calls SplitWantedUnwanted() to discard
// unwanted blocks
func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]blocks.Block, []blocks.Block) {
sim.lk.RLock()

// Get the wanted block keys as a set
wantedKs := cid.NewSet()

sim.lk.RLock()

for _, b := range blks {
c := b.Cid()
// For each session that wants the key.
Expand All @@ -139,7 +140,7 @@ func (sim *SessionInterestManager) SplitWantedUnwanted(blks []blocks.Block) ([]b

// Separate the blocks into wanted and unwanted
wantedBlks := make([]blocks.Block, 0, len(blks))
notWantedBlks := make([]blocks.Block, 0)
var notWantedBlks []blocks.Block
for _, b := range blks {
if wantedKs.Has(b.Cid()) {
wantedBlks = append(wantedBlks, b)
Expand Down Expand Up @@ -175,3 +176,31 @@ func (sim *SessionInterestManager) InterestedSessions(keySets ...[]cid.Cid) []ui
}
return ses
}

// Filters only the keys that are wanted by at least one session.
//
// IMPORTANT: FilterInterests filters the given Cid slices in place, modifying
// their contents. If the caller needs to preserve a copy of the lists it
// should make a copy before calling FilterInterests.
func (sim *SessionInterestManager) FilterInterests(keySets ...[]cid.Cid) [][]cid.Cid {
result := keySets[:0]

sim.lk.RLock()
defer sim.lk.RUnlock()

// For each set of keys
for _, ks := range keySets {
// The set of keys wanted by at least one session
wanted := ks[:0]

// For each key in the set
for _, c := range ks {
// If any session wants the key
if _, ok := sim.wants[c]; ok {
wanted = append(wanted, c)
}
}
result = append(result, wanted)
}
return result
}
11 changes: 10 additions & 1 deletion bitswap/client/internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,17 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return sm.sessID
}

// ReceiveFrom is called when a new message is received
// ReceiveFrom is called when a new message is received.
//
// IMPORTANT: ReceiveFrom filters the given Cid slices in place, modifying
// their contents. If the caller needs to preserve a copy of the lists it
// should make a copy before calling ReceiveFrom.
func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Keep only the keys that at least one session wants
keys := sm.sessionInterestManager.FilterInterests(blks, haves, dontHaves)
blks = keys[0]
haves = keys[1]
dontHaves = keys[2]
// Record block presence for HAVE / DONT_HAVE
sm.blockPresenceManager.ReceiveFrom(p, haves, dontHaves)

Expand Down

0 comments on commit 1ca1b50

Please sign in to comment.