Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #119 from ipfs/feat/use-peer-task-queue-package
Browse files Browse the repository at this point in the history
Use shared peer task queue with Graphsync
  • Loading branch information
Stebalien authored May 16, 2019
2 parents 5ec87a8 + d2a4d88 commit a32fa8a
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 565 deletions.
30 changes: 0 additions & 30 deletions decision/bench_test.go

This file was deleted.

38 changes: 22 additions & 16 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (

bsmsg "github.com/ipfs/go-bitswap/message"
wl "github.com/ipfs/go-bitswap/wantlist"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"

blocks "github.com/ipfs/go-block-format"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -73,7 +76,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
peerRequestQueue *prq
peerRequestQueue *peertaskqueue.PeerTaskQueue

// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
Expand All @@ -100,7 +103,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newPRQ(),
peerRequestQueue: peertaskqueue.New(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
Expand Down Expand Up @@ -159,23 +162,23 @@ func (e *Engine) taskWorker(ctx context.Context) {
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
nextTask := e.peerRequestQueue.Pop()
nextTask := e.peerRequestQueue.PopBlock()
for nextTask == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.workSignal:
nextTask = e.peerRequestQueue.Pop()
nextTask = e.peerRequestQueue.PopBlock()
case <-e.ticker.C:
e.peerRequestQueue.thawRound()
nextTask = e.peerRequestQueue.Pop()
e.peerRequestQueue.ThawRound()
nextTask = e.peerRequestQueue.PopBlock()
}
}

// with a task in hand, we're ready to prepare the envelope...
msg := bsmsg.New(true)
for _, entry := range nextTask.Entries {
block, err := e.bs.Get(entry.Cid)
for _, entry := range nextTask.Tasks {
block, err := e.bs.Get(entry.Identifier.(cid.Cid))
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
Expand All @@ -186,15 +189,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
if msg.Empty() {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done(nextTask.Entries)
nextTask.Done(nextTask.Tasks)
continue
}

return &Envelope{
Peer: nextTask.Target,
Message: msg,
Sent: func() {
nextTask.Done(nextTask.Entries)
nextTask.Done(nextTask.Tasks)
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
Expand Down Expand Up @@ -246,7 +249,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
}

var msgSize int
var activeEntries []wl.Entry
var activeEntries []peertask.Task
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
Expand All @@ -265,17 +268,17 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
// we have the block
newWorkExists = true
if msgSize+blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []wl.Entry{}
e.peerRequestQueue.PushBlock(p, activeEntries...)
activeEntries = []peertask.Task{}
msgSize = 0
}
activeEntries = append(activeEntries, entry.Entry)
activeEntries = append(activeEntries, peertask.Task{Identifier: entry.Cid, Priority: entry.Priority})
msgSize += blockSize
}
}
}
if len(activeEntries) > 0 {
e.peerRequestQueue.Push(p, activeEntries...)
e.peerRequestQueue.PushBlock(p, activeEntries...)
}
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
Expand All @@ -289,7 +292,10 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Cid()); ok {
e.peerRequestQueue.Push(l.Partner, entry)
e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
Identifier: entry.Cid,
Priority: entry.Priority,
})
work = true
}
l.lk.Unlock()
Expand Down
Loading

0 comments on commit a32fa8a

Please sign in to comment.