diff --git a/peertaskqueue.go b/peertaskqueue.go index ccc801a..f935c54 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -23,13 +23,14 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent) // to execute the block with the highest priority, or otherwise the one added // first if priorities are equal. type PeerTaskQueue struct { - lock sync.Mutex - pQueue pq.PQ - peerTrackers map[peer.ID]*peertracker.PeerTracker - frozenPeers map[peer.ID]struct{} - hooks []hookFunc - ignoreFreezing bool - taskMerger peertracker.TaskMerger + lock sync.Mutex + pQueue pq.PQ + peerTrackers map[peer.ID]*peertracker.PeerTracker + frozenPeers map[peer.ID]struct{} + hooks []hookFunc + ignoreFreezing bool + taskMerger peertracker.TaskMerger + maxOutstandingWorkPerPeer int } // Option is a function that configures the peer task queue @@ -62,6 +63,16 @@ func TaskMerger(tmfp peertracker.TaskMerger) Option { } } +// MaxOutstandingWorkPerPeer is an option that specifies how many tasks a peer can have outstanding +// with the same Topic as an existing Topic. +func MaxOutstandingWorkPerPeer(count int) Option { + return func(ptq *PeerTaskQueue) Option { + previous := ptq.maxOutstandingWorkPerPeer + ptq.maxOutstandingWorkPerPeer = count + return MaxOutstandingWorkPerPeer(previous) + } +} + func removeHook(hook hookFunc) Option { return func(ptq *PeerTaskQueue) Option { for i, testHook := range ptq.hooks { @@ -139,7 +150,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) { peerTracker, ok := ptq.peerTrackers[to] if !ok { - peerTracker = peertracker.New(to, ptq.taskMerger) + peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer) ptq.pQueue.Push(peerTracker) ptq.peerTrackers[to] = peerTracker ptq.callHooks(to, peerAdded) diff --git a/peertracker/peertracker.go b/peertracker/peertracker.go index 7d73e35..19f2406 100644 --- a/peertracker/peertracker.go +++ b/peertracker/peertracker.go @@ -45,6 +45,8 @@ type PeerTracker struct { activelk sync.Mutex activeWork int + maxActiveWorkPerPeer int + // for the PQ interface index int @@ -57,13 +59,14 @@ type PeerTracker struct { } // New creates a new PeerTracker -func New(target peer.ID, taskMerger TaskMerger) *PeerTracker { +func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker { return &PeerTracker{ - target: target, - taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), - pendingTasks: make(map[peertask.Topic]*peertask.QueueTask), - activeTasks: make(map[*peertask.Task]struct{}), - taskMerger: taskMerger, + target: target, + taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), + pendingTasks: make(map[peertask.Topic]*peertask.QueueTask), + activeTasks: make(map[*peertask.Task]struct{}), + taskMerger: taskMerger, + maxActiveWorkPerPeer: maxActiveWorkPerPeer, } } @@ -172,6 +175,16 @@ func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) { var out []*peertask.Task work := 0 for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork { + if p.maxActiveWorkPerPeer > 0 { + // Do not add work to a peer that is already maxed out + p.activelk.Lock() + activeWork := p.activeWork + p.activelk.Unlock() + if activeWork >= p.maxActiveWorkPerPeer { + break + } + } + // Pop the next task off the queue t := p.taskQueue.Pop().(*peertask.QueueTask) diff --git a/peertracker/peertracker_test.go b/peertracker/peertracker_test.go index 451022b..afabf85 100644 --- a/peertracker/peertracker_test.go +++ b/peertracker/peertracker_test.go @@ -7,9 +7,11 @@ import ( "github.com/ipfs/go-peertaskqueue/testutil" ) +const testMaxActiveWorkPerPeer = 100 + func TestEmpty(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks, _ := tracker.PopTasks(100) if len(tasks) != 0 { @@ -19,7 +21,7 @@ func TestEmpty(t *testing.T) { func TestPushPop(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -40,7 +42,7 @@ func TestPushPop(t *testing.T) { func TestPopNegativeOrZeroSize(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -62,7 +64,7 @@ func TestPopNegativeOrZeroSize(t *testing.T) { func TestPushPopSizeAndOrder(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -116,7 +118,7 @@ func TestPushPopSizeAndOrder(t *testing.T) { func TestPopFirstItemAlways(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -147,7 +149,7 @@ func TestPopFirstItemAlways(t *testing.T) { func TestPopItemsToCoverTargetWork(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -183,7 +185,7 @@ func TestPopItemsToCoverTargetWork(t *testing.T) { func TestRemove(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -215,7 +217,7 @@ func TestRemove(t *testing.T) { func TestRemoveMulti(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -247,7 +249,7 @@ func TestRemoveMulti(t *testing.T) { func TestTaskDone(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -299,7 +301,7 @@ func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task) func TestReplaceTaskPermissive(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &permissiveTaskMerger{}) + tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -338,7 +340,7 @@ func TestReplaceTaskPermissive(t *testing.T) { func TestReplaceTaskSize(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &permissiveTaskMerger{}) + tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -391,7 +393,7 @@ func TestReplaceTaskSize(t *testing.T) { func TestReplaceActiveTask(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &permissiveTaskMerger{}) + tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -430,7 +432,7 @@ func TestReplaceActiveTask(t *testing.T) { func TestReplaceActiveTaskNonPermissive(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &DefaultTaskMerger{}) + tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -468,7 +470,7 @@ func TestReplaceActiveTaskNonPermissive(t *testing.T) { func TestReplaceTaskThatIsActiveAndPending(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &permissiveTaskMerger{}) + tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ { @@ -519,7 +521,7 @@ func TestReplaceTaskThatIsActiveAndPending(t *testing.T) { func TestRemoveActive(t *testing.T) { partner := testutil.GeneratePeers(1)[0] - tracker := New(partner, &permissiveTaskMerger{}) + tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer) tasks := []peertask.Task{ {