From ac45ed058d5fc515ef53cf3803f1506df31b27db Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Nov 2018 20:27:02 -0800 Subject: [PATCH] refactor(messagequeue): cleanup and comment --- messagequeue/messagequeue.go | 198 ++++++++++++++++++++--------------- 1 file changed, 116 insertions(+), 82 deletions(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index d8421a15..bed0cd55 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -14,11 +14,14 @@ import ( var log = logging.Logger("bitswap") +// MessageNetwork is any network that can connect peers and generate a message +// sender type MessageNetwork interface { ConnectTo(context.Context, peer.ID) error NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) } +// MessageQueue implements queuee of want messages to send to peers type MessageQueue struct { p peer.ID @@ -35,6 +38,7 @@ type MessageQueue struct { done chan struct{} } +// New creats a new MessageQueues func New(p peer.ID, network MessageNetwork) *MessageQueue { return &MessageQueue{ done: make(chan struct{}), @@ -46,52 +50,31 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue { } } +// RefIncrement increments the refcount for a message queue func (mq *MessageQueue) RefIncrement() { mq.refcnt++ } +// RefDecrement decrements the refcount for a message queue and returns true +// if the refcount is now 0 func (mq *MessageQueue) RefDecrement() bool { mq.refcnt-- return mq.refcnt > 0 } +// AddMessage adds new entries to an outgoing message for a given session func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) { - var work bool - mq.outlk.Lock() - defer func() { - mq.outlk.Unlock() - if !work { - return - } - select { - case mq.work <- struct{}{}: - default: - } - }() - - // if we have no message held allocate a new one - if mq.out == nil { - mq.out = bsmsg.New(false) + if !mq.addEntries(entries, ses) { + return } - - // TODO: add a msg.Combine(...) method - // otherwise, combine the one we are holding with the - // one passed in - for _, e := range entries { - if e.Cancel { - if mq.wl.Remove(e.Cid, ses) { - work = true - mq.out.Cancel(e.Cid) - } - } else { - if mq.wl.Add(e.Cid, e.Priority, ses) { - work = true - mq.out.AddEntry(e.Cid, e.Priority) - } - } + select { + case mq.work <- struct{}{}: + default: } } +// Startup starts the processing of messages, and creates an initial message +// based on the given initial wantlist func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) { // new peer, we will want to give them our full wantlist @@ -110,6 +93,7 @@ func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist. } +// Shutdown stops the processing of messages for a message queue func (mq *MessageQueue) Shutdown() { close(mq.done) } @@ -133,84 +117,134 @@ func (mq *MessageQueue) runQueue(ctx context.Context) { } } -func (mq *MessageQueue) doWork(ctx context.Context) { - // grab outgoing message +func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool { + var work bool mq.outlk.Lock() - wlm := mq.out + defer mq.outlk.Unlock() + // if we have no message held allocate a new one + if mq.out == nil { + mq.out = bsmsg.New(false) + } + + // TODO: add a msg.Combine(...) method + // otherwise, combine the one we are holding with the + // one passed in + for _, e := range entries { + if e.Cancel { + if mq.wl.Remove(e.Cid, ses) { + work = true + mq.out.Cancel(e.Cid) + } + } else { + if mq.wl.Add(e.Cid, e.Priority, ses) { + work = true + mq.out.AddEntry(e.Cid, e.Priority) + } + } + } + + return work +} + +func (mq *MessageQueue) doWork(ctx context.Context) { + + wlm := mq.extractOutgoingMessage() if wlm == nil || wlm.Empty() { - mq.outlk.Unlock() return } - mq.out = nil - mq.outlk.Unlock() // NB: only open a stream if we actually have data to send - if mq.sender == nil { - err := mq.openSender(ctx) - if err != nil { - log.Infof("cant open message sender to peer %s: %s", mq.p, err) - // TODO: cant connect, what now? - return - } + err := mq.initializeSender(ctx) + if err != nil { + log.Infof("cant open message sender to peer %s: %s", mq.p, err) + // TODO: cant connect, what now? + return } // send wantlist updates for { // try to send this message until we fail. - err := mq.sender.SendMsg(ctx, wlm) - if err == nil { + if mq.attemptSendAndRecovery(ctx, wlm) { return } + } +} - log.Infof("bitswap send error: %s", err) - mq.sender.Reset() - mq.sender = nil +func (mq *MessageQueue) initializeSender(ctx context.Context) error { + if mq.sender != nil { + return nil + } + nsender, err := openSender(ctx, mq.network, mq.p) + if err != nil { + return err + } + mq.sender = nsender + return nil +} - select { - case <-mq.done: - return - case <-ctx.Done(): - return - case <-time.After(time.Millisecond * 100): - // wait 100ms in case disconnect notifications are still propogating - log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") - } +func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool { + err := mq.sender.SendMsg(ctx, wlm) + if err == nil { + return true + } - err = mq.openSender(ctx) - if err != nil { - log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) - // TODO(why): what do we do now? - // I think the *right* answer is to probably put the message we're - // trying to send back, and then return to waiting for new work or - // a disconnect. - return - } + log.Infof("bitswap send error: %s", err) + mq.sender.Reset() + mq.sender = nil + + select { + case <-mq.done: + return true + case <-ctx.Done(): + return true + case <-time.After(time.Millisecond * 100): + // wait 100ms in case disconnect notifications are still propogating + log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") + } - // TODO: Is this the same instance for the remote peer? - // If its not, we should resend our entire wantlist to them - /* - if mq.sender.InstanceID() != mq.lastSeenInstanceID { - wlm = mq.getFullWantlistMessage() - } - */ + err = mq.initializeSender(ctx) + if err != nil { + log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) + // TODO(why): what do we do now? + // I think the *right* answer is to probably put the message we're + // trying to send back, and then return to waiting for new work or + // a disconnect. + return true } + + // TODO: Is this the same instance for the remote peer? + // If its not, we should resend our entire wantlist to them + /* + if mq.sender.InstanceID() != mq.lastSeenInstanceID { + wlm = mq.getFullWantlistMessage() + } + */ + return false +} + +func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage { + // grab outgoing message + mq.outlk.Lock() + wlm := mq.out + mq.out = nil + mq.outlk.Unlock() + return wlm } -func (mq *MessageQueue) openSender(ctx context.Context) error { +func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) { // allow ten minutes for connections this includes looking them up in the // dht dialing them, and handshaking conctx, cancel := context.WithTimeout(ctx, time.Minute*10) defer cancel() - err := mq.network.ConnectTo(conctx, mq.p) + err := network.ConnectTo(conctx, p) if err != nil { - return err + return nil, err } - nsender, err := mq.network.NewMessageSender(ctx, mq.p) + nsender, err := network.NewMessageSender(ctx, p) if err != nil { - return err + return nil, err } - mq.sender = nsender - return nil + return nsender, nil }