Skip to content

Commit

Permalink
Merge pull request #8 from thesilentg/main
Browse files Browse the repository at this point in the history
Ensure scheduling fairness
  • Loading branch information
gaffo authored Aug 14, 2024
2 parents ec5cc41 + 9c5d451 commit 82cc1ce
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
7 changes: 6 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@ func (s stateStorage[AC, OC, JC]) runJob(job Job[JC]) {

func (s stateStorage[AC, OC, JC]) queueJob(job Job[JC]) {
s.stateStatusMap[job.State].Waiting += 1
s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job)
// Since we pull queued jobs from the end of the slice, we should put new jobs at the front
// to ensure fairness (jobs that come later only get processed after already waiting jobs)
// If this was in the hot loop (happening thousands of times per second), the memory re-alloc here wouldn't be great
// However, typically work involved in state transitions is 4+ orders of magnitude lower than the actual work
// being done, so the simplicity is preferred compared to some sort of more elegant resizing ring buffer
s.stateWaitingJobsMap[job.State] = append([]Job[JC]{job}, s.stateWaitingJobsMap[job.State]...)
}

func (s stateStorage[AC, OC, JC]) completeJob(job Job[JC]) {
Expand Down
42 changes: 41 additions & 1 deletion processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,47 @@ func TestProcessor_StateCallback(t *testing.T) {
}
}

func TestNoStatusCounts(t *testing.T) {
func TestFairness(t *testing.T) {
oc := MyOverallContext{}
ac := MyAppContext{}
r := NewRun[MyOverallContext, MyJobContext]("job", oc)
for i := 0; i < 5; i++ {
r.AddJob(MyJobContext{
Count: 0,
})
}
totalCount := 0
states := []State[MyAppContext, MyOverallContext, MyJobContext]{
{
TriggerState: TRIGGER_STATE_NEW,
Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) {
totalCount++
if totalCount > 10 {
return jc, STATE_DONE, nil, nil
}

jc.Count++

return jc, TRIGGER_STATE_NEW, nil, nil
},
Concurrency: 1,
},
{
TriggerState: STATE_DONE,
Terminal: true,
},
}

p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil)
assert.NoError(t, err)

err = p.Exec(context.Background(), r)
for _, job := range r.Jobs {
assert.Equal(t, 2, job.C.Count)
}
}

func TestStatusCountDedup(t *testing.T) {
oc := MyOverallContext{}
ac := MyAppContext{}
r := NewRun[MyOverallContext, MyJobContext]("job", oc)
Expand Down

0 comments on commit 82cc1ce

Please sign in to comment.