Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor(messagequeue): cleanup and comment
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Dec 4, 2018
1 parent ea5d2c3 commit 03f4615
Showing 1 changed file with 116 additions and 82 deletions.
198 changes: 116 additions & 82 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (

var log = logging.Logger("bitswap")

// MessageNetwork is any network that can connect peers and generate a message
// sender
type MessageNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}

// MessageQueue implements queuee of want messages to send to peers
type MessageQueue struct {
p peer.ID

Expand All @@ -35,6 +38,7 @@ type MessageQueue struct {
done chan struct{}
}

// New creats a new MessageQueues
func New(p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
Expand All @@ -46,52 +50,31 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue {
}
}

// RefIncrement increments the refcount for a message queue
func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

// RefDecrement decrements the refcount for a message queue and returns true
// if the refcount is now 0
func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

// AddMessage adds new entries to an outgoing message for a given session
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
var work bool
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
if !work {
return
}
select {
case mq.work <- struct{}{}:
default:
}
}()

// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
if !mq.addEntries(entries, ses) {
return
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
}
select {
case mq.work <- struct{}{}:
default:
}
}

// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

// new peer, we will want to give them our full wantlist
Expand All @@ -110,6 +93,7 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.

}

// Shutdown stops the processing of messages for a message queue
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}
Expand All @@ -133,84 +117,134 @@ func (mq *MessageQueue) runQueue(ctx context.Context) {
}
}

func (mq *MessageQueue) doWork(ctx context.Context) {
// grab outgoing message
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
var work bool
mq.outlk.Lock()
wlm := mq.out
defer mq.outlk.Unlock()
// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}

return work
}

func (mq *MessageQueue) doWork(ctx context.Context) {

wlm := mq.extractOutgoingMessage()
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
return
}
mq.out = nil
mq.outlk.Unlock()

// NB: only open a stream if we actually have data to send
if mq.sender == nil {
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}
err := mq.initializeSender(ctx)
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
if mq.attemptSendAndRecovery(ctx, wlm) {
return
}
}
}

log.Infof("bitswap send error: %s", err)
mq.sender.Reset()
mq.sender = nil
func (mq *MessageQueue) initializeSender(ctx context.Context) error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}

select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}
func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
return true
}

err = mq.openSender(ctx)
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}
log.Infof("bitswap send error: %s", err)
mq.sender.Reset()
mq.sender = nil

select {
case <-mq.done:
return true
case <-ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
err = mq.initializeSender(ctx)
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return true
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
return false
}

func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
mq.out = nil
mq.outlk.Unlock()
return wlm
}

func (mq *MessageQueue) openSender(ctx context.Context) error {
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
err := network.ConnectTo(conctx, p)
if err != nil {
return err
return nil, err
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
nsender, err := network.NewMessageSender(ctx, p)
if err != nil {
return err
return nil, err
}

mq.sender = nsender
return nil
return nsender, nil
}

0 comments on commit 03f4615

Please sign in to comment.