From 9c5d4513ee2f6404d80d1f057fb15cdbe21f15dd Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Wed, 14 Aug 2024 16:04:14 -0400 Subject: [PATCH] ensure fairness --- processor.go | 7 ++++++- processor_test.go | 42 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/processor.go b/processor.go index 4de9177..98be813 100644 --- a/processor.go +++ b/processor.go @@ -132,7 +132,12 @@ func (s stateStorage[AC, OC, JC]) runJob(job Job[JC]) { func (s stateStorage[AC, OC, JC]) queueJob(job Job[JC]) { s.stateStatusMap[job.State].Waiting += 1 - s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job) + // Since we pull queued jobs from the end of the slice, we should put new jobs at the front + // to ensure fairness (jobs that come later only get processed after already waiting jobs) + // If this was in the hot loop (happening thousands of times per second), the memory re-alloc here wouldn't be great + // However, typically work involved in state transitions is 4+ orders of magnitude lower than the actual work + // being done, so the simplicity is preferred compared to some sort of more elegant resizing ring buffer + s.stateWaitingJobsMap[job.State] = append([]Job[JC]{job}, s.stateWaitingJobsMap[job.State]...) } func (s stateStorage[AC, OC, JC]) completeJob(job Job[JC]) { diff --git a/processor_test.go b/processor_test.go index 2092d10..2941dcb 100644 --- a/processor_test.go +++ b/processor_test.go @@ -482,7 +482,47 @@ func TestProcessor_StateCallback(t *testing.T) { } } -func TestNoStatusCounts(t *testing.T) { +func TestFairness(t *testing.T) { + oc := MyOverallContext{} + ac := MyAppContext{} + r := NewRun[MyOverallContext, MyJobContext]("job", oc) + for i := 0; i < 5; i++ { + r.AddJob(MyJobContext{ + Count: 0, + }) + } + totalCount := 0 + states := []State[MyAppContext, MyOverallContext, MyJobContext]{ + { + TriggerState: TRIGGER_STATE_NEW, + Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) { + totalCount++ + if totalCount > 10 { + return jc, STATE_DONE, nil, nil + } + + jc.Count++ + + return jc, TRIGGER_STATE_NEW, nil, nil + }, + Concurrency: 1, + }, + { + TriggerState: STATE_DONE, + Terminal: true, + }, + } + + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) + + err = p.Exec(context.Background(), r) + for _, job := range r.Jobs { + assert.Equal(t, 2, job.C.Count) + } +} + +func TestStatusCountDedup(t *testing.T) { oc := MyOverallContext{} ac := MyAppContext{} r := NewRun[MyOverallContext, MyJobContext]("job", oc)