This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Use shared peer task queue with Graphsync #119

Merged
merged 2 commits on May 16, 2019
30 changes: 0 additions & 30 deletions decision/bench_test.go

This file was deleted.

38 changes: 22 additions & 16 deletions decision/engine.go
@@ -8,6 +8,9 @@ import (

bsmsg "github.com/ipfs/go-bitswap/message"
wl "github.com/ipfs/go-bitswap/wantlist"
+ cid "github.com/ipfs/go-cid"
+ "github.com/ipfs/go-peertaskqueue"
+ "github.com/ipfs/go-peertaskqueue/peertask"

blocks "github.com/ipfs/go-block-format"
bstore "github.com/ipfs/go-ipfs-blockstore"
@@ -73,7 +76,7 @@ type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
// Requests are popped from the queue, packaged up, and placed in the
// outbox.
- peerRequestQueue *prq
+ peerRequestQueue *peertaskqueue.PeerTaskQueue

// FIXME it's a bit odd for the client and the worker to both share memory
// (both modify the peerRequestQueue) and also to communicate over the
@@ -100,7 +103,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
- peerRequestQueue: newPRQ(),
+ peerRequestQueue: peertaskqueue.New(),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
@@ -159,23 +162,23 @@ func (e *Engine) taskWorker(ctx context.Context) {
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
- nextTask := e.peerRequestQueue.Pop()
+ nextTask := e.peerRequestQueue.PopBlock()
for nextTask == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.workSignal:
- nextTask = e.peerRequestQueue.Pop()
+ nextTask = e.peerRequestQueue.PopBlock()
case <-e.ticker.C:
- e.peerRequestQueue.thawRound()
- nextTask = e.peerRequestQueue.Pop()
+ e.peerRequestQueue.ThawRound()
Member
How does graphsync use freeze/thaw?

Contributor Author
So graphsync does support cancellation, and long term it makes sense that if a request gets cancelled, we probably ought to wait and see whether that peer is going to send other cancel requests... especially if we move to a context where requests are being broadcast.

However, I think it makes sense to have an option to turn freezing off, since at least in graphsync's current implementation it does not make a lot of sense. I think I will go ahead and implement that.
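The opt-out mentioned above could be exposed as a constructor option on the shared queue. Below is a minimal, self-contained sketch of that idea, written against a stand-in queue type; the names here (queue, Option, IgnoreFreezing, newQueue) are assumptions for illustration, not the library's API at the time of this PR.

```go
package main

import "fmt"

// queue stands in for peertaskqueue.PeerTaskQueue in this sketch.
type queue struct {
	ignoreFreezing bool // when true, cancels never freeze a peer
}

// Option configures a queue at construction time.
type Option func(*queue)

// IgnoreFreezing disables the freeze-on-cancel behaviour that graphsync's
// current implementation has no use for.
func IgnoreFreezing(ignore bool) Option {
	return func(q *queue) { q.ignoreFreezing = ignore }
}

// newQueue mirrors a variadic constructor in the spirit of peertaskqueue.New.
func newQueue(options ...Option) *queue {
	q := &queue{}
	for _, o := range options {
		o(q)
	}
	return q
}

func main() {
	q := newQueue(IgnoreFreezing(true))
	fmt.Println("freezing disabled:", q.ignoreFreezing)
}
```

With an option like this, bitswap would keep the default freeze/thaw behaviour while graphsync constructs its queue with freezing turned off.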

+ nextTask = e.peerRequestQueue.PopBlock()
}
}

// with a task in hand, we're ready to prepare the envelope...
msg := bsmsg.New(true)
- for _, entry := range nextTask.Entries {
- block, err := e.bs.Get(entry.Cid)
+ for _, entry := range nextTask.Tasks {
+ block, err := e.bs.Get(entry.Identifier.(cid.Cid))
if err != nil {
log.Errorf("tried to execute a task and errored fetching block: %s", err)
continue
@@ -186,15 +189,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
if msg.Empty() {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
- nextTask.Done(nextTask.Entries)
+ nextTask.Done(nextTask.Tasks)
continue
}

return &Envelope{
Peer: nextTask.Target,
Message: msg,
Sent: func() {
- nextTask.Done(nextTask.Entries)
+ nextTask.Done(nextTask.Tasks)
select {
case e.workSignal <- struct{}{}:
// work completing may mean that our queue will provide new
@@ -246,7 +249,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
}

var msgSize int
- var activeEntries []wl.Entry
+ var activeEntries []peertask.Task
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
@@ -265,17 +268,17 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
// we have the block
newWorkExists = true
if msgSize+blockSize > maxMessageSize {
- e.peerRequestQueue.Push(p, activeEntries...)
- activeEntries = []wl.Entry{}
+ e.peerRequestQueue.PushBlock(p, activeEntries...)
+ activeEntries = []peertask.Task{}
msgSize = 0
}
- activeEntries = append(activeEntries, entry.Entry)
+ activeEntries = append(activeEntries, peertask.Task{Identifier: entry.Cid, Priority: entry.Priority})
msgSize += blockSize
}
}
}
if len(activeEntries) > 0 {
- e.peerRequestQueue.Push(p, activeEntries...)
+ e.peerRequestQueue.PushBlock(p, activeEntries...)
}
for _, block := range m.Blocks() {
log.Debugf("got block %s %d bytes", block, len(block.RawData()))
@@ -289,7 +292,10 @@ func (e *Engine) addBlock(block blocks.Block) {
for _, l := range e.ledgerMap {
l.lk.Lock()
if entry, ok := l.WantListContains(block.Cid()); ok {
- e.peerRequestQueue.Push(l.Partner, entry)
+ e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
+ Identifier: entry.Cid,
Member
I'm a bit worried about allocations here. This used to be a bit of a hot-spot until we made entries and CIDs by-value.

This is a great abstraction so I'm not going to block on that but we just need to keep it in mind.

Contributor Author
Yeah, I mean, it's still technically by-value -- just a new value that makes a new allocation :) I will keep an eye on it as this goes into production.

+ Priority: entry.Priority,
+ })
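On the allocation concern in the thread above: Task.Identifier is an interface-typed field (the engine type-asserts it back to cid.Cid in nextEnvelope), so storing a by-value cid.Cid in it boxes the value on the heap, an extra small allocation per queued task compared with the old by-value wl.Entry path. A rough, self-contained sketch (not part of the PR) for observing that cost:

```go
package main

import (
	"fmt"
	"testing"

	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/go-peertaskqueue/peertask"
	mh "github.com/multiformats/go-multihash"
)

// sink keeps each constructed task reachable so the interface conversion
// below cannot be optimised away.
var sink peertask.Task

func main() {
	h, err := mh.Sum([]byte("some block"), mh.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	c := cid.NewCidV1(cid.Raw, h)

	allocs := testing.AllocsPerRun(1000, func() {
		// Storing the by-value cid.Cid in the interface-typed Identifier
		// field copies it into a heap-allocated box.
		sink = peertask.Task{Identifier: c, Priority: 10}
	})
	fmt.Printf("allocations per Task constructed: %.0f\n", allocs)
}
```

On current Go compilers this typically reports one allocation per constructed task.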
work = true
}
l.lk.Unlock()
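Taken together, the diff replaces the engine's private prq with the shared queue's push/pop/done cycle. A minimal, self-contained sketch of that cycle, assuming the method names and Task shape exactly as they appear in the diff (New, PushBlock, PopBlock, Tasks, Target, Done); the peer ID, the derived CIDs, and the libp2p peer import path are placeholders or assumptions for this sketch.

```go
package main

import (
	"fmt"

	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	peer "github.com/libp2p/go-libp2p-core/peer"
	mh "github.com/multiformats/go-multihash"
)

// wantCid derives a valid CID from raw bytes, standing in for a real want.
func wantCid(data string) cid.Cid {
	h, err := mh.Sum([]byte(data), mh.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	return cid.NewCidV1(cid.Raw, h)
}

func main() {
	q := peertaskqueue.New()
	p := peer.ID("example-peer") // placeholder peer ID

	// MessageReceived's side of the cycle: queue a block of tasks for the
	// peer that asked for them.
	q.PushBlock(p,
		peertask.Task{Identifier: wantCid("block-a"), Priority: 10},
		peertask.Task{Identifier: wantCid("block-b"), Priority: 5},
	)

	// nextEnvelope's side: pop the next block of work, or nil when the
	// queue is empty.
	next := q.PopBlock()
	if next == nil {
		return
	}
	for _, t := range next.Tasks {
		fmt.Printf("would send %s to %s\n", t.Identifier.(cid.Cid), next.Target)
	}

	// The Sent callback's side: mark the tasks complete so the queue can
	// schedule more work for this peer.
	next.Done(next.Tasks)
}
```

In the engine these steps are spread across MessageReceived, nextEnvelope, and the envelope's Sent callback, as the diff above shows.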