From f574cd4b80e13870f6f31ce8535e568f67847f16 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 24 May 2015 23:10:04 -0700 Subject: [PATCH 1/5] Move findproviders out of main block request path This PR moves the addition of new blocks to our wantlist (and their subsequent broadcast to the network) outside of the clientWorker loop. This allows blocks to more quickly propogate to peers we are already connected to, where before we had to wait for the previous findProviders call in clientworker to complete before we could notify our partners of the next blocks that we want. I then changed the naming of the clientWorker and related variables to be a bit more appropriate to the model. Although the clientWorker (now named providerConnector) feels a bit awkward and should probably be changed. fix test assumption --- exchange/bitswap/bitswap.go | 2 ++ exchange/bitswap/workers.go | 4 +--- test/integration/bitswap_wo_routing_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 27be53967dc..f849c1ed90d 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -202,6 +202,8 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. } promise := bs.notifications.Subscribe(ctx, keys...) + bs.wm.WantBlocks(keys) + req := &blockRequest{ keys: keys, ctx: ctx, diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 1083566a1fb..b41f0dd3016 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -134,7 +134,7 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { } } -// TODO ensure only one active request per key +// TODO: figure out clientWorkers purpose in life func (bs *Bitswap) clientWorker(parent context.Context) { defer log.Info("bitswap client worker shutting down...") @@ -147,8 +147,6 @@ func (bs *Bitswap) clientWorker(parent context.Context) { continue } - bs.wm.WantBlocks(keys) - // NB: Optimization. Assumes that providers of key[0] are likely to // be able to provide for all keys. This currently holds true in most // every situation. Later, this assumption may not hold as true. diff --git a/test/integration/bitswap_wo_routing_test.go b/test/integration/bitswap_wo_routing_test.go index f0f5d5d31e6..560e20ec326 100644 --- a/test/integration/bitswap_wo_routing_test.go +++ b/test/integration/bitswap_wo_routing_test.go @@ -75,7 +75,7 @@ func TestBitswapWithoutRouting(t *testing.T) { } log.Debugf("%d %s get block.", i, n.Identity) - b, err := n.Exchange.GetBlock(ctx, block0.Key()) + b, err := n.Blocks.GetBlock(ctx, block0.Key()) if err != nil { t.Error(err) } else if !bytes.Equal(b.Data, block0.Data) { @@ -92,7 +92,7 @@ func TestBitswapWithoutRouting(t *testing.T) { // get it out. for _, n := range nodes { - b, err := n.Exchange.GetBlock(ctx, block1.Key()) + b, err := n.Blocks.GetBlock(ctx, block1.Key()) if err != nil { t.Error(err) } else if !bytes.Equal(b.Data, block1.Data) { From efa442ada2b0a709cf98f29d0882fdf9539a2c80 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 25 May 2015 18:00:34 -0700 Subject: [PATCH 2/5] adjust naming --- exchange/bitswap/bitswap.go | 10 ++++------ exchange/bitswap/workers.go | 8 ++++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f849c1ed90d..58243e88899 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -82,7 +82,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method network: network, - batchRequests: make(chan *blockRequest, sizeBatchRequestChan), + findKeys: make(chan *blockRequest, sizeBatchRequestChan), process: px, newBlocks: make(chan *blocks.Block, HasBlockBufferSize), provideKeys: make(chan u.Key), @@ -115,10 +115,8 @@ type Bitswap struct { notifications notifications.PubSub - // Requests for a set of related blocks - // the assumption is made that the same peer is likely to - // have more than a single block in the set - batchRequests chan *blockRequest + // send keys to a worker to find and connect to providers for them + findKeys chan *blockRequest engine *decision.Engine @@ -209,7 +207,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ctx: ctx, } select { - case bs.batchRequests <- req: + case bs.findKeys <- req: return promise, nil case <-ctx.Done(): return nil, ctx.Err() diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index b41f0dd3016..7852cf93ec0 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -31,7 +31,7 @@ func init() { func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { // Start up a worker to handle block requests this node is making px.Go(func(px process.Process) { - bs.clientWorker(ctx) + bs.providerConnector(ctx) }) // Start up workers to handle requests from other nodes for the data on this node @@ -134,13 +134,13 @@ func (bs *Bitswap) provideCollector(ctx context.Context) { } } -// TODO: figure out clientWorkers purpose in life -func (bs *Bitswap) clientWorker(parent context.Context) { +// connects to providers for the given keys +func (bs *Bitswap) providerConnector(parent context.Context) { defer log.Info("bitswap client worker shutting down...") for { select { - case req := <-bs.batchRequests: + case req := <-bs.findKeys: keys := req.keys if len(keys) == 0 { log.Warning("Received batch request for zero blocks") From ab161cf6b4f2cee1c3b63c943b640b120f9fca93 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 26 May 2015 11:14:44 -0700 Subject: [PATCH 3/5] clean up organization of receivemessage and fix race --- exchange/bitswap/bitswap.go | 25 +++++++++++++++++++------ exchange/bitswap/wantmanager.go | 4 ++-- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 58243e88899..d103687d27e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -270,26 +270,40 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger - if len(incoming.Blocks()) == 0 { + iblocks := incoming.Blocks() + + if len(iblocks) == 0 { return } // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key - for _, block := range incoming.Blocks() { + for _, block := range iblocks { keys = append(keys, block.Key()) } bs.wm.CancelWants(keys) - for _, block := range incoming.Blocks() { + for _, block := range iblocks { bs.counterLk.Lock() bs.blocksRecvd++ - if has, err := bs.blockstore.Has(block.Key()); err == nil && has { + has, err := bs.blockstore.Has(block.Key()) + if err == nil && has { bs.dupBlocksRecvd++ } brecvd := bs.blocksRecvd bdup := bs.dupBlocksRecvd bs.counterLk.Unlock() + if has { + continue + } + + // put this after the duplicate check as a block not on our wantlist may + // have already been received. + if _, found := bs.wm.wl.Contains(block.Key()); !found { + log.Notice("received un-asked-for block: %s", block) + continue + } + log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) @@ -302,7 +316,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { - // TODO: add to clientWorker?? bs.wm.Connected(p) } @@ -313,7 +326,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) { } func (bs *Bitswap) ReceiveError(err error) { - log.Debugf("Bitswap ReceiveError: %s", err) + log.Infof("Bitswap ReceiveError: %s", err) // TODO log the network error // TODO bubble the network error up to the parent context/error logger } diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 5405f5074fe..e8745392055 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -21,7 +21,7 @@ type WantManager struct { // synchronized by Run loop, only touch inside there peers map[peer.ID]*msgQueue - wl *wantlist.Wantlist + wl *wantlist.ThreadSafe network bsnet.BitSwapNetwork ctx context.Context @@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana connect: make(chan peer.ID, 10), disconnect: make(chan peer.ID, 10), peers: make(map[peer.ID]*msgQueue), - wl: wantlist.New(), + wl: wantlist.NewThreadSafe(), network: network, ctx: ctx, } From 11de3643840752708d2473400954fa224150ea35 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 27 May 2015 19:03:39 -0700 Subject: [PATCH 4/5] parallelize block processing --- exchange/bitswap/bitswap.go | 54 +++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index d103687d27e..7e8a0f7af88 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -279,39 +279,41 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // quickly send out cancels, reduces chances of duplicate block receives var keys []u.Key for _, block := range iblocks { - keys = append(keys, block.Key()) - } - bs.wm.CancelWants(keys) - - for _, block := range iblocks { - bs.counterLk.Lock() - bs.blocksRecvd++ - has, err := bs.blockstore.Has(block.Key()) - if err == nil && has { - bs.dupBlocksRecvd++ - } - brecvd := bs.blocksRecvd - bdup := bs.dupBlocksRecvd - bs.counterLk.Unlock() - if has { - continue - } - - // put this after the duplicate check as a block not on our wantlist may - // have already been received. if _, found := bs.wm.wl.Contains(block.Key()); !found { log.Notice("received un-asked-for block: %s", block) continue } + keys = append(keys, block.Key()) + } + bs.wm.CancelWants(keys) - log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup) + wg := sync.WaitGroup{} + for _, block := range iblocks { + wg.Add(1) + go func(b *blocks.Block) { + defer wg.Done() + bs.counterLk.Lock() + bs.blocksRecvd++ + has, err := bs.blockstore.Has(b.Key()) + if err == nil && has { + bs.dupBlocksRecvd++ + } + brecvd := bs.blocksRecvd + bdup := bs.dupBlocksRecvd + bs.counterLk.Unlock() + if has { + return + } - hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) - if err := bs.HasBlock(hasBlockCtx, block); err != nil { - log.Warningf("ReceiveMessage HasBlock error: %s", err) - } - cancel() + log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) + hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) + if err := bs.HasBlock(hasBlockCtx, b); err != nil { + log.Warningf("ReceiveMessage HasBlock error: %s", err) + } + cancel() + }(block) } + wg.Wait() } // Connected/Disconnected warns bitswap about peer connections From 2ec4c9ac455dc4781e937d463b97aa6162d26c5c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 27 May 2015 21:19:07 -0700 Subject: [PATCH 5/5] handle error --- exchange/bitswap/bitswap.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 7e8a0f7af88..020c8d16a5f 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -295,6 +295,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.counterLk.Lock() bs.blocksRecvd++ has, err := bs.blockstore.Has(b.Key()) + if err != nil { + bs.counterLk.Unlock() + log.Noticef("blockstore.Has error: %s", err) + return + } if err == nil && has { bs.dupBlocksRecvd++ }