Skip to content

Commit

Permalink
also fixup old naming
Browse files Browse the repository at this point in the history
  • Loading branch information
thesilentg committed Jul 24, 2024
1 parent 17434ea commit 9780c2e
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s stateStorage[AC, OC, JC]) getStatusCounts() []StatusCount {
type Processor[AC any, OC any, JC any] struct {
appContext AC
serializer Serializer[OC, JC]
stateThing stateStorage[AC, OC, JC]
stateStorage stateStorage[AC, OC, JC]
statusListener StatusListener
returnChan chan Return[JC]
wg sync.WaitGroup
Expand All @@ -234,7 +234,7 @@ func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], ser
statusListener: statusListener,
}

if err := p.stateThing.validate(); err != nil {
if err := p.stateStorage.validate(); err != nil {
return nil, err
}

Expand All @@ -257,18 +257,18 @@ func (p *Processor[AC, OC, JC]) init() {
func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error {
p.init()

if p.stateThing.allJobsAreTerminal(r) {
if p.stateStorage.allJobsAreTerminal(r) {
// Send one status update so that if there are listeners they can render the correct values
for _, job := range r.Jobs {
p.stateThing.completeJob(job)
p.stateStorage.completeJob(job)
}
p.statusListener.StatusUpdate(p.stateThing.getStatusCounts())
p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts())
slog.Info("AllJobsTerminal")
return nil
}

// create the workers
for _, s := range p.stateThing.states {
for _, s := range p.stateStorage.states {
// Terminal states don't need to recieve jobs, they're just done
if s.Terminal {
continue
Expand All @@ -294,7 +294,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg

// Enqueue the jobs to start
for _, job := range r.Jobs {
p.stateThing.processJob(job)
p.stateStorage.processJob(job)
}

// Send the initial status update with the state of all the jobs
Expand All @@ -306,11 +306,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
return
case completedJob := <-p.returnChan:
// If the prior state of the completed job was at capacity, we now have space for one more
p.stateThing.runNextWaitingJob(completedJob.PriorState)
p.stateStorage.runNextWaitingJob(completedJob.PriorState)

// Update the run with the new state
r.UpdateJob(completedJob.Job)
p.stateThing.processJob(completedJob.Job)
p.stateStorage.processJob(completedJob.Job)

// Start any of the new jobs that need kicking
for idx, kickRequest := range completedJob.KickRequests {
Expand All @@ -321,7 +321,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg
StateErrors: map[string][]string{},
}
r.UpdateJob(job)
p.stateThing.processJob(job)
p.stateStorage.processJob(job)
}

if err := p.serializer.Serialize(*r); err != nil {
Expand All @@ -330,21 +330,21 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg

p.updateStatus()

if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() {
if p.stateStorage.allJobsAreTerminal(r) && !p.stateStorage.hasExecutingJobs() {
return
}
}
}
}

func (p *Processor[AC, OC, JC]) updateStatus() {
p.statusListener.StatusUpdate(p.stateThing.getStatusCounts())
p.statusListener.StatusUpdate(p.stateStorage.getStatusCounts())
}

func (p *Processor[AC, OC, JC]) shutdown() {
// close all of the channels
for _, state := range p.stateThing.states {
p.stateThing.closeJobChannelForState(state.TriggerState)
for _, state := range p.stateStorage.states {
p.stateStorage.closeJobChannelForState(state.TriggerState)
}
// close ourselves down
close(p.returnChan)
Expand Down Expand Up @@ -407,7 +407,7 @@ func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC
ac: p.appContext,
oc: overallContext,
state: state,
jobChan: p.stateThing.getJobChannelForState(state.TriggerState),
jobChan: p.stateStorage.getJobChannelForState(state.TriggerState),
returnChan: p.returnChan,
i: i,
wg: wg,
Expand Down

0 comments on commit 9780c2e

Please sign in to comment.