Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor(general): extract components to packages
Browse files Browse the repository at this point in the history
Extract session manager from bitswap
Extract session manager & want manager to package
Move want manager message queue to seperate file
Move Message Queue to subpackage
Respond to PR Comments
  • Loading branch information
hannahhoward committed Nov 20, 2018
1 parent c5b071d commit 4d5134c
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 446 deletions.
39 changes: 13 additions & 26 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package bitswap
import (
"context"
"errors"
"math"
"sync"
"sync/atomic"
"time"
Expand All @@ -14,6 +13,8 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -42,8 +43,6 @@ const (
providerRequestTimeout = time.Second * 10
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
)

var (
Expand Down Expand Up @@ -101,7 +100,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
wm: bswm.New(ctx, network),
sm: bssm.New(),
counters: new(counters),

dupMetric: dupHist,
Expand All @@ -128,7 +128,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
wm *WantManager
wm *bswm.WantManager

// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine
Expand Down Expand Up @@ -163,12 +163,8 @@ type Bitswap struct {
dupMetric metrics.Histogram
allMetric metrics.Histogram

// Sessions
sessions []*Session
sessLk sync.Mutex

sessID uint64
sessIDLk sync.Mutex
// the sessionmanager manages tracking sessions
sm *bssm.SessionManager
}

type counters struct {
Expand Down Expand Up @@ -229,7 +225,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}

mses := bs.getNextSessionID()
mses := bs.sm.GetNextSessionID()

bs.wm.WantBlocks(ctx, keys, nil, mses)

Expand Down Expand Up @@ -294,13 +290,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return out, nil
}

func (bs *Bitswap) getNextSessionID() uint64 {
bs.sessIDLk.Lock()
defer bs.sessIDLk.Unlock()
bs.sessID++
return bs.sessID
}

// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 {
Expand Down Expand Up @@ -359,15 +348,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
bs.sessLk.Lock()
defer bs.sessLk.Unlock()

var out []*Session
for _, s := range bs.sessions {
bs.sm.IterateSessions(func(session exchange.Fetcher) {
s := session.(*Session)
if s.interestedIn(c) {
out = append(out, s)
}
}
})
return out
}

Expand Down Expand Up @@ -398,7 +385,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Debugf("got block %s from %s", b, p)

// skip received blocks that are not in the wantlist
if _, contains := bs.wm.wl.Contains(b.Cid()); !contains {
if !bs.wm.IsWanted(b.Cid()) {
return
}

Expand Down Expand Up @@ -461,7 +448,7 @@ func (bs *Bitswap) Close() error {
}

func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.wl.Entries()
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
Expand Down
208 changes: 208 additions & 0 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package messagequeue

import (
"context"
"sync"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

type MessageQueue struct {
p peer.ID

outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
wl *wantlist.ThreadSafe

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
}

func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 1,
}
}

func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
var work bool
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
if !work {
return
}
select {
case mq.work <- struct{}{}:
default:
}
}()

// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}
}

func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}

go mq.runQueue(ctx)
}

func (mq *MessageQueue) Shutdown() {
close(mq.done)
}
func (mq *MessageQueue) runQueue(ctx context.Context) {
for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
return
}
}
}

func (mq *MessageQueue) doWork(ctx context.Context) {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
return
}
mq.out = nil
mq.outlk.Unlock()

// NB: only open a stream if we actually have data to send
if mq.sender == nil {
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}
}

// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
return
}

log.Infof("bitswap send error: %s", err)
mq.sender.Reset()
mq.sender = nil

select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.openSender(ctx)
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}

func (mq *MessageQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}

mq.sender = nsender
return nil
}
17 changes: 3 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,15 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: bs.getNextSessionID(),
id: bs.sm.GetNextSessionID(),
}

s.tag = fmt.Sprint("bs-ses-", s.id)

cache, _ := lru.New(2048)
s.interest = cache

bs.sessLk.Lock()
bs.sessions = append(bs.sessions, s)
bs.sessLk.Unlock()

bs.sm.AddSession(s)
go s.run(ctx)

return s
Expand All @@ -92,15 +89,7 @@ func (bs *Bitswap) removeSession(s *Session) {
}
bs.CancelWants(live, s.id)

bs.sessLk.Lock()
defer bs.sessLk.Unlock()
for i := 0; i < len(bs.sessions); i++ {
if bs.sessions[i] == s {
bs.sessions[i] = bs.sessions[len(bs.sessions)-1]
bs.sessions = bs.sessions[:len(bs.sessions)-1]
return
}
}
bs.sm.RemoveSession(s)
}

type blkRecv struct {
Expand Down
Loading

0 comments on commit 4d5134c

Please sign in to comment.