Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/reprov worker #787

Merged
merged 3 commits into from
Feb 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ func (b *Block) Key() u.Key {
func (b *Block) String() string {
return fmt.Sprintf("[Block %s]", b.Key())
}

func (b *Block) Loggable() map[string]interface{} {
return map[string]interface{}{
"block": b.Key().String(),
}
}
108 changes: 17 additions & 91 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"

blocks "github.com/jbenet/go-ipfs/blocks"
Expand Down Expand Up @@ -37,9 +36,13 @@ const (
maxProvidersPerRequest = 3
providerRequestTimeout = time.Second * 10
hasBlockTimeout = time.Second * 15
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32

hasBlockBufferSize = 256
provideWorkers = 4
)

var (
Expand Down Expand Up @@ -86,18 +89,12 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan *blocks.Block, hasBlockBufferSize),
}
network.SetDelegate(bs)
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
})
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})

// Start up bitswaps async worker routines
bs.startWorkers(px, ctx)
return bs
}

Expand Down Expand Up @@ -126,6 +123,8 @@ type bitswap struct {
wantlist *wantlist.ThreadSafe

process process.Process

newBlocks chan *blocks.Block
}

type blockRequest struct {
Expand Down Expand Up @@ -172,7 +171,6 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
case <-parent.Done():
return nil, parent.Err()
}

}

// GetBlocks returns a channel where the caller may receive blocks that
Expand Down Expand Up @@ -205,6 +203,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Event(ctx, "hasBlock", blk)
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -215,7 +214,12 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
}
bs.wantlist.Remove(blk.Key())
bs.notifications.Publish(blk)
return bs.network.Provide(ctx, blk.Key())
select {
case bs.newBlocks <- blk:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
Expand Down Expand Up @@ -310,6 +314,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Debug(err)
}
}

var keys []u.Key
for _, block := range incoming.Blocks() {
keys = append(keys, block.Key())
Expand Down Expand Up @@ -391,82 +396,3 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
func (bs *bitswap) Close() error {
return bs.process.Close()
}

func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
return
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}

// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")

for {
select {
case req := <-bs.batchRequests:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}

bs.wantNewBlocks(req.ctx, keys)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}

func (bs *bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()

broadcastSignal := time.After(rebroadcastDelay.Get())

for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done():
return
}
}
}
24 changes: 24 additions & 0 deletions exchange/bitswap/notifications/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,30 @@ func TestSubscribeMany(t *testing.T) {
assertBlocksEqual(t, e2, r2)
}

// TestDuplicateSubscribe tests a scenario where a given block
// would be requested twice at the same time.
func TestDuplicateSubscribe(t *testing.T) {
e1 := blocks.NewBlock([]byte("1"))

n := New()
defer n.Shutdown()
ch1 := n.Subscribe(context.Background(), e1.Key())
ch2 := n.Subscribe(context.Background(), e1.Key())

n.Publish(e1)
r1, ok := <-ch1
if !ok {
t.Fatal("didn't receive first expected block")
}
assertBlocksEqual(t, e1, r1)

r2, ok := <-ch2
if !ok {
t.Fatal("didn't receive second expected block")
}
assertBlocksEqual(t, e1, r2)
}

func TestSubscribeIsANoopWhenCalledWithNoKeys(t *testing.T) {
n := New()
defer n.Shutdown()
Expand Down
133 changes: 133 additions & 0 deletions exchange/bitswap/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package bitswap

import (
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
})

// Start up a worker to handle requests from other nodes for the data on this node
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})

// Start up a worker to manage periodically resending our wantlist out to peers
px.Go(func(px process.Process) {
bs.rebroadcastWorker(ctx)
})

// Spawn up multiple workers to handle incoming blocks
// consider increasing number if providing blocks bottlenecks
// file transfers
for i := 0; i < provideWorkers; i++ {
px.Go(func(px process.Process) {
bs.provideWorker(ctx)
})
}
}

func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case nextEnvelope := <-bs.engine.Outbox():
select {
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}

func (bs *bitswap) provideWorker(ctx context.Context) {
for {
select {
case blk, ok := <-bs.newBlocks:
if !ok {
log.Debug("newBlocks channel closed")
return
}
ctx, _ := context.WithTimeout(ctx, provideTimeout)
err := bs.network.Provide(ctx, blk.Key())
if err != nil {
log.Error(err)
}
case <-ctx.Done():
return
}
}
}

// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")

for {
select {
case req := <-bs.batchRequests:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
for i, k := range keys {
bs.wantlist.Add(k, kMaxPriority-i)
}

bs.wantNewBlocks(req.ctx, keys)

// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
err := bs.sendWantlistToPeers(req.ctx, providers)
if err != nil {
log.Debugf("error sending wantlist: %s", err)
}
case <-parent.Done():
return
}
}
}

func (bs *bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent)
defer cancel()

broadcastSignal := time.After(rebroadcastDelay.Get())

for {
select {
case <-time.Tick(10 * time.Second):
n := bs.wantlist.Len()
if n > 0 {
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
}
case <-broadcastSignal: // resend unfulfilled wantlist keys
entries := bs.wantlist.Entries()
if len(entries) > 0 {
bs.sendWantlistToProviders(ctx, entries)
}
broadcastSignal = time.After(rebroadcastDelay.Get())
case <-parent.Done():
return
}
}
}