Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

optimize the lookup which peers are waiting for a given block #486

Merged
merged 4 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

deciface "github.com/ipfs/go-bitswap/decision"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
decision "github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/internal/decision"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
bsmq "github.com/ipfs/go-bitswap/internal/messagequeue"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
"github.com/ipfs/go-bitswap/internal/notifications"
bspm "github.com/ipfs/go-bitswap/internal/peermanager"
bspqm "github.com/ipfs/go-bitswap/internal/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/internal/session"
Expand All @@ -27,14 +27,14 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
metrics "github.com/ipfs/go-metrics-interface"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"
)

var log = logging.Logger("bitswap")
Expand Down Expand Up @@ -422,7 +422,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)
bs.engine.ReceiveFrom(from, wanted)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
Expand Down
93 changes: 60 additions & 33 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
pb "github.com/ipfs/go-bitswap/message/pb"
wl "github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
process "github.com/jbenet/goprocess"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
Expand Down Expand Up @@ -144,11 +144,14 @@ type Engine struct {

tagQueued, tagUseful string

lock sync.RWMutex // protects the fields immediatly below
lock sync.RWMutex // protects the fields immediately below

// ledgerMap lists block-related Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

// peerLedger saves which peers are waiting for a Cid
peerLedger *peerLedger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger

Expand Down Expand Up @@ -191,6 +194,7 @@ func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagge
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
peerLedger: newPeerLedger(),
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
Expand Down Expand Up @@ -456,6 +460,15 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
return
}

e.lock.Lock()
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Cid)
}
for _, entry := range cancels {
e.peerLedger.CancelWant(p, entry.Cid)
}
e.lock.Unlock()

// Get the ledger for the peer
l := e.findOrCreate(p)
l.lk.Lock()
Expand Down Expand Up @@ -563,7 +576,7 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent
// the blocks to them.
//
// This function also updates the receive side of the ledger.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block) {
if len(blks) == 0 {
return
}
Expand All @@ -588,40 +601,48 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid)
}

// Check each peer to see if it wants one of the blocks we received
work := false
var work bool
e.lock.RLock()
for _, b := range blks {
k := b.Cid()

for _, p := range e.peerLedger.Peers(k) {
ledger, ok := e.ledgerMap[p]
if !ok {
log.Errorw("failed to find peer in ledger", "peer", p)
e.peerLedger.CancelWant(p, k)
continue
}
ledger.lk.RLock()
entry, ok := ledger.WantListContains(k)
ledger.lk.RUnlock()
if !ok { // should never happen
log.Errorw("wantlist index doesn't match peer's wantlist", "peer", p)
e.peerLedger.CancelWant(p, k)
continue
}
work = true

for _, l := range e.ledgerMap {
l.lk.RLock()

for _, b := range blks {
k := b.Cid()

if entry, ok := l.WantListContains(k); ok {
work = true

blockSize := blockSizes[k]
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

entrySize := blockSize
if !isWantBlock {
entrySize = bsmsg.BlockPresenceSize(k)
}
blockSize := blockSizes[k]
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
Topic: entry.Cid,
Priority: int(entry.Priority),
Work: entrySize,
Data: &taskData{
BlockSize: blockSize,
HaveBlock: true,
IsWantBlock: isWantBlock,
SendDontHave: false,
},
})
entrySize := blockSize
if !isWantBlock {
entrySize = bsmsg.BlockPresenceSize(k)
}

e.peerRequestQueue.PushTasks(p, peertask.Task{
Topic: entry.Cid,
Priority: int(entry.Priority),
Work: entrySize,
Data: &taskData{
BlockSize: blockSize,
HaveBlock: true,
IsWantBlock: isWantBlock,
SendDontHave: false,
},
})
}
l.lk.RUnlock()
}
e.lock.RUnlock()

Expand Down Expand Up @@ -677,6 +698,12 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()

ledger, ok := e.ledgerMap[p]
if ok {
for _, entry := range ledger.Entries() {
e.peerLedger.CancelWant(p, entry.Cid)
}
}
delete(e.ledgerMap, p)

e.scoreLedger.PeerDisconnected(p)
Expand Down
7 changes: 3 additions & 4 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
pb "github.com/ipfs/go-bitswap/message/pb"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -124,7 +123,7 @@ func TestConsistentAccounting(t *testing.T) {

sender.Engine.MessageSent(receiver.Peer, m)
receiver.Engine.MessageReceived(ctx, sender.Peer, m)
receiver.Engine.ReceiveFrom(sender.Peer, m.Blocks(), nil)
receiver.Engine.ReceiveFrom(sender.Peer, m.Blocks())
}

// Ensure sender records the change
Expand Down Expand Up @@ -900,7 +899,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
if err := bs.PutMany([]blocks.Block{blks[0], blks[2]}); err != nil {
t.Fatal(err)
}
e.ReceiveFrom(otherPeer, []blocks.Block{blks[0], blks[2]}, []cid.Cid{})
e.ReceiveFrom(otherPeer, []blocks.Block{blks[0], blks[2]})
_, env = getNextEnvelope(e, next, 5*time.Millisecond)
if env == nil {
t.Fatal("expected envelope")
Expand Down Expand Up @@ -963,7 +962,7 @@ func TestSendDontHave(t *testing.T) {
if err := bs.PutMany(blks); err != nil {
t.Fatal(err)
}
e.ReceiveFrom(otherPeer, blks, []cid.Cid{})
e.ReceiveFrom(otherPeer, blks)

// Envelope should contain 2 HAVEs / 2 blocks
_, env = getNextEnvelope(e, next, 10*time.Millisecond)
Expand Down
8 changes: 6 additions & 2 deletions internal/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
pb "github.com/ipfs/go-bitswap/message/pb"
wl "github.com/ipfs/go-bitswap/wantlist"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)

func newLedger(p peer.ID) *ledger {
Expand Down Expand Up @@ -40,3 +40,7 @@ func (l *ledger) CancelWant(k cid.Cid) bool {
func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) {
return l.wantList.Contains(k)
}

func (l *ledger) Entries() []wl.Entry {
return l.wantList.Entries()
}
46 changes: 46 additions & 0 deletions internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package decision

import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)

type peerLedger struct {
cids map[cid.Cid]map[peer.ID]struct{}
}

func newPeerLedger() *peerLedger {
return &peerLedger{cids: make(map[cid.Cid]map[peer.ID]struct{})}
}

func (l *peerLedger) Wants(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
m = make(map[peer.ID]struct{})
l.cids[k] = m
}
m[p] = struct{}{}
}

func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
return
}
delete(m, p)
if len(m) == 0 {
delete(l.cids, k)
}
}

func (l *peerLedger) Peers(k cid.Cid) []peer.ID {
m, ok := l.cids[k]
if !ok {
return nil
}
peers := make([]peer.ID, 0, len(m))
for p := range m {
peers = append(peers, p)
}
return peers
}