Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#116 from ipfs/bugs/queue-memory-leak
Browse files Browse the repository at this point in the history
fix(decision): cleanup request queues

This commit was moved from ipfs/go-bitswap@61f1223
  • Loading branch information
Stebalien authored May 3, 2019
2 parents 52ee7ff + c501d01 commit 7d693dc
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
22 changes: 18 additions & 4 deletions bitswap/decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (tl *prq) Push(to peer.ID, entries ...wantlist.Entry) {
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
if !ok {
partner = newActivePartner()
partner = newActivePartner(to)
tl.pQueue.Push(partner)
tl.partners[to] = partner
}
Expand Down Expand Up @@ -136,7 +136,13 @@ func (tl *prq) Pop() *peerRequestTask {
break // and return |out|
}

tl.pQueue.Push(partner)
if partner.IsIdle() {
target := partner.target
delete(tl.partners, target)
delete(tl.frozen, target)
} else {
tl.pQueue.Push(partner)
}
return out
}

Expand Down Expand Up @@ -252,7 +258,7 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
}

type activePartner struct {

target peer.ID
// Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally
activelk sync.Mutex
Expand All @@ -274,8 +280,9 @@ type activePartner struct {
taskQueue pq.PQ
}

func newActivePartner() *activePartner {
func newActivePartner(target peer.ID) *activePartner {
return &activePartner{
target: target,
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: cid.NewSet(),
}
Expand Down Expand Up @@ -323,6 +330,7 @@ func (p *activePartner) StartTask(k cid.Cid) {
// TaskDone signals that a task was completed for this partner.
func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Lock()

p.activeBlocks.Remove(k)
p.active--
if p.active < 0 {
Expand All @@ -331,6 +339,12 @@ func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Unlock()
}

func (p *activePartner) IsIdle() bool {
p.activelk.Lock()
defer p.activelk.Unlock()
return p.requests == 0 && p.active == 0
}

// Index implements pq.Elem.
func (p *activePartner) Index() int {
return p.index
Expand Down
32 changes: 32 additions & 0 deletions bitswap/decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) {
}
}
}

func TestCleaningUpQueues(t *testing.T) {
partner := testutil.RandPeerIDFatal(t)
var entries []wantlist.Entry
for i := 0; i < 5; i++ {
entries = append(entries, wantlist.Entry{Cid: cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))})
}

prq := newPRQ()

// push a block, pop a block, complete everything, should be removed
prq.Push(partner, entries...)
task := prq.Pop()
task.Done(task.Entries)
task = prq.Pop()

if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}

// push a block, remove each of its entries, should be removed
prq.Push(partner, entries...)
for _, entry := range entries {
prq.Remove(entry.Cid, partner)
}
task = prq.Pop()

if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}

}

0 comments on commit 7d693dc

Please sign in to comment.