move findproviders out of critical path #1290

Merged: 5 commits, May 31, 2015
72 changes: 46 additions & 26 deletions exchange/bitswap/bitswap.go
@@ -82,7 +82,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan u.Key),
@@ -115,10 +115,8 @@ type Bitswap struct {

notifications notifications.PubSub

// Requests for a set of related blocks
// the assumption is made that the same peer is likely to
// have more than a single block in the set
batchRequests chan *blockRequest
// send keys to a worker to find and connect to providers for them
findKeys chan *blockRequest

engine *decision.Engine

@@ -202,12 +200,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
}
promise := bs.notifications.Subscribe(ctx, keys...)

bs.wm.WantBlocks(keys)

req := &blockRequest{
keys: keys,
ctx: ctx,
}
select {
case bs.batchRequests <- req:
case bs.findKeys <- req:
return promise, nil
case <-ctx.Done():
return nil, ctx.Err()
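The handoff above is the heart of the change: GetBlocks now updates the wantlist synchronously via `bs.wm.WantBlocks(keys)` and only then passes the key set to a background worker over the `findKeys` channel, bailing out if the caller's context is cancelled first. A minimal, self-contained sketch of that pattern (the `blockRequest` and `findKeys` names mirror the diff; the string keys and the rest are illustrative stand-ins):

```go
package bitswapsketch

import (
	"context"
	"fmt"
)

// blockRequest mirrors the struct in the diff: a set of keys plus the
// caller's context, handed off to a background worker.
type blockRequest struct {
	keys []string // u.Key in the real code; plain strings keep the sketch self-contained
	ctx  context.Context
}

// requestBlocks registers the wants synchronously (the cheap, critical-path
// step) and then hands the provider lookup to a worker via findKeys.
func requestBlocks(ctx context.Context, keys []string, findKeys chan<- *blockRequest) error {
	// In the diff this is bs.wm.WantBlocks(keys): the wantlist is updated
	// before GetBlocks returns, rather than inside the worker.
	fmt.Println("want:", keys)

	req := &blockRequest{keys: keys, ctx: ctx}
	select {
	case findKeys <- req:
		return nil
	case <-ctx.Done():
		// The caller gave up before the worker accepted the request.
		return ctx.Err()
	}
}
```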
@@ -270,39 +270,59 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

if len(incoming.Blocks()) == 0 {
iblocks := incoming.Blocks()

if len(iblocks) == 0 {
return
}

// quickly send out cancels, reduces chances of duplicate block receives
var keys []u.Key
for _, block := range incoming.Blocks() {
for _, block := range iblocks {
if _, found := bs.wm.wl.Contains(block.Key()); !found {
log.Notice("received un-asked-for block: %s", block)
continue
}
keys = append(keys, block.Key())
}
bs.wm.CancelWants(keys)

for _, block := range incoming.Blocks() {
bs.counterLk.Lock()
bs.blocksRecvd++
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
bs.dupBlocksRecvd++
}
brecvd := bs.blocksRecvd
bdup := bs.dupBlocksRecvd
bs.counterLk.Unlock()
log.Infof("got block %s from %s (%d,%d)", block, p, brecvd, bdup)

hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
}
cancel()
wg := sync.WaitGroup{}
for _, block := range iblocks {
wg.Add(1)
go func(b *blocks.Block) {
defer wg.Done()
bs.counterLk.Lock()
bs.blocksRecvd++
has, err := bs.blockstore.Has(b.Key())
if err != nil {
bs.counterLk.Unlock()
log.Noticef("blockstore.Has error: %s", err)
return
}
if err == nil && has {
bs.dupBlocksRecvd++
}
brecvd := bs.blocksRecvd
bdup := bs.dupBlocksRecvd
bs.counterLk.Unlock()
if has {
return
}

log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, b); err != nil {
log.Warningf("ReceiveMessage HasBlock error: %s", err)
}
cancel()
}(block)
}
wg.Wait()
}

// Connected/Disconnected warns bitswap about peer connections
func (bs *Bitswap) PeerConnected(p peer.ID) {
// TODO: add to clientWorker??
bs.wm.Connected(p)
}

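The rewritten ReceiveMessage above fans each incoming block out to its own goroutine and waits for all of them before returning; the shared receive counters stay correct because they are only touched while holding `counterLk`. A stripped-down sketch of that fan-out pattern with a mutex-guarded counter (everything beyond the pattern itself is illustrative):

```go
package bitswapsketch

import (
	"fmt"
	"sync"
)

// processAll handles each item in its own goroutine, keeps a shared counter
// safe behind a mutex, and returns only after every goroutine has finished.
func processAll(items []string) int {
	var (
		mu       sync.Mutex
		received int
		wg       sync.WaitGroup
	)
	for _, it := range items {
		wg.Add(1)
		go func(item string) {
			defer wg.Done()

			mu.Lock()
			received++
			n := received
			mu.Unlock()

			// The per-item work (HasBlock in the diff) happens outside the lock.
			fmt.Printf("handled %s (%d so far)\n", item, n)
		}(it)
	}
	wg.Wait()
	return received
}
```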
@@ -313,7 +333,7 @@ func (bs *Bitswap) PeerDisconnected(p peer.ID) {
}

func (bs *Bitswap) ReceiveError(err error) {
log.Debugf("Bitswap ReceiveError: %s", err)
log.Infof("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}
4 changes: 2 additions & 2 deletions exchange/bitswap/wantmanager.go
@@ -21,7 +21,7 @@ type WantManager struct {

// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.Wantlist
wl *wantlist.ThreadSafe

network bsnet.BitSwapNetwork
ctx context.Context
@@ -33,7 +33,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
connect: make(chan peer.ID, 10),
disconnect: make(chan peer.ID, 10),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.New(),
wl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
}
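The switch from `wantlist.New()` to `wantlist.NewThreadSafe()` is what makes it safe for `WantBlocks`, `CancelWants`, and the receive path's `wl.Contains` check to run from multiple goroutines instead of only inside the worker loop. The real `ThreadSafe` type lives in the wantlist package; a hedged guess at its shape is simply a key set behind a mutex, along these lines (simplified, illustrative API):

```go
package bitswapsketch

import "sync"

// threadSafeWantlist is an illustrative stand-in for wantlist.ThreadSafe:
// a plain key set guarded by an RWMutex so it can be read and written from
// multiple goroutines (GetBlocks callers, ReceiveMessage, the want manager).
type threadSafeWantlist struct {
	mu   sync.RWMutex
	keys map[string]struct{}
}

func newThreadSafeWantlist() *threadSafeWantlist {
	return &threadSafeWantlist{keys: make(map[string]struct{})}
}

func (w *threadSafeWantlist) Add(k string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.keys[k] = struct{}{}
}

func (w *threadSafeWantlist) Remove(k string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.keys, k)
}

func (w *threadSafeWantlist) Contains(k string) bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	_, ok := w.keys[k]
	return ok
}
```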
10 changes: 4 additions & 6 deletions exchange/bitswap/workers.go
@@ -31,7 +31,7 @@ func init() {
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
bs.providerConnector(ctx)
})

// Start up workers to handle requests from other nodes for the data on this node
@@ -134,21 +134,19 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
}
}

// TODO ensure only one active request per key
func (bs *Bitswap) clientWorker(parent context.Context) {
// connects to providers for the given keys
func (bs *Bitswap) providerConnector(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")

for {
select {
case req := <-bs.batchRequests:
case req := <-bs.findKeys:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}

bs.wm.WantBlocks(keys)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
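The renamed `providerConnector` is the worker on the other end of the `findKeys` channel: it drains requests and finds/connects to providers off the critical path, using the "providers of keys[0] can likely provide the rest" optimization noted in the diff. A hedged sketch of such a worker loop against an assumed, minimal network interface (the real one is `bsnet.BitSwapNetwork`; the types, timeout value, and method shapes here are illustrative):

```go
package bitswapsketch

import (
	"context"
	"time"
)

// Assumed timeout for a single provider search; the real constant lives in
// the bitswap package.
const providerRequestTimeout = 10 * time.Second

// providerNetwork is an illustrative slice of the network interface the
// worker needs: an async provider search plus a way to dial the results.
type providerNetwork interface {
	FindProvidersAsync(ctx context.Context, key string, max int) <-chan string
	ConnectTo(ctx context.Context, p string) error
}

type findRequest struct {
	keys []string
	ctx  context.Context
}

// providerConnector drains requests from findKeys and connects to providers
// for each batch, off the GetBlocks critical path. It only searches for
// keys[0], assuming those peers can likely provide the rest of the batch.
func providerConnector(ctx context.Context, nw providerNetwork, findKeys <-chan *findRequest, maxProviders int) {
	for {
		select {
		case req := <-findKeys:
			if len(req.keys) == 0 {
				continue
			}
			child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
			for p := range nw.FindProvidersAsync(child, req.keys[0], maxProviders) {
				go func(p string) {
					// A failed dial is not fatal; other providers may still connect.
					_ = nw.ConnectTo(req.ctx, p)
				}(p)
			}
			cancel()
		case <-ctx.Done():
			return
		}
	}
}
```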
4 changes: 2 additions & 2 deletions test/integration/bitswap_wo_routing_test.go
@@ -75,7 +75,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
}

log.Debugf("%d %s get block.", i, n.Identity)
b, err := n.Exchange.GetBlock(ctx, block0.Key())
b, err := n.Blocks.GetBlock(ctx, block0.Key())
if err != nil {
t.Error(err)
} else if !bytes.Equal(b.Data, block0.Data) {
@@ -92,7 +92,7 @@

// get it out.
for _, n := range nodes {
b, err := n.Exchange.GetBlock(ctx, block1.Key())
b, err := n.Blocks.GetBlock(ctx, block1.Key())
if err != nil {
t.Error(err)
} else if !bytes.Equal(b.Data, block1.Data) {