Skip to content

Commit

Permalink
Merge pull request #10 from ipfs/feat/track-ongoing-jobs
Browse files Browse the repository at this point in the history
Have a configurable maximum active work per peer
  • Loading branch information
petar authored Jul 22, 2021
2 parents c340d8f + 24f8245 commit db9b167
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 29 deletions.
27 changes: 19 additions & 8 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
maxOutstandingWorkPerPeer int
}

// Option is a function that configures the peer task queue
Expand Down Expand Up @@ -62,6 +63,16 @@ func TaskMerger(tmfp peertracker.TaskMerger) Option {
}
}

// MaxOutstandingWorkPerPeer is an option that specifies how many tasks a peer can have outstanding
// with the same Topic as an existing Topic.
func MaxOutstandingWorkPerPeer(count int) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.maxOutstandingWorkPerPeer
ptq.maxOutstandingWorkPerPeer = count
return MaxOutstandingWorkPerPeer(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
Expand Down Expand Up @@ -139,7 +150,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to, ptq.taskMerger)
peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
Expand Down
25 changes: 19 additions & 6 deletions peertracker/peertracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type PeerTracker struct {
activelk sync.Mutex
activeWork int

maxActiveWorkPerPeer int

// for the PQ interface
index int

Expand All @@ -57,13 +59,14 @@ type PeerTracker struct {
}

// New creates a new PeerTracker
func New(target peer.ID, taskMerger TaskMerger) *PeerTracker {
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker {
return &PeerTracker{
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
maxActiveWorkPerPeer: maxActiveWorkPerPeer,
}
}

Expand Down Expand Up @@ -172,6 +175,16 @@ func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
var out []*peertask.Task
work := 0
for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork {
if p.maxActiveWorkPerPeer > 0 {
// Do not add work to a peer that is already maxed out
p.activelk.Lock()
activeWork := p.activeWork
p.activelk.Unlock()
if activeWork >= p.maxActiveWorkPerPeer {
break
}
}

// Pop the next task off the queue
t := p.taskQueue.Pop().(*peertask.QueueTask)

Expand Down
32 changes: 17 additions & 15 deletions peertracker/peertracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"github.com/ipfs/go-peertaskqueue/testutil"
)

const testMaxActiveWorkPerPeer = 100

func TestEmpty(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks, _ := tracker.PopTasks(100)
if len(tasks) != 0 {
Expand All @@ -19,7 +21,7 @@ func TestEmpty(t *testing.T) {

func TestPushPop(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand All @@ -40,7 +42,7 @@ func TestPushPop(t *testing.T) {

func TestPopNegativeOrZeroSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand All @@ -62,7 +64,7 @@ func TestPopNegativeOrZeroSize(t *testing.T) {

func TestPushPopSizeAndOrder(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -116,7 +118,7 @@ func TestPushPopSizeAndOrder(t *testing.T) {

func TestPopFirstItemAlways(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -147,7 +149,7 @@ func TestPopFirstItemAlways(t *testing.T) {

func TestPopItemsToCoverTargetWork(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestPopItemsToCoverTargetWork(t *testing.T) {

func TestRemove(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -215,7 +217,7 @@ func TestRemove(t *testing.T) {

func TestRemoveMulti(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -247,7 +249,7 @@ func TestRemoveMulti(t *testing.T) {

func TestTaskDone(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -299,7 +301,7 @@ func (*permissiveTaskMerger) Merge(task peertask.Task, existing *peertask.Task)

func TestReplaceTaskPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -338,7 +340,7 @@ func TestReplaceTaskPermissive(t *testing.T) {

func TestReplaceTaskSize(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -391,7 +393,7 @@ func TestReplaceTaskSize(t *testing.T) {

func TestReplaceActiveTask(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -430,7 +432,7 @@ func TestReplaceActiveTask(t *testing.T) {

func TestReplaceActiveTaskNonPermissive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tracker := New(partner, &DefaultTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -468,7 +470,7 @@ func TestReplaceActiveTaskNonPermissive(t *testing.T) {

func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down Expand Up @@ -519,7 +521,7 @@ func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {

func TestRemoveActive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &permissiveTaskMerger{})
tracker := New(partner, &permissiveTaskMerger{}, testMaxActiveWorkPerPeer)

tasks := []peertask.Task{
{
Expand Down

0 comments on commit db9b167

Please sign in to comment.