diff --git a/internal/examples/simplejob/main.go b/internal/examples/simplejob/main.go
index c241383..626f7dd 100644
--- a/internal/examples/simplejob/main.go
+++ b/internal/examples/simplejob/main.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
-	"io"
 	"log"
 	"log/slog"
 	"math/rand"
@@ -79,8 +78,5 @@ func (f *fileListener) StatusUpdate(status []jorb.StatusCount) {
 	encoder.SetIndent("", " ")
 	_ = encoder.Encode(status)

-	file, _ := os.Create(f.fileName)
-	defer file.Close()
-
-	_, _ = io.Copy(file, buf)
+	_ = os.WriteFile(f.fileName, buf.Bytes(), 0644)
 }
diff --git a/processor.go b/processor.go
index 9b8e51b..db1c3ce 100644
--- a/processor.go
+++ b/processor.go
@@ -55,41 +55,52 @@ type StatusCount struct {
 	Terminal bool
 }

-// Processor executes a job
-type Processor[AC any, OC any, JC any] struct {
-	appContext AC
-	states []State[AC, OC, JC]
-	serializer Serializer[OC, JC]
-	statusListener StatusListener
-	sortedStateNames []string
+type stateThing[AC any, OC any, JC any] struct {
+	// This is fine for package-internal callers to iterate over directly
+	states []State[AC, OC, JC]
+
+	// These shouldn't be used outside stateThing's methods
 	stateMap map[string]State[AC, OC, JC]
 	stateStatusMap map[string]*StatusCount
 	stateWaitingJobsMap map[string][]Job[JC]
 	stateChan map[string]chan Job[JC]
-	returnChan chan Return[JC]
-	wg sync.WaitGroup
+	sortedStateNames []string
 }

-// Return is a struct that contains a job and a list of kick requests
-// that is used for returning job updates to the system
-type Return[JC any] struct {
-	PriorState string
-	Job Job[JC]
-	KickRequests []KickRequest[JC]
-	Error error
-}
+func newStateThingFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) stateThing[AC, OC, JC] {
+	st := stateThing[AC, OC, JC]{
+		states: states,
+		stateMap: map[string]State[AC, OC, JC]{},
+		stateStatusMap: map[string]*StatusCount{},
+		stateWaitingJobsMap: map[string][]Job[JC]{},
+		stateChan: map[string]chan Job[JC]{},
+		sortedStateNames: []string{},
+	}

-func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] {
-	return &Processor[AC, OC, JC]{
-		appContext: ac,
-		states: states,
-		serializer: serializer,
-		statusListener: statusListener,
+	for _, s := range states {
+		stateName := s.TriggerState
+
+		st.sortedStateNames = append(st.sortedStateNames, stateName)
+		st.stateMap[stateName] = s
+		st.stateStatusMap[stateName] = &StatusCount{
+			State: stateName,
+			Terminal: s.Terminal,
+		}
+		// This is by-design unbuffered
+		st.stateChan[stateName] = make(chan Job[JC])
 	}
+
+	sort.Strings(st.sortedStateNames)
+
+	return st
 }

-func (p *Processor[AC, OC, JC]) validate(states []State[AC, OC, JC]) error {
-	for _, state := range states {
+func (s stateThing[AC, OC, JC]) getJobChannelForState(stateName string) chan Job[JC] {
+	return s.stateChan[stateName]
+}
+
+func (s stateThing[AC, OC, JC]) validate() error {
+	for _, state := range s.states {
 		if state.Terminal {
 			if state.Concurrency < 0 {
 				return fmt.Errorf("terminal state %s has negative concurrency", state.TriggerState)
@@ -107,6 +118,112 @@ func (p *Processor[AC, OC, JC]) validate(states []State[AC, OC, JC]) error {
 	return nil
 }

+func (s stateThing[AC, OC, JC]) runJob(job Job[JC]) {
+	s.stateStatusMap[job.State].Executing += 1
+	s.stateChan[job.State] <- job
+}
+
+func (s stateThing[AC, OC, JC]) queueJob(job Job[JC]) {
+	s.stateStatusMap[job.State].Waiting += 1
+	s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job)
+}
+
+func (s stateThing[AC, OC, JC]) completeJob(job Job[JC]) {
+	s.stateStatusMap[job.State].Completed += 1
+}
+
+func (s stateThing[AC, OC, JC]) processJob(job Job[JC]) {
+	if s.isTerminal(job) {
+		s.completeJob(job)
+	} else {
+		if s.canRunJobForState(job.State) {
+			s.runJob(job)
+		} else {
+			s.queueJob(job)
+		}
+	}
+}
+
+func (s stateThing[AC, OC, JC]) isTerminal(job Job[JC]) bool {
+	return s.stateMap[job.State].Terminal
+}
+
+func (s stateThing[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool {
+	for _, job := range r.Jobs {
+		if !s.isTerminal(job) {
+			return false
+		}
+	}
+	return true
+}
+
+func (s stateThing[AC, OC, JC]) runNextWaitingJob(state string) {
+	// One less job is executing for the prior state
+	s.stateStatusMap[state].Executing -= 1
+
+	// If there are no waiting jobs for the state, there is nothing to start
+	waitingJobCount := len(s.stateWaitingJobsMap[state])
+	if waitingJobCount == 0 {
+		return
+	}
+
+	job := s.stateWaitingJobsMap[state][waitingJobCount-1]
+	s.stateWaitingJobsMap[state] = s.stateWaitingJobsMap[state][0 : waitingJobCount-1]
+	s.stateStatusMap[job.State].Waiting -= 1
+
+	s.runJob(job)
+}
+
+func (s stateThing[AC, OC, JC]) canRunJobForState(state string) bool {
+	return s.stateStatusMap[state].Executing < s.stateMap[state].Concurrency
+}
+
+func (s stateThing[AC, OC, JC]) hasExecutingJobs() bool {
+	for _, value := range s.stateStatusMap {
+		if value.Executing > 0 {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (s stateThing[AC, OC, JC]) getStatusCounts() []StatusCount {
+	ret := make([]StatusCount, 0)
+	for _, name := range s.sortedStateNames {
+		ret = append(ret, *s.stateStatusMap[name])
+	}
+	return ret
+}
+
+// Processor executes a job
+type Processor[AC any, OC any, JC any] struct {
+	appContext AC
+	serializer Serializer[OC, JC]
+	stateThing stateThing[AC, OC, JC]
+	statusListener StatusListener
+	returnChan chan Return[JC]
+	wg sync.WaitGroup
+}
+
+// Return is a struct that contains a job and a list of kick requests;
+// it is used for returning job updates to the system
+type Return[JC any] struct {
+	PriorState string
+	Job Job[JC]
+	KickRequests []KickRequest[JC]
+	Error error
+}
+
+func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] {
+	return &Processor[AC, OC, JC]{
+		appContext: ac,
+		stateThing: newStateThingFromStates(states),
+		serializer: serializer,
+		statusListener: statusListener,
+	}
+}
+
 func (p *Processor[AC, OC, JC]) init() {
 	if p.serializer == nil {
 		p.serializer = &NilSerializer[OC, JC]{}
@@ -114,27 +231,6 @@ func (p *Processor[AC, OC, JC]) init() {
 	if p.statusListener == nil {
 		p.statusListener = &NilStatusListener{}
 	}
-	// Make a map of triggers to states so we can easily reference it
-	p.stateMap = map[string]State[AC, OC, JC]{}
-	p.stateStatusMap = map[string]*StatusCount{}
-	p.stateWaitingJobsMap = map[string][]Job[JC]{}
-	p.stateChan = map[string]chan Job[JC]{}
-	p.sortedStateNames = []string{}
-
-	for _, s := range p.states {
-		stateName := s.TriggerState
-
-		p.sortedStateNames = append(p.sortedStateNames, stateName)
-		p.stateMap[stateName] = s
-		p.stateStatusMap[stateName] = &StatusCount{
-			State: stateName,
-			Terminal: s.Terminal,
-		}
-		// This is by-design unbuffered
-		p.stateChan[stateName] = make(chan Job[JC])
-	}
-
-	sort.Strings(p.sortedStateNames)

 	// This is by-design unbuffered
 	p.returnChan = make(chan Return[JC])
@@ -142,31 +238,25 @@

 // Exec this big work function, this does all the crunching
 func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error {
-	if err := p.validate(p.states); err != nil {
+	if err := p.stateThing.validate(); err != nil {
 		return err
 	}

 	p.init()

-	if p.allJobsAreTerminal(r) {
+	if p.stateThing.allJobsAreTerminal(r) {
 		slog.Info("AllJobsTerminal")
 		return nil
 	}

 	// create the workers
-	for _, s := range p.states {
+	for _, s := range p.stateThing.states {
 		// Terminal states don't need to receive jobs, they're just done
 		if s.Terminal {
 			continue
 		}

-		// Make workers for each, they just process and fire back to the central channel
-		for i := 0; i < s.Concurrency; i++ {
-			p.wg.Add(1)
-			pprof.Do(ctx, pprof.Labels("type", "worker", "state", s.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) {
-				go p.execFunc(ctx, s, r.Overall, i, &p.wg)()
-			})
-		}
+		p.execFunc(ctx, s, r.Overall, &p.wg)
 	}

 	pprof.Do(ctx, pprof.Labels("type", "main"), func(ctx context.Context) {
@@ -178,58 +268,15 @@
 	return nil
 }

-func (p *Processor[AC, OC, JC]) runJob(job Job[JC]) {
-	p.stateStatusMap[job.State].Executing += 1
-	p.stateChan[job.State] <- job
-}
-
-func (p *Processor[AC, OC, JC]) queueJob(job Job[JC]) {
-	p.stateStatusMap[job.State].Waiting += 1
-	p.stateWaitingJobsMap[job.State] = append(p.stateWaitingJobsMap[job.State], job)
-}
-
-func (p *Processor[AC, OC, JC]) completeJob(job Job[JC]) {
-	p.stateStatusMap[job.State].Completed += 1
-}
-
-func (p *Processor[AC, OC, JC]) processJob(job Job[JC]) {
-	if p.isTerminal(job) {
-		p.completeJob(job)
-	} else {
-		if p.canRunJobForState(job.State) {
-			p.runJob(job)
-		} else {
-			p.queueJob(job)
-		}
-	}
-}
-
-func (p *Processor[AC, OC, JC]) runNextWaitingJob(state string) {
-	// One less job is executing for the prior state
-	p.stateStatusMap[state].Executing -= 1
-
-	// There are no waiting jobs for the state, so we have nothing to queue
-	waitingJobCount := len(p.stateWaitingJobsMap[state])
-	if waitingJobCount == 0 {
-		return
-	}
-
-	job := p.stateWaitingJobsMap[state][waitingJobCount-1]
-	p.stateWaitingJobsMap[state] = p.stateWaitingJobsMap[state][0 : waitingJobCount-1]
-	p.stateStatusMap[job.State].Waiting -= 1
-
-	p.runJob(job)
-}
-
 func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg *sync.WaitGroup) {
 	defer func() {
 		p.shutdown()
-		wg.Add(-1)
+		wg.Done()
 	}()

 	// Enqueue the jobs to start
 	for _, job := range r.Jobs {
-		p.processJob(job)
+		p.stateThing.processJob(job)
 	}

 	for {
@@ -238,11 +285,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
 			return
 		case completedJob := <-p.returnChan:
 			// If the prior state of the completed job was at capacity, we now have space for one more
-			p.runNextWaitingJob(completedJob.PriorState)
+			p.stateThing.runNextWaitingJob(completedJob.PriorState)

 			// Update the run with the new state
 			r.UpdateJob(completedJob.Job)
-			p.processJob(completedJob.Job)
+			p.stateThing.processJob(completedJob.Job)

 			// Start any of the new jobs that need kicking
 			for idx, kickRequest := range completedJob.KickRequests {
@@ -253,7 +300,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
 					StateErrors: map[string][]string{},
 				}
 				r.UpdateJob(job)
-				p.processJob(job)
+				p.stateThing.processJob(job)
 			}

 			if err := p.serializer.Serialize(*r); err != nil {
@@ -262,53 +309,21 @@
 			p.updateStatus()

-			if p.allJobsAreTerminal(r) && !p.hasExecutingJobs() {
+			if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() {
 				return
 			}
 		}
 	}
 }

-func (p *Processor[AC, OC, JC]) canRunJobForState(state string) bool {
-	return p.stateStatusMap[state].Executing < p.stateMap[state].Concurrency
-}
-
-func (p *Processor[AC, OC, JC]) hasExecutingJobs() bool {
-	for _, value := range p.stateStatusMap {
-		if value.Executing > 0 {
-			return true
-		}
-	}
-
-	return false
-}
-
 func (p *Processor[AC, OC, JC]) updateStatus() {
-	ret := make([]StatusCount, 0)
-	for _, name := range p.sortedStateNames {
-		ret = append(ret, *p.stateStatusMap[name])
-	}
-
-	p.statusListener.StatusUpdate(ret)
-}
-
-func (p *Processor[AC, OC, JC]) isTerminal(job Job[JC]) bool {
-	return p.stateMap[job.State].Terminal
-}
-
-func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool {
-	for _, job := range r.Jobs {
-		if !p.isTerminal(job) {
-			return false
-		}
-	}
-	return true
+	p.statusListener.StatusUpdate(p.stateThing.getStatusCounts())
 }

 func (p *Processor[AC, OC, JC]) shutdown() {
 	// close all of the channels
-	for _, c := range p.stateChan {
-		close(c)
+	for _, state := range p.stateThing.states {
+		close(p.stateThing.getJobChannelForState(state.TriggerState))
 	}
 	// close ourselves down
 	close(p.returnChan)
@@ -328,7 +343,7 @@ type StateExec[AC any, OC any, JC any] struct {
 func (s *StateExec[AC, OC, JC]) Run() {
 	slog.Info("Starting worker", "worker", s.i, "state", s.state.TriggerState)
 	defer func() {
-		s.wg.Add(-1)
+		s.wg.Done()
 		slog.Info("Stopped worker", "worker", s.i, "state", s.state.TriggerState)
 	}()
@@ -362,16 +377,23 @@
 	}
 }

-func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, i int, wg *sync.WaitGroup) func() {
-	e := &StateExec[AC, OC, JC]{
-		ctx: ctx,
-		ac: p.appContext,
-		oc: overallContext,
-		state: state,
-		jobChan: p.stateChan[state.TriggerState],
-		returnChan: p.returnChan,
-		i: i,
-		wg: wg,
+func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, wg *sync.WaitGroup) {
+	// Make a worker per unit of concurrency; each one processes jobs and fires results back on the central return channel
+	for i := 0; i < state.Concurrency; i++ {
+		p.wg.Add(1)
+		stateExec := StateExec[AC, OC, JC]{
+			ctx: ctx,
+			ac: p.appContext,
+			oc: overallContext,
+			state: state,
+			jobChan: p.stateThing.getJobChannelForState(state.TriggerState),
+			returnChan: p.returnChan,
+			i: i,
+			wg: wg,
+		}
+
+		pprof.Do(ctx, pprof.Labels("type", "worker", "state", state.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) {
+			go stateExec.Run()
+		})
 	}
-	return e.Run
 }
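
A quick way to sanity-check the extracted stateThing in isolation is a package-internal test. The sketch below is not part of the diff; the test name and state names are made up, and it deliberately sticks to the non-blocking bookkeeping paths (processJob on a terminal job, getStatusCounts, canRunJobForState), since runJob sends on the by-design unbuffered state channel and would block without a worker draining it. It also assumes Job's remaining fields are fine as zero values.

package jorb

import "testing"

// Hypothetical sketch: exercises only stateThing paths that never touch the
// unbuffered job channels.
func TestStateThingSketch(t *testing.T) {
	st := newStateThingFromStates([]State[struct{}, struct{}, string]{
		{TriggerState: "pending", Concurrency: 2},
		{TriggerState: "done", Terminal: true},
	})

	// Status counts come back sorted by state name, so "done" precedes "pending".
	counts := st.getStatusCounts()
	if counts[0].State != "done" || !counts[0].Terminal {
		t.Fatalf("unexpected status counts: %+v", counts)
	}

	// A job already in a terminal state is completed immediately; no channel send happens.
	st.processJob(Job[string]{State: "done"})
	if st.getStatusCounts()[0].Completed != 1 {
		t.Fatal("expected one completed job")
	}

	// "pending" has concurrency 2 and nothing executing, so it has capacity.
	if !st.canRunJobForState("pending") {
		t.Fatal("expected pending to have capacity")
	}
}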