Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#111 from ipfs/feat/improve-connmgr
Browse files Browse the repository at this point in the history
connmgr: give peers more weight when actively participating in a session

This commit was moved from ipfs/go-bitswap@07ec9e8
  • Loading branch information
hannahhoward authored Apr 30, 2019
2 parents 1d30b5b + 9477f82 commit 165d5f8
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 46 deletions.
15 changes: 1 addition & 14 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,8 @@ var log = logging.Logger("bitswap")
var _ exchange.SessionExchange = (*Bitswap)(nil)

const (
// maxProvidersPerRequest specifies the maximum number of providers desired
// from the network. This value is specified because the network streams
// results.
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
maxProvidersPerRequest = 3
findProviderDelay = 1 * time.Second
providerRequestTimeout = time.Second * 10
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
sizeBatchRequestChan = 32
provideTimeout = time.Minute * 3
)

var (
Expand Down Expand Up @@ -190,11 +182,6 @@ type counters struct {
messagesRecvd uint64
}

type blockRequest struct {
Cid cid.Cid
Ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
Expand Down
6 changes: 2 additions & 4 deletions bitswap/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (e *Engine) Peers() []peer.ID {

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}
Expand Down Expand Up @@ -276,7 +276,6 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
l.ReceivedBytes(len(block.RawData()))
}
return nil
}

func (e *Engine) addBlock(block blocks.Block) {
Expand Down Expand Up @@ -309,7 +308,7 @@ func (e *Engine) AddBlock(block blocks.Block) {
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
l := e.findOrCreate(p)
l.lk.Lock()
defer l.lk.Unlock()
Expand All @@ -320,7 +319,6 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
e.peerRequestQueue.Remove(block.Cid(), p)
}

return nil
}

func (e *Engine) PeerConnected(p peer.ID) {
Expand Down
11 changes: 0 additions & 11 deletions bitswap/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,10 @@ import (

bsmsg "github.com/ipfs/go-bitswap/message"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"

peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)

// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
AddMessage(entries []bsmsg.Entry, ses uint64)
Expand All @@ -27,10 +20,6 @@ type PeerQueue interface {
// PeerQueueFactory provides a function that will create a PeerQueue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue

type peerMessage interface {
handle(pm *PeerManager)
}

type peerQueueInstance struct {
refcnt int
pq PeerQueue
Expand Down
31 changes: 14 additions & 17 deletions bitswap/sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ import (
"fmt"
"math/rand"

logging "github.com/ipfs/go-log"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

const (
maxOptimizedPeers = 32
reservePeers = 2
maxOptimizedPeers = 32
reservePeers = 2
unoptimizedTagValue = 5 // tag value for "unoptimized" session peers.
optimizedTagValue = 10 // tag value for "optimized" session peers.
)

// PeerTagger is an interface for tagging peers with metadata
Expand Down Expand Up @@ -131,8 +129,8 @@ func (spm *SessionPeerManager) run(ctx context.Context) {
}
}

func (spm *SessionPeerManager) tagPeer(p peer.ID) {
spm.tagger.TagPeer(p, spm.tag, 10)
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
spm.tagger.TagPeer(p, spm.tag, value)
}

func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
Expand Down Expand Up @@ -173,7 +171,7 @@ func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p)
spm.tagPeer(p, unoptimizedTagValue)
}
}

Expand All @@ -182,17 +180,16 @@ type peerResponseMessage struct {
}

func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {

p := prm.p
isOptimized, ok := spm.activePeers[p]
if !ok {
spm.activePeers[p] = true
spm.tagPeer(p)
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
spm.activePeers[p] = true
spm.activePeers[p] = true
spm.tagPeer(p, optimizedTagValue)

// transition from unoptimized.
if ok {
spm.removeUnoptimizedPeer(p)
}
}
Expand Down

0 comments on commit 165d5f8

Please sign in to comment.