From 9780c2e81bacb27408f00ef0cbdd06d998a50bf4 Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Tue, 23 Jul 2024 23:32:57 -0700 Subject: [PATCH] also fixup old naming --- processor.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/processor.go b/processor.go index 086c8e5..f49bb8d 100644 --- a/processor.go +++ b/processor.go @@ -211,7 +211,7 @@ func (s stateStorage[AC, OC, JC]) getStatusCounts() []StatusCount { type Processor[AC any, OC any, JC any] struct { appContext AC serializer Serializer[OC, JC] - stateThing stateStorage[AC, OC, JC] + stateStorage stateStorage[AC, OC, JC] statusListener StatusListener returnChan chan Return[JC] wg sync.WaitGroup @@ -234,7 +234,7 @@ func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], ser statusListener: statusListener, } - if err := p.stateThing.validate(); err != nil { + if err := p.stateStorage.validate(); err != nil { return nil, err } @@ -257,18 +257,18 @@ func (p *Processor[AC, OC, JC]) init() { func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { p.init() - if p.stateThing.allJobsAreTerminal(r) { + if p.stateStorage.allJobsAreTerminal(r) { // Send one status update so that if there are listeners they can render the correct values for _, job := range r.Jobs { - p.stateThing.completeJob(job) + p.stateStorage.completeJob(job) } - p.statusListener.StatusUpdate(p.stateThing.getStatusCounts()) + p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts()) slog.Info("AllJobsTerminal") return nil } // create the workers - for _, s := range p.stateThing.states { + for _, s := range p.stateStorage.states { // Terminal states don't need to recieve jobs, they're just done if s.Terminal { continue @@ -294,7 +294,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg // Enqueue the jobs to start for _, job := range r.Jobs { - p.stateThing.processJob(job) + p.stateStorage.processJob(job) } // Send the initial status update with the state of all the jobs @@ -306,11 +306,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg return case completedJob := <-p.returnChan: // If the prior state of the completed job was at capacity, we now have space for one more - p.stateThing.runNextWaitingJob(completedJob.PriorState) + p.stateStorage.runNextWaitingJob(completedJob.PriorState) // Update the run with the new state r.UpdateJob(completedJob.Job) - p.stateThing.processJob(completedJob.Job) + p.stateStorage.processJob(completedJob.Job) // Start any of the new jobs that need kicking for idx, kickRequest := range completedJob.KickRequests { @@ -321,7 +321,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg StateErrors: map[string][]string{}, } r.UpdateJob(job) - p.stateThing.processJob(job) + p.stateStorage.processJob(job) } if err := p.serializer.Serialize(*r); err != nil { @@ -330,7 +330,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg p.updateStatus() - if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() { + if p.stateStorage.allJobsAreTerminal(r) && !p.stateStorage.hasExecutingJobs() { return } } @@ -338,13 +338,13 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg } func (p *Processor[AC, OC, JC]) updateStatus() { - p.statusListener.StatusUpdate(p.stateThing.getStatusCounts()) + p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts()) } func (p *Processor[AC, OC, JC]) shutdown() { // close all of the channels - for _, state := range p.stateThing.states { - p.stateThing.closeJobChannelForState(state.TriggerState) + for _, state := range p.stateStorage.states { + p.stateStorage.closeJobChannelForState(state.TriggerState) } // close ourselves down close(p.returnChan) @@ -407,7 +407,7 @@ func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC ac: p.appContext, oc: overallContext, state: state, - jobChan: p.stateThing.getJobChannelForState(state.TriggerState), + jobChan: p.stateStorage.getJobChannelForState(state.TriggerState), returnChan: p.returnChan, i: i, wg: wg,