diff --git a/internal/examples/simplejob/main.go b/internal/examples/simplejob/main.go
new file mode 100644
index 0000000..40a02d0
--- /dev/null
+++ b/internal/examples/simplejob/main.go
@@ -0,0 +1,86 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"log"
+	"log/slog"
+	"math/rand"
+	"os"
+	"time"
+
+	"github.com/gaffo/jorb"
+)
+
+type oc struct{}
+type ac struct{}
+type jc struct{}
+
+func main() {
+	o := oc{}
+	a := ac{}
+	r := jorb.NewRun[oc, jc]("example", o)
+
+	slog.SetLogLoggerLevel(slog.LevelWarn)
+	for i := 0; i < 100; i++ {
+		r.AddJobWithState(jc{}, "A")
+	}
+
+	states := []jorb.State[ac, oc, jc]{
+		{
+			TriggerState: "A",
+			Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
+				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+				return jc, "B", nil, nil
+			},
+			Concurrency: 5,
+		},
+		{
+			TriggerState: "B",
+			Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
+				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+				return jc, "C", nil, nil
+			},
+			Concurrency: 4,
+		},
+		{
+			TriggerState: "C",
+			Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) {
+				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+				return jc, "D", nil, nil
+			},
+			Concurrency: 3,
+		},
+		{
+			TriggerState: "D",
+			Terminal: true,
+		},
+	}
+
+	serial := jorb.NewJsonSerializer[oc, jc]("example.state")
+	listener := &fileListener{fileName: "example.status"}
+	p, err := jorb.NewProcessor[ac, oc, jc](a, states, serial, listener)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	if err := p.Exec(context.Background(), r); err != nil {
+		log.Fatal(err)
+	}
+}
+
+// fileListener serializes the status updates to a file
+type fileListener struct {
+	fileName string
+}
+
+func (f *fileListener) StatusUpdate(status []jorb.StatusCount) {
+	buf := &bytes.Buffer{}
+
+	encoder := json.NewEncoder(buf)
+	encoder.SetIndent("", " ")
+	_ = encoder.Encode(status)
+
+	_ = os.WriteFile(f.fileName, buf.Bytes(), 0644)
+}
diff --git a/job.go b/job.go
index f991333..204d2df 100644
--- a/job.go
+++ b/job.go
@@ -13,7 +13,11 @@ type Job[JC any] struct {

 // UpdateLastEvent updates the LastUpdate field of the Job struct to the current time.
 func (j Job[JC]) UpdateLastEvent() Job[JC] {
-	t := time.Now()
+	// Remove the monotonic clock reading from the timestamp; it is only useful for measuring elapsed time.
+	// https://pkg.go.dev/time#hdr-Monotonic_Clocks
+	// The monotonic reading is not marshalled, so keeping it would cause tests that marshal and unmarshal
+	// job state and expect identical results to fail.
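+	// Truncate strips the monotonic reading as a side effect (any rounding does), so a freshly stamped
+	// timestamp compares cleanly with one that has been serialized and loaded back.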
+ t := time.Now().Truncate(time.Millisecond) // Set the LastUpdate field to the current time j.LastUpdate = &t return j diff --git a/processor.go b/processor.go index 2be68fd..7773ac0 100644 --- a/processor.go +++ b/processor.go @@ -6,7 +6,7 @@ import ( "log" "log/slog" "runtime/pprof" - "strings" + "sort" "sync" "golang.org/x/time/rate" @@ -49,335 +49,367 @@ type KickRequest[JC any] struct { type StatusCount struct { State string - Count int + Completed int Executing int + Waiting int Terminal bool } +type state struct { +} + +type stateStorage[AC any, OC any, JC any] struct { + // This is fine for package-internal use cases to directly iterate over + states []State[AC, OC, JC] + + // These shouldn't be used outside stateStorage's methods + stateMap map[string]State[AC, OC, JC] + stateStatusMap map[string]*StatusCount + stateWaitingJobsMap map[string][]Job[JC] + stateChan map[string]chan Job[JC] + sortedStateNames []string +} + +func newStateStorageFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) stateStorage[AC, OC, JC] { + st := stateStorage[AC, OC, JC]{ + states: states, + stateMap: map[string]State[AC, OC, JC]{}, + stateStatusMap: map[string]*StatusCount{}, + stateWaitingJobsMap: map[string][]Job[JC]{}, + stateChan: map[string]chan Job[JC]{}, + sortedStateNames: []string{}, + } + + for _, s := range states { + stateName := s.TriggerState + + st.sortedStateNames = append(st.sortedStateNames, stateName) + st.stateMap[stateName] = s + st.stateStatusMap[stateName] = &StatusCount{ + State: stateName, + Terminal: s.Terminal, + } + // This is by-design unbuffered + st.stateChan[stateName] = make(chan Job[JC]) + } + + sort.Strings(st.sortedStateNames) + + return st +} + +func (s stateStorage[AC, OC, JC]) getJobChannelForState(stateName string) chan Job[JC] { + return s.stateChan[stateName] +} + +func (s stateStorage[AC, OC, JC]) closeJobChannelForState(stateName string) { + close(s.stateChan[stateName]) +} + +func (s stateStorage[AC, OC, JC]) validate() error { + for _, state := range s.states { + if state.Terminal { + if state.Concurrency < 0 { + return fmt.Errorf("terminal state %s has negative concurrency", state.TriggerState) + } + } else { + if state.Concurrency < 1 { + return fmt.Errorf("non-terminal state %s has non-positive concurrency", state.TriggerState) + } + if state.Exec == nil { + return fmt.Errorf("non-terminal state %s but has no Exec function", state.TriggerState) + } + } + } + + return nil +} + +func (s stateStorage[AC, OC, JC]) runJob(job Job[JC]) { + s.stateStatusMap[job.State].Executing += 1 + s.stateChan[job.State] <- job +} + +func (s stateStorage[AC, OC, JC]) queueJob(job Job[JC]) { + s.stateStatusMap[job.State].Waiting += 1 + s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job) +} + +func (s stateStorage[AC, OC, JC]) completeJob(job Job[JC]) { + s.stateStatusMap[job.State].Completed += 1 +} + +func (s stateStorage[AC, OC, JC]) processJob(job Job[JC]) { + if s.isTerminal(job) { + s.completeJob(job) + return + } + + if s.canRunJobForState(job.State) { + s.runJob(job) + return + } + + s.queueJob(job) +} + +func (s stateStorage[AC, OC, JC]) isTerminal(job Job[JC]) bool { + return s.stateMap[job.State].Terminal +} + +func (s stateStorage[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { + for _, job := range r.Jobs { + if !s.isTerminal(job) { + return false + } + } + return true +} + +func (s stateStorage[AC, OC, JC]) runNextWaitingJob(state string) { + // One less job is executing for the prior state + 
s.stateStatusMap[state].Executing -= 1 + + // There are no waiting jobs for the state, so we have nothing to queue + waitingJobCount := len(s.stateWaitingJobsMap[state]) + if waitingJobCount == 0 { + return + } + + job := s.stateWaitingJobsMap[state][waitingJobCount-1] + s.stateWaitingJobsMap[state] = s.stateWaitingJobsMap[state][0 : waitingJobCount-1] + s.stateStatusMap[job.State].Waiting -= 1 + + s.runJob(job) +} + +func (s stateStorage[AC, OC, JC]) canRunJobForState(state string) bool { + return s.stateStatusMap[state].Executing < s.stateMap[state].Concurrency +} + +func (s stateStorage[AC, OC, JC]) hasExecutingJobs() bool { + for _, value := range s.stateStatusMap { + if value.Executing > 0 { + return true + } + } + + return false +} + +func (s stateStorage[AC, OC, JC]) getStatusCounts() []StatusCount { + ret := make([]StatusCount, 0) + for _, name := range s.sortedStateNames { + ret = append(ret, *s.stateStatusMap[name]) + } + return ret +} + +// Serializer is an interface that defines how to serialize and deserialize job contexts. + // Processor executes a job type Processor[AC any, OC any, JC any] struct { appContext AC - states []State[AC, OC, JC] serializer Serializer[OC, JC] + stateThing stateStorage[AC, OC, JC] statusListener StatusListener - initted bool - stateMap map[string]State[AC, OC, JC] - stateNames []string - stateChan map[string]chan Job[JC] returnChan chan Return[JC] -} - -func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] { - return &Processor[AC, OC, JC]{ - appContext: ac, - states: states, - serializer: serializer, - statusListener: statusListener, - } + wg sync.WaitGroup } // Return is a struct that contains a job and a list of kick requests // that is used for returning job updates to the system type Return[JC any] struct { + PriorState string Job Job[JC] KickRequests []KickRequest[JC] Error error } -func (p *Processor[AC, OC, JC]) init() { - if p.initted { - return +func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error) { + p := &Processor[AC, OC, JC]{ + appContext: ac, + stateThing: newStateStorageFromStates(states), + serializer: serializer, + statusListener: statusListener, } + + if err := p.stateThing.validate(); err != nil { + return nil, err + } + + return p, nil +} + +func (p *Processor[AC, OC, JC]) init() { if p.serializer == nil { p.serializer = &NilSerializer[OC, JC]{} } if p.statusListener == nil { p.statusListener = &NilStatusListener{} } - // Make a map of triggers to states so we can easily reference it - p.stateMap = map[string]State[AC, OC, JC]{} - for _, s := range p.states { - p.stateMap[s.TriggerState] = s - } - // get a list of return state names for use - p.stateNames = make([]string, 0, len(p.stateMap)) - for k := range p.stateMap { - p.stateNames = append(p.stateNames, k) - } - - // For each state, we need a channel of jobs - p.stateChan = map[string]chan Job[JC]{} - - // Create the state chans - totalConcurrency := 0 - for _, s := range p.states { - if s.Terminal { - continue - } - p.stateChan[s.TriggerState] = make(chan Job[JC], s.Concurrency) // make a chan - totalConcurrency += s.Concurrency - } - // When a job changes state, we send it to this channel to centrally manage and re-queue - p.returnChan = make(chan Return[JC], totalConcurrency*2) // make it the size of the total amount of in flight jobs we could have 
so that each worker can return a task + // This is by-design unbuffered + p.returnChan = make(chan Return[JC]) } // Exec this big work function, this does all the crunching func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { p.init() - if p.allJobsAreTerminal(r) { + if p.stateThing.allJobsAreTerminal(r) { slog.Info("AllJobsTerminal") return nil } - wg := sync.WaitGroup{} - // create the workers - for _, s := range p.states { + for _, s := range p.stateThing.states { // Terminal states don't need to recieve jobs, they're just done if s.Terminal { continue } - if s.Exec == nil { - return p.invalidStateError(s.TriggerState) - } - concurrency := s.Concurrency - - // Make workers for each, they just process and fire back to the central channel - for i := 0; i < concurrency; i++ { // add a waiter for every go processor, do it before forking - labels := pprof.Labels("type", "jorbWorker", "state", s.TriggerState, "id", fmt.Sprintf("%d", i)) - pprof.Do(ctx, labels, func(_ context.Context) { - go p.execFunc(ctx, r, s, i, wg)() - }) - } + p.execFunc(ctx, s, r.Overall, &p.wg) } - go func() { p.enqueueAllJobs(r) }() // fill all the outbound queues once in a seperate goroutine to prime the pump faster - - // Make a central processor and start it - wg.Add(1) - pprof.Do(ctx, pprof.Labels("type", "ReturnChanWorker"), func(_ context.Context) { - go func() { - p.returnQueue(r, &wg) - }() + pprof.Do(ctx, pprof.Labels("type", "main"), func(ctx context.Context) { + p.wg.Add(1) + go p.process(ctx, r, &p.wg) }) - // Wait for all of the processors to quit - wg.Wait() - + p.wg.Wait() return nil } -func (p *Processor[AC, OC, JC]) returnQueue(r *Run[OC, JC], wg *sync.WaitGroup) { - for { - batch := []Return[JC]{} - READ_BATCH: - for { - select { - case rtn := <-p.returnChan: - batch = append(batch, rtn) - default: - break READ_BATCH - } - } - // Dispense with the jobs - for _, rtn := range batch { - slog.Info("ReturnChan GotJobBack", "jobId", rtn.Job.Id, "state", rtn.Job.State, "kickRequests", len(rtn.KickRequests), "error", rtn.Error) - j := rtn.Job - - // Send the new kicks if any - p.kickJobs(rtn, j, r) - - // Append the error if needed - if rtn.Error != nil { - j.StateErrors[j.State] = append(j.StateErrors[j.State], rtn.Error.Error()) - } - - // return the job - r.Return(j) - } - // Now do end of batch work - - // Flush the state - err := p.serializer.Serialize(*r) - if err != nil { - log.Fatalf("Error serializing, aborting now to not lose work: %v", err) - } - // update the status counts - p.updateStatusCounts(r) - - // flush out any new jobs we can - p.enqueueAllJobs(r) - - // If the state was terminal, we should see if all of the states are terminated, if so shut down - if !p.allJobsAreTerminal(r) { - continue - } - // if there are any jobs in flight in the run, keep going - if r.JobsInFlight() { - continue - } - +func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg *sync.WaitGroup) { + defer func() { p.shutdown() + wg.Done() + }() - break + // Enqueue the jobs to start + for _, job := range r.Jobs { + p.stateThing.processJob(job) } - slog.Info("ReturnChanWorker Quit") - wg.Done() -} -func (p *Processor[AC, OC, JC]) updateStatusCounts(r *Run[OC, JC]) { - counts := r.StatusCounts() + // Send the initial status update with the state of all the jobs + p.updateStatus() - ret := []StatusCount{} + for { + select { + case <-ctx.Done(): + return + case completedJob := <-p.returnChan: + // If the prior state of the completed job was at capacity, we now have space 
for one more + p.stateThing.runNextWaitingJob(completedJob.PriorState) + + // Update the run with the new state + r.UpdateJob(completedJob.Job) + p.stateThing.processJob(completedJob.Job) + + // Start any of the new jobs that need kicking + for idx, kickRequest := range completedJob.KickRequests { + job := Job[JC]{ + Id: fmt.Sprintf("%s->%d", completedJob.Job.Id, idx), + C: kickRequest.C, + State: kickRequest.State, + StateErrors: map[string][]string{}, + } + r.UpdateJob(job) + p.stateThing.processJob(job) + } - for _, state := range p.states { - if _, ok := counts[state.TriggerState]; !ok { - ret = append(ret, StatusCount{ - State: state.TriggerState, - Count: 0, - Executing: 0, - Terminal: state.Terminal, - }) - continue - } - c := counts[state.TriggerState] - c.Terminal = state.Terminal - ret = append(ret, c) - } - p.statusListener.StatusUpdate(ret) -} + if err := p.serializer.Serialize(*r); err != nil { + log.Fatalf("Error serializing, aborting now to not lose work: %v", err) + } -func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { - c := r.StatusCounts() - for _, k := range p.states { - if k.Terminal { - continue - } - if c[k.TriggerState].Count > 0 { - return false - } - } - return true -} + p.updateStatus() -func (p *Processor[AC, OC, JC]) enqueueAllJobs(r *Run[OC, JC]) { - slog.Info("Enqueing Jobs", "jobCount", len(r.Jobs)) - enqueued := 0 - for _, state := range p.states { - enqueued += p.enqueueJobsForState(r, state) // mutates r.Jobs + if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() { + return + } + } } - slog.Info("All Queues Primed", "jobCount", len(r.Jobs), "enqueuedCount", enqueued) } -func (p *Processor[AC, OC, JC]) enqueueJobsForState(r *Run[OC, JC], state State[AC, OC, JC]) int { - slog.Info("Enqueueing jobs for state", "state", state.TriggerState) - enqueued := 0 - for { - j, ok := r.NextJobForState(state.TriggerState) - if !ok { - slog.Info("No more jobs for state", "state", state.TriggerState) - return enqueued - } - c := p.stateChan[state.TriggerState] - select { - case c <- j: - enqueued++ - slog.Info("Enqueing Job", "state", j.State, "job", j.Id) - continue - default: - r.Return(j) - return enqueued - } - } - slog.Info("Enqueued jobs for state", "state", state.TriggerState, "enqueuedCount", enqueued) - return enqueued +func (p *Processor[AC, OC, JC]) updateStatus() { + p.statusListener.StatusUpdate(p.stateThing.getStatusCounts()) } func (p *Processor[AC, OC, JC]) shutdown() { // close all of the channels - for _, c := range p.stateChan { - close(c) + for _, state := range p.stateThing.states { + p.stateThing.closeJobChannelForState(state.TriggerState) } // close ourselves down close(p.returnChan) } -func (p *Processor[AC, OC, JC]) kickJobs(rtn Return[JC], j Job[JC], r *Run[OC, JC]) { - if rtn.KickRequests == nil { - return - } - for _, k := range rtn.KickRequests { - // create a new job with the right state - newJob := Job[JC]{ - Id: fmt.Sprintf("%s->%d", j.Id, len(r.Jobs)), - C: k.C, - State: k.State, - StateErrors: map[string][]string{}, - } - - // validate it - _, ok := p.stateMap[newJob.State] - if !ok { - log.Fatal(p.invalidStateError(newJob.State)) - } - - // return it to the run, it'll get re-enqueued by the main return loop - r.Return(newJob) - } -} - type StateExec[AC any, OC any, JC any] struct { - State State[AC, OC, JC] - i int - wg sync.WaitGroup - c chan Job[JC] - Overall OC ctx context.Context - returnChan chan Return[JC] ac AC + oc OC + state State[AC, OC, JC] + jobChan <-chan Job[JC] + returnChan 
chan<- Return[JC] + i int + wg *sync.WaitGroup } func (s *StateExec[AC, OC, JC]) Run() { - slog.Info("Starting worker", "worker", s.i, "state", s.State.TriggerState) - for j := range s.c { - if s.State.RateLimit != nil { - s.State.RateLimit.Wait(s.ctx) - slog.Info("LimiterAllowed", "worker", s.i, "state", s.State.TriggerState, "job", j.Id) + slog.Info("Starting worker", "worker", s.i, "state", s.state.TriggerState) + defer func() { + s.wg.Done() + slog.Info("Stopped worker", "worker", s.i, "state", s.state.TriggerState) + }() + + for { + select { + case <-s.ctx.Done(): + return + case j, more := <-s.jobChan: + // The channel was closed + if !more { + return + } + + if s.state.RateLimit != nil { + s.state.RateLimit.Wait(s.ctx) + slog.Info("LimiterAllowed", "worker", s.i, "state", s.state.TriggerState, "job", j.Id) + } + // Execute the job + rtn := Return[JC]{ + PriorState: j.State, + } + slog.Info("Executing job", "job", j.Id, "state", s.state.TriggerState) + j.C, j.State, rtn.KickRequests, rtn.Error = s.state.Exec(s.ctx, s.ac, s.oc, j.C) + slog.Info("Execution complete", "job", j.Id, "state", s.state.TriggerState, "newState", j.State, "error", rtn.Error, "kickRequests", len(rtn.KickRequests)) + + rtn.Job = j + slog.Info("Returning job", "job", j.Id, "newState", j.State) + s.returnChan <- rtn + slog.Info("Returned job", "job", j.Id, "newState", j.State) } - // Execute the job - rtn := Return[JC]{} - slog.Info("Executing job", "job", j.Id, "state", s.State.TriggerState) - j.C, j.State, rtn.KickRequests, rtn.Error = s.State.Exec(s.ctx, s.ac, s.Overall, j.C) - slog.Info("Execution complete", "job", j.Id, "state", s.State.TriggerState, "newState", j.State, "error", rtn.Error, "kickRequests", len(rtn.KickRequests)) - - rtn.Job = j - //go func() { - slog.Info("Returning job", "job", j.Id, "newState", j.State) - s.returnChan <- rtn - slog.Info("Returned job", "job", j.Id, "newState", j.State) - //}() } - s.wg.Done() - slog.Info("Stopped worker", "worker", s.i, "state", s.State.TriggerState) } -func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, r *Run[OC, JC], s State[AC, OC, JC], i int, wg sync.WaitGroup) func() { - wg.Add(1) - e := &StateExec[AC, OC, JC]{ - State: s, - i: i, - wg: wg, - c: p.stateChan[s.TriggerState], - ctx: ctx, - returnChan: p.returnChan, - ac: p.appContext, - } - return e.Run -} +func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, wg *sync.WaitGroup) { + // Make workers for each, they just process and fire back to the central channel + for i := 0; i < state.Concurrency; i++ { + p.wg.Add(1) + stateExec := StateExec[AC, OC, JC]{ + ctx: ctx, + ac: p.appContext, + oc: overallContext, + state: state, + jobChan: p.stateThing.getJobChannelForState(state.TriggerState), + returnChan: p.returnChan, + i: i, + wg: wg, + } -func (p *Processor[AC, OC, JC]) invalidStateError(s string) error { - return fmt.Errorf("State [%s] has no executor, valid state names: %s", s, strings.Join(p.stateNames, ", ")) + pprof.Do(ctx, pprof.Labels("type", "worker", "state", state.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) { + go stateExec.Run() + }) + } } diff --git a/processor_test.go b/processor_test.go index 2ebcb11..6fb0ea2 100644 --- a/processor_test.go +++ b/processor_test.go @@ -41,6 +41,111 @@ const ( STATE_DONE_TWO = "done_two" ) +func createJob(state string) Job[MyJobContext] { + return Job[MyJobContext]{ + Id: "", + C: MyJobContext{}, + State: state, + } +} + +func TestStateStorage(t *testing.T) { + 
concurrency := 5 + stateS := newStateStorageFromStates([]State[MyAppContext, MyOverallContext, MyJobContext]{ + { + TriggerState: TRIGGER_STATE_NEW, + Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) { + return jc, STATE_DONE, nil, nil + }, + Terminal: false, + Concurrency: concurrency, + }, + { + TriggerState: STATE_DONE, + Terminal: true, + }, + }) + + // Fake processor that just takes jobs and throws them away, as the StateStorage doesn't actually care about + // Any of the actual processing + go func() { + for true { + select { + case <-stateS.stateChan[TRIGGER_STATE_NEW]: + continue + } + } + }() + + for i := 0; i < concurrency*2; i++ { + stateS.processJob(createJob(TRIGGER_STATE_NEW)) + } + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: concurrency, + }, + }, stateS.getStatusCounts()) + for i := 0; i < 2; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: 2, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: concurrency - 2, + }, + }, stateS.getStatusCounts()) + + for i := 0; i < concurrency-2; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: concurrency, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: 0, + }, + }, stateS.getStatusCounts()) + + for i := 0; i < concurrency; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: concurrency * 2, + }, + { + State: TRIGGER_STATE_NEW, + Executing: 0, + Waiting: 0, + }, + }, stateS.getStatusCounts()) +} + func TestProcessorOneJob(t *testing.T) { t.Parallel() oc := MyOverallContext{} @@ -69,10 +174,11 @@ func TestProcessorOneJob(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -99,10 +205,11 @@ func TestProcessorAllTerminal(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -144,10 +251,11 @@ func TestProcessorTwoSequentialJobs(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := 
time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -184,7 +292,7 @@ func TestProcessor_TwoTerminal(t *testing.T) { oc := MyOverallContext{} ac := MyAppContext{} r := NewRun[MyOverallContext, MyJobContext]("job", oc) - for i := 0; i < 30_000; i++ { + for i := 0; i < 40; i++ { r.AddJob(MyJobContext{ Count: 0, }) @@ -202,7 +310,7 @@ func TestProcessor_TwoTerminal(t *testing.T) { return jc, STATE_DONE_TWO, nil, nil }, Terminal: false, - Concurrency: 1000, + Concurrency: 10, }, State[MyAppContext, MyOverallContext, MyJobContext]{ TriggerState: STATE_DONE_TWO, @@ -216,13 +324,14 @@ func TestProcessor_TwoTerminal(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) - assert.Less(t, delta, time.Second*16, "Should take less than 9 seconds when run in parallel") + assert.Less(t, delta, time.Second*10, "Should take less than 10 seconds when run in parallel") stateCount := map[string]int{} for _, j := range r.Jobs { @@ -268,7 +377,7 @@ func TestProcessor_StateCallback(t *testing.T) { oc := MyOverallContext{} ac := MyAppContext{} r := NewRun[MyOverallContext, MyJobContext]("job", oc) - for i := 0; i < 1; i++ { + for i := 0; i < 11; i++ { r.AddJob(MyJobContext{ Count: 0, }) @@ -277,52 +386,38 @@ func TestProcessor_StateCallback(t *testing.T) { tl := &testStatusListener{ t: t, } - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 1, - }, - { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) tl.ExpectStatus([]StatusCount{ { State: TRIGGER_STATE_NEW, - Count: 1, - Executing: 1, + Waiting: 1, + Executing: 10, + Completed: 0, }, { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 1, - Executing: 1, - }, - { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 0, - }, - { - State: STATE_DONE, - Count: 1, - Terminal: true, + State: STATE_DONE, + Waiting: 0, + Executing: 0, + Completed: 0, + Terminal: true, }, }) + for i := 0; i <= 10; i++ { + tl.ExpectStatus([]StatusCount{ + { + State: TRIGGER_STATE_NEW, + Waiting: 0, + Executing: 10 - i, + Completed: 0, + }, + { + State: STATE_DONE, + Waiting: 0, + Executing: 0, + Completed: 1 + i, + Terminal: true, + }, + }) + } states := []State[MyAppContext, MyOverallContext, MyJobContext]{ State[MyAppContext, MyOverallContext, MyJobContext]{ @@ -343,10 +438,11 @@ func TestProcessor_StateCallback(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, tl) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, tl) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -386,10 +482,11 @@ func TestProcessor_Retries(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + 
assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -451,10 +548,11 @@ func TestProcessor_RateLimiter(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*4) @@ -494,10 +592,11 @@ func TestProcessor_RateLimiterSlows(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) jobCount := len(r.Jobs) @@ -548,10 +647,11 @@ func TestProcessor_LoopWithExit(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -602,7 +702,8 @@ func TestProcessor_Serialization(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, serialzer, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, serialzer, nil) + assert.NoError(t, err) start := time.Now() err = p.Exec(context.Background(), r) @@ -687,10 +788,11 @@ func TestProcessor_FirstStepExpands(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") diff --git a/run.go b/run.go index 2af13e8..4227e35 100644 --- a/run.go +++ b/run.go @@ -3,21 +3,19 @@ package jorb import ( "fmt" "log/slog" - "sort" + "reflect" "sync" + "time" ) // Run is basically the overall state of a given run (batch) in the processing framework // it's meant to be re-entrant, eg if you kill the processor and you have a serializaer, you can // restart using it at any time type Run[OC any, JC any] struct { - Name string // Name of the run - Jobs map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states - Overall OC // Overall overall state that is usful to all jobs, basically context for the overall batch - m sync.Mutex // Mutex used for indexing operations - lockedJobsById map[string]string // If a job is locked and what state it was locked into - lockedJobsStateCount map[string]int // How many jobs are locked for each state (for status) - stateCount map[string][]Job[JC] // Job Queues 
for Speed + Name string // Name of the run + Jobs map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states + Overall OC // Overall overall state that is usful to all jobs, basically context for the overall batch + m sync.Mutex // Mutex used for indexing operations } // NewRun creates a new Run instance with the given name and overall context @@ -38,37 +36,22 @@ func NewRun[OC any, JC any](name string, oc OC) *Run[OC, JC] { func (r *Run[OC, JC]) Init() { r.m.Lock() defer r.m.Unlock() - r.lockedJobsById = map[string]string{} - r.lockedJobsStateCount = map[string]int{} - r.stateCount = map[string][]Job[JC]{} - // initialize the state counts for _, j := range r.Jobs { - s := j.State - if _, ok := r.stateCount[s]; !ok { - r.stateCount[s] = []Job[JC]{} - } // if it doesn't have a last event, give it one if j.LastUpdate == nil { j.UpdateLastEvent() } - r.stateCount[s] = append(r.stateCount[s], j) - } - // now that they're all added we need to sort them - for state, list := range r.stateCount { - sort.Slice(list, func(i, j int) bool { - if list[i].LastUpdate == nil { - return false - } - if list[j].LastUpdate == nil { - return true - } - return list[i].LastUpdate.Before(*list[j].LastUpdate) - }) - r.stateCount[state] = list } } +func (r *Run[OC, JC]) UpdateJob(j Job[JC]) { + r.m.Lock() + defer r.m.Unlock() + + r.Jobs[j.Id] = j.UpdateLastEvent() +} + func (r *Run[OC, JC]) AddJobWithState(jc JC, state string) { r.m.Lock() defer r.m.Unlock() @@ -81,12 +64,9 @@ func (r *Run[OC, JC]) AddJobWithState(jc JC, state string) { State: state, StateErrors: map[string][]string{}, } - j.UpdateLastEvent() - // Pop it onto the end of the appropriate queue - r.stateCount[j.State] = append(r.stateCount[j.State], j) slog.Info("AddJob", "run", r.Name, "job", j, "totalJobs", len(r.Jobs)) - r.Jobs[id] = j + r.Jobs[id] = j.UpdateLastEvent() } // Add a job to the pool, this shouldn't be called once it's running @@ -94,67 +74,56 @@ func (r *Run[OC, JC]) AddJob(jc JC) { r.AddJobWithState(jc, TRIGGER_STATE_NEW) } -func (r *Run[OC, JC]) NextJobForState(state string) (Job[JC], bool) { - r.m.Lock() - defer r.m.Unlock() - - if len(r.stateCount[state]) == 0 { - return Job[JC]{}, false +func (r *Run[OC, JC]) Equal(r2 *Run[OC, JC]) bool { + if r.Name != r2.Name { + return false } - // pop the item off the front of the queue - minJob := r.stateCount[state][0] - r.stateCount[state] = r.stateCount[state][1:] - - // lock it - r.lockedJobsById[minJob.Id] = minJob.State - r.lockedJobsStateCount[minJob.State]++ - - // update it's last event - r.Jobs[minJob.Id] = minJob.UpdateLastEvent() - - // return it - return r.Jobs[minJob.Id], true -} - -func (r *Run[OC, JC]) Return(j Job[JC]) { - r.m.Lock() - defer r.m.Unlock() + if len(r.Jobs) != len(r2.Jobs) { + return false + } - r.Jobs[j.Id] = j.UpdateLastEvent() + for rKey, rValue := range r.Jobs { + r2Value, ok := r2.Jobs[rKey] + if !ok { + return false + } - // decremnt the previous state - prevState := r.lockedJobsById[j.Id] - r.lockedJobsStateCount[prevState]-- + if rValue.Id != r2Value.Id { + return false + } - // unlock it - delete(r.lockedJobsById, j.Id) + if !reflect.DeepEqual(rValue.C, r2Value.C) { + return false + } - // push it to the back of the new state - r.stateCount[j.State] = append(r.stateCount[j.State], j) -} + if rValue.State != r2Value.State { + return false + } -func (r *Run[OC, JC]) JobsInFlight() bool { - r.m.Lock() - defer r.m.Unlock() - // if any of the jobs are in flight, return true + if !reflect.DeepEqual(rValue.StateErrors, 
r2Value.StateErrors) { + return false + } - if len(r.lockedJobsById) > 0 { - return true - } - return false -} + if rValue.LastUpdate == nil && r2Value.LastUpdate == nil { + continue + } -func (r *Run[OC, JC]) StatusCounts() map[string]StatusCount { - ret := map[string]StatusCount{} + if (rValue.LastUpdate == nil && r2Value.LastUpdate != nil) || (rValue.LastUpdate != nil && r2Value.LastUpdate == nil) { + return false + } - for k, v := range r.stateCount { - ret[k] = StatusCount{ - State: k, - Count: len(v) + r.lockedJobsStateCount[k], - Executing: r.lockedJobsStateCount[k], + timeDiff := rValue.LastUpdate.Sub(*r2Value.LastUpdate) + if timeDiff > 0 { + if timeDiff > time.Millisecond { + return false + } + } else { + if timeDiff < -time.Millisecond { + return false + } } } - return ret + return true } diff --git a/run_test.go b/run_test.go index f1df6b7..3e3364d 100644 --- a/run_test.go +++ b/run_test.go @@ -2,168 +2,33 @@ package jorb import ( "testing" + "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -func TestStatusCounts(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 3, - Executing: 0, - }, - }, r.StatusCounts()) - - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 3, - Executing: 1, - }, - }, r.StatusCounts()) - - j.State = STATE_MIDDLE - r.Return(j) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 2, - Executing: 0, - }, - STATE_MIDDLE: { - State: STATE_MIDDLE, - Count: 1, - Executing: 0, - Terminal: false, - }, - }, r.StatusCounts()) -} - -func TestRun_NextForStatus_NoJobs(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - - _, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.False(t, ok) -} - -func TestRun_GetsNextRunOnSecondCall(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.NotEmpty(t, j.Id) - - // Now I should get the same job back on the second call - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.NotEqual(t, j.Id, j2.Id) -} - -func TestRun_GetNextRun_Returning(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j1, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - - // Now I should get the same job back on the second call - _, ok = r.NextJobForState(TRIGGER_STATE_NEW) - assert.False(t, ok) - - // If I return the first job I should then get it again, but the last event should be updated - r.Return(j1) - returnDate := r.Jobs[j1.Id].LastUpdate - assert.NotEqual(t, returnDate, j1.LastUpdate) - - // Verify that we get the same id back but the returnDate has been updated again - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.Equal(t, j1.Id, j2.Id) - - // The 
date should be updated on the next fetch as well - assert.NotEqual(t, j1.LastUpdate, j2.LastUpdate) - assert.NotEqual(t, returnDate, j2.LastUpdate) -} - -func TestRun_GetNextRun_GetsInOrder(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - // Add 2 jobs - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j1, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - // Now I return them in reverse order - r.Return(j2) - r.Return(j1) - - // When I get them again, I should get them in order j2, then j1 because I should be getting the one that has - // be updated the earliest - - // If I return the first job I should then get it again, but the last event should be updated - j3, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - j4, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - // Now validate the ids - // The date should be updated on the next fetch as well - assert.Equal(t, j2.Id, j3.Id) - assert.Equal(t, j1.Id, j4.Id) -} - -func TestRun_JobsInFlight2(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - - // no jobs checked out, there should be no jobs in flight - require.False(t, r.JobsInFlight()) - - // check out a job - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - require.True(t, r.JobsInFlight()) - - r.Return(j) - require.False(t, r.JobsInFlight()) -} - func Test_AddJobWithState(t *testing.T) { t.Parallel() r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) r.AddJobWithState(MyJobContext{Count: 0}, "other_state") - assert.Equal(t, map[string]StatusCount{ - "other_state": { - State: "other_state", - Count: 1, - Executing: 0, - }, - }, r.StatusCounts()) assert.Equal(t, 1, len(r.Jobs)) assert.Equal(t, "other_state", r.Jobs["0"].State) + originalTime := r.Jobs["0"].LastUpdate + time.Sleep(1 * time.Second) + + r.UpdateJob(Job[MyJobContext]{ + Id: "0", + C: MyJobContext{ + Count: 1, + }, + State: "other_state_2", + }) + + time.Sleep(1 * time.Second) + // Number of jobs has not changed + assert.Equal(t, 1, len(r.Jobs)) + // Job's state has been updated + assert.Equal(t, "other_state_2", r.Jobs["0"].State) + // Job's time has been updated + assert.NotEqual(t, originalTime, r.Jobs["0"].LastUpdate) } diff --git a/serializer_test.go b/serializer_test.go index 595f7c9..51f834e 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -12,12 +12,6 @@ import ( func TestJsonSerializer_SaveLoad(t *testing.T) { t.Parallel() - // Create a temporary directory for testing - tempDir, err := os.MkdirTemp("", "test") - if err != nil { - t.Fatalf("Failed to create temporary directory: %v", err) - } - defer os.RemoveAll(tempDir) // Create a test run run := NewRun[MyOverallContext, MyJobContext]("test", MyOverallContext{Name: "overall"}) @@ -28,11 +22,11 @@ func TestJsonSerializer_SaveLoad(t *testing.T) { } // Create a JsonSerializer with a temporary file - tempFile := filepath.Join(tempDir, "test.json") + tempFile := filepath.Join(t.TempDir(), "test.json") serializer := &JsonSerializer[MyOverallContext, MyJobContext]{File: tempFile} // Serialize the run - err = serializer.Serialize(*run) + err := serializer.Serialize(*run) require.NoError(t, err) require.FileExists(t, tempFile) @@ -41,9 +35,7 @@ func 
TestJsonSerializer_SaveLoad(t *testing.T) { require.NoError(t, err) // Check that the run is the same - assert.Equal(t, run.Overall, actualRun.Overall) - assert.Equal(t, run.Name, actualRun.Name) - assert.Equal(t, run.Jobs, actualRun.Jobs) + assert.True(t, run.Equal(actualRun)) } func Test_SerializeWithError(t *testing.T) { @@ -56,7 +48,7 @@ func Test_SerializeWithError(t *testing.T) { defer os.RemoveAll(tempDir) r := NewRun[MyOverallContext, MyJobContext]("test", MyOverallContext{Name: "overall"}) - r.Return(Job[MyJobContext]{ + r.UpdateJob(Job[MyJobContext]{ C: MyJobContext{Count: 0, Name: "job-0"}, StateErrors: map[string][]string{ "key": []string{ @@ -84,7 +76,5 @@ func Test_SerializeWithError(t *testing.T) { j.LastUpdate = nil actualRun.Jobs[k] = j } - assert.Equal(t, r.Overall, actualRun.Overall) - assert.Equal(t, r.Name, actualRun.Name) - assert.Equal(t, r.Jobs, actualRun.Jobs) + assert.True(t, r.Equal(actualRun)) }
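
NewProcessor now returns an error, so a misconfigured set of states fails fast at construction time instead of surfacing from Exec once workers are being created. A minimal sketch of the new failure mode, reusing the module path and the empty context types from the example above (the state names are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/gaffo/jorb"
)

type oc struct{}
type ac struct{}
type jc struct{}

func main() {
	// "A" is non-terminal but has no Exec function, so validation rejects the whole set up front.
	states := []jorb.State[ac, oc, jc]{
		{TriggerState: "A", Concurrency: 1},
		{TriggerState: "B", Terminal: true},
	}

	_, err := jorb.NewProcessor[ac, oc, jc](ac{}, states, nil, nil)
	fmt.Println(err) // non-terminal state A but has no Exec function
}
```

Previously the same misconfiguration only surfaced as invalidStateError after Exec had already started.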