From e9b462f3b58adf0cbd1865f13ac96d6921044ec5 Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Fri, 19 Jul 2024 13:12:40 -0700 Subject: [PATCH 1/6] first pass --- internal/examples/simplejob/main.go | 86 ++++++ processor.go | 428 ++++++++++++++-------------- run.go | 111 +------- 3 files changed, 309 insertions(+), 316 deletions(-) create mode 100644 internal/examples/simplejob/main.go diff --git a/internal/examples/simplejob/main.go b/internal/examples/simplejob/main.go new file mode 100644 index 0000000..c241383 --- /dev/null +++ b/internal/examples/simplejob/main.go @@ -0,0 +1,86 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log" + "log/slog" + "math/rand" + "os" + "time" + + "github.com/gaffo/jorb" +) + +type oc struct{} +type ac struct{} +type jc struct{} + +func main() { + o := oc{} + a := ac{} + r := jorb.NewRun[oc, jc]("example", o) + + slog.SetLogLoggerLevel(slog.LevelWarn) + for i := 0; i < 100; i++ { + r.AddJobWithState(jc{}, "A") + } + + states := []jorb.State[ac, oc, jc]{ + { + TriggerState: "A", + Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + return jc, "B", nil, nil + }, + Concurrency: 5, + }, + { + TriggerState: "B", + Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + return jc, "C", nil, nil + }, + Concurrency: 4, + }, + { + TriggerState: "C", + Exec: func(ctx context.Context, ac ac, oc oc, jc jc) (jc, string, []jorb.KickRequest[jc], error) { + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + return jc, "D", nil, nil + }, + Concurrency: 3, + }, + { + TriggerState: "D", + Terminal: true, + }, + } + + serial := jorb.NewJsonSerializer[oc, jc]("example.state") + listener := &fileListener{fileName: "example.status"} + p := jorb.NewProcessor[ac, oc, jc](a, states, serial, listener) + if err := p.Exec(context.Background(), r); err != nil { + log.Fatal(err) + } +} + +// Serializes the status updates to a file +type fileListener struct { + fileName string +} + +func (f *fileListener) StatusUpdate(status []jorb.StatusCount) { + buf := &bytes.Buffer{} + + encoder := json.NewEncoder(buf) + encoder.SetIndent("", " ") + _ = encoder.Encode(status) + + file, _ := os.Create(f.fileName) + defer file.Close() + + _, _ = io.Copy(file, buf) +} diff --git a/processor.go b/processor.go index 2be68fd..9b8e51b 100644 --- a/processor.go +++ b/processor.go @@ -6,7 +6,7 @@ import ( "log" "log/slog" "runtime/pprof" - "strings" + "sort" "sync" "golang.org/x/time/rate" @@ -49,22 +49,34 @@ type KickRequest[JC any] struct { type StatusCount struct { State string - Count int + Completed int Executing int + Waiting int Terminal bool } // Processor executes a job type Processor[AC any, OC any, JC any] struct { - appContext AC - states []State[AC, OC, JC] - serializer Serializer[OC, JC] - statusListener StatusListener - initted bool - stateMap map[string]State[AC, OC, JC] - stateNames []string - stateChan map[string]chan Job[JC] - returnChan chan Return[JC] + appContext AC + states []State[AC, OC, JC] + serializer Serializer[OC, JC] + statusListener StatusListener + sortedStateNames []string + stateMap map[string]State[AC, OC, JC] + stateStatusMap map[string]*StatusCount + stateWaitingJobsMap map[string][]Job[JC] + stateChan map[string]chan Job[JC] + returnChan chan Return[JC] + wg sync.WaitGroup +} + +// Return is a struct 
that contains a job and a list of kick requests +// that is used for returning job updates to the system +type Return[JC any] struct { + PriorState string + Job Job[JC] + KickRequests []KickRequest[JC] + Error error } func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] { @@ -76,18 +88,26 @@ func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], ser } } -// Return is a struct that contains a job and a list of kick requests -// that is used for returning job updates to the system -type Return[JC any] struct { - Job Job[JC] - KickRequests []KickRequest[JC] - Error error +func (p *Processor[AC, OC, JC]) validate(states []State[AC, OC, JC]) error { + for _, state := range states { + if state.Terminal { + if state.Concurrency < 0 { + return fmt.Errorf("terminal state %s has negative concurrency", state.TriggerState) + } + } else { + if state.Concurrency < 1 { + return fmt.Errorf("non-terminal state %s has non-positive concurrency", state.TriggerState) + } + if state.Exec == nil { + return fmt.Errorf("non-terminal state %s but has no Exec function", state.TriggerState) + } + } + } + + return nil } func (p *Processor[AC, OC, JC]) init() { - if p.initted { - return - } if p.serializer == nil { p.serializer = &NilSerializer[OC, JC]{} } @@ -96,34 +116,36 @@ func (p *Processor[AC, OC, JC]) init() { } // Make a map of triggers to states so we can easily reference it p.stateMap = map[string]State[AC, OC, JC]{} - for _, s := range p.states { - p.stateMap[s.TriggerState] = s - } - // get a list of return state names for use - p.stateNames = make([]string, 0, len(p.stateMap)) - for k := range p.stateMap { - p.stateNames = append(p.stateNames, k) - } - - // For each state, we need a channel of jobs + p.stateStatusMap = map[string]*StatusCount{} + p.stateWaitingJobsMap = map[string][]Job[JC]{} p.stateChan = map[string]chan Job[JC]{} + p.sortedStateNames = []string{} - // Create the state chans - totalConcurrency := 0 for _, s := range p.states { - if s.Terminal { - continue + stateName := s.TriggerState + + p.sortedStateNames = append(p.sortedStateNames, stateName) + p.stateMap[stateName] = s + p.stateStatusMap[stateName] = &StatusCount{ + State: stateName, + Terminal: s.Terminal, } - p.stateChan[s.TriggerState] = make(chan Job[JC], s.Concurrency) // make a chan - totalConcurrency += s.Concurrency + // This is by-design unbuffered + p.stateChan[stateName] = make(chan Job[JC]) } - // When a job changes state, we send it to this channel to centrally manage and re-queue - p.returnChan = make(chan Return[JC], totalConcurrency*2) // make it the size of the total amount of in flight jobs we could have so that each worker can return a task + sort.Strings(p.sortedStateNames) + + // This is by-design unbuffered + p.returnChan = make(chan Return[JC]) } // Exec this big work function, this does all the crunching func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { + if err := p.validate(p.states); err != nil { + return err + } + p.init() if p.allJobsAreTerminal(r) { @@ -131,169 +153,156 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error return nil } - wg := sync.WaitGroup{} - // create the workers for _, s := range p.states { // Terminal states don't need to recieve jobs, they're just done if s.Terminal { continue } - if s.Exec == nil { - return p.invalidStateError(s.TriggerState) - } - - concurrency := s.Concurrency // Make 
workers for each, they just process and fire back to the central channel - for i := 0; i < concurrency; i++ { // add a waiter for every go processor, do it before forking - labels := pprof.Labels("type", "jorbWorker", "state", s.TriggerState, "id", fmt.Sprintf("%d", i)) - pprof.Do(ctx, labels, func(_ context.Context) { - go p.execFunc(ctx, r, s, i, wg)() + for i := 0; i < s.Concurrency; i++ { + p.wg.Add(1) + pprof.Do(ctx, pprof.Labels("type", "worker", "state", s.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) { + go p.execFunc(ctx, s, r.Overall, i, &p.wg)() }) } } - go func() { p.enqueueAllJobs(r) }() // fill all the outbound queues once in a seperate goroutine to prime the pump faster - - // Make a central processor and start it - wg.Add(1) - pprof.Do(ctx, pprof.Labels("type", "ReturnChanWorker"), func(_ context.Context) { - go func() { - p.returnQueue(r, &wg) - }() + pprof.Do(ctx, pprof.Labels("type", "main"), func(ctx context.Context) { + p.wg.Add(1) + go p.process(ctx, r, &p.wg) }) - // Wait for all of the processors to quit - wg.Wait() - + p.wg.Wait() return nil } -func (p *Processor[AC, OC, JC]) returnQueue(r *Run[OC, JC], wg *sync.WaitGroup) { - for { - batch := []Return[JC]{} - READ_BATCH: - for { - select { - case rtn := <-p.returnChan: - batch = append(batch, rtn) - default: - break READ_BATCH - } - } - // Dispense with the jobs - for _, rtn := range batch { - slog.Info("ReturnChan GotJobBack", "jobId", rtn.Job.Id, "state", rtn.Job.State, "kickRequests", len(rtn.KickRequests), "error", rtn.Error) - j := rtn.Job +func (p *Processor[AC, OC, JC]) runJob(job Job[JC]) { + p.stateStatusMap[job.State].Executing += 1 + p.stateChan[job.State] <- job +} - // Send the new kicks if any - p.kickJobs(rtn, j, r) +func (p *Processor[AC, OC, JC]) queueJob(job Job[JC]) { + p.stateStatusMap[job.State].Waiting += 1 + p.stateWaitingJobsMap[job.State] = append(p.stateWaitingJobsMap[job.State], job) +} - // Append the error if needed - if rtn.Error != nil { - j.StateErrors[j.State] = append(j.StateErrors[j.State], rtn.Error.Error()) - } +func (p *Processor[AC, OC, JC]) completeJob(job Job[JC]) { + p.stateStatusMap[job.State].Completed += 1 +} - // return the job - r.Return(j) +func (p *Processor[AC, OC, JC]) processJob(job Job[JC]) { + if p.isTerminal(job) { + p.completeJob(job) + } else { + if p.canRunJobForState(job.State) { + p.runJob(job) + } else { + p.queueJob(job) } - // Now do end of batch work + } +} - // Flush the state - err := p.serializer.Serialize(*r) - if err != nil { - log.Fatalf("Error serializing, aborting now to not lose work: %v", err) - } - // update the status counts - p.updateStatusCounts(r) +func (p *Processor[AC, OC, JC]) runNextWaitingJob(state string) { + // One less job is executing for the prior state + p.stateStatusMap[state].Executing -= 1 + + // There are no waiting jobs for the state, so we have nothing to queue + waitingJobCount := len(p.stateWaitingJobsMap[state]) + if waitingJobCount == 0 { + return + } - // flush out any new jobs we can - p.enqueueAllJobs(r) + job := p.stateWaitingJobsMap[state][waitingJobCount-1] + p.stateWaitingJobsMap[state] = p.stateWaitingJobsMap[state][0 : waitingJobCount-1] + p.stateStatusMap[job.State].Waiting -= 1 - // If the state was terminal, we should see if all of the states are terminated, if so shut down - if !p.allJobsAreTerminal(r) { - continue - } - // if there are any jobs in flight in the run, keep going - if r.JobsInFlight() { - continue - } + p.runJob(job) +} +func (p *Processor[AC, OC, JC]) process(ctx 
context.Context, r *Run[OC, JC], wg *sync.WaitGroup) { + defer func() { p.shutdown() + wg.Add(-1) + }() - break + // Enqueue the jobs to start + for _, job := range r.Jobs { + p.processJob(job) } - slog.Info("ReturnChanWorker Quit") - wg.Done() -} -func (p *Processor[AC, OC, JC]) updateStatusCounts(r *Run[OC, JC]) { - counts := r.StatusCounts() + for { + select { + case <-ctx.Done(): + return + case completedJob := <-p.returnChan: + // If the prior state of the completed job was at capacity, we now have space for one more + p.runNextWaitingJob(completedJob.PriorState) + + // Update the run with the new state + r.UpdateJob(completedJob.Job) + p.processJob(completedJob.Job) + + // Start any of the new jobs that need kicking + for idx, kickRequest := range completedJob.KickRequests { + job := Job[JC]{ + Id: fmt.Sprintf("%s->%d", completedJob.Job.Id, idx), + C: kickRequest.C, + State: kickRequest.State, + StateErrors: map[string][]string{}, + } + r.UpdateJob(job) + p.processJob(job) + } - ret := []StatusCount{} + if err := p.serializer.Serialize(*r); err != nil { + log.Fatalf("Error serializing, aborting now to not lose work: %v", err) + } - for _, state := range p.states { - if _, ok := counts[state.TriggerState]; !ok { - ret = append(ret, StatusCount{ - State: state.TriggerState, - Count: 0, - Executing: 0, - Terminal: state.Terminal, - }) - continue + p.updateStatus() + + if p.allJobsAreTerminal(r) && !p.hasExecutingJobs() { + return + } } - c := counts[state.TriggerState] - c.Terminal = state.Terminal - ret = append(ret, c) } - p.statusListener.StatusUpdate(ret) } -func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { - c := r.StatusCounts() - for _, k := range p.states { - if k.Terminal { - continue - } - if c[k.TriggerState].Count > 0 { - return false +func (p *Processor[AC, OC, JC]) canRunJobForState(state string) bool { + return p.stateStatusMap[state].Executing < p.stateMap[state].Concurrency +} + +func (p *Processor[AC, OC, JC]) hasExecutingJobs() bool { + for _, value := range p.stateStatusMap { + if value.Executing > 0 { + return true } } - return true + + return false } -func (p *Processor[AC, OC, JC]) enqueueAllJobs(r *Run[OC, JC]) { - slog.Info("Enqueing Jobs", "jobCount", len(r.Jobs)) - enqueued := 0 - for _, state := range p.states { - enqueued += p.enqueueJobsForState(r, state) // mutates r.Jobs +func (p *Processor[AC, OC, JC]) updateStatus() { + ret := make([]StatusCount, 0) + for _, name := range p.sortedStateNames { + ret = append(ret, *p.stateStatusMap[name]) } - slog.Info("All Queues Primed", "jobCount", len(r.Jobs), "enqueuedCount", enqueued) + + p.statusListener.StatusUpdate(ret) } -func (p *Processor[AC, OC, JC]) enqueueJobsForState(r *Run[OC, JC], state State[AC, OC, JC]) int { - slog.Info("Enqueueing jobs for state", "state", state.TriggerState) - enqueued := 0 - for { - j, ok := r.NextJobForState(state.TriggerState) - if !ok { - slog.Info("No more jobs for state", "state", state.TriggerState) - return enqueued - } - c := p.stateChan[state.TriggerState] - select { - case c <- j: - enqueued++ - slog.Info("Enqueing Job", "state", j.State, "job", j.Id) - continue - default: - r.Return(j) - return enqueued +func (p *Processor[AC, OC, JC]) isTerminal(job Job[JC]) bool { + return p.stateMap[job.State].Terminal +} + +func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { + for _, job := range r.Jobs { + if !p.isTerminal(job) { + return false } } - slog.Info("Enqueued jobs for state", "state", state.TriggerState, "enqueuedCount", 
enqueued) - return enqueued + return true } func (p *Processor[AC, OC, JC]) shutdown() { @@ -305,79 +314,64 @@ func (p *Processor[AC, OC, JC]) shutdown() { close(p.returnChan) } -func (p *Processor[AC, OC, JC]) kickJobs(rtn Return[JC], j Job[JC], r *Run[OC, JC]) { - if rtn.KickRequests == nil { - return - } - for _, k := range rtn.KickRequests { - // create a new job with the right state - newJob := Job[JC]{ - Id: fmt.Sprintf("%s->%d", j.Id, len(r.Jobs)), - C: k.C, - State: k.State, - StateErrors: map[string][]string{}, - } - - // validate it - _, ok := p.stateMap[newJob.State] - if !ok { - log.Fatal(p.invalidStateError(newJob.State)) - } - - // return it to the run, it'll get re-enqueued by the main return loop - r.Return(newJob) - } -} - type StateExec[AC any, OC any, JC any] struct { - State State[AC, OC, JC] - i int - wg sync.WaitGroup - c chan Job[JC] - Overall OC ctx context.Context - returnChan chan Return[JC] ac AC + oc OC + state State[AC, OC, JC] + jobChan <-chan Job[JC] + returnChan chan<- Return[JC] + i int + wg *sync.WaitGroup } func (s *StateExec[AC, OC, JC]) Run() { - slog.Info("Starting worker", "worker", s.i, "state", s.State.TriggerState) - for j := range s.c { - if s.State.RateLimit != nil { - s.State.RateLimit.Wait(s.ctx) - slog.Info("LimiterAllowed", "worker", s.i, "state", s.State.TriggerState, "job", j.Id) + slog.Info("Starting worker", "worker", s.i, "state", s.state.TriggerState) + defer func() { + s.wg.Add(-1) + slog.Info("Stopped worker", "worker", s.i, "state", s.state.TriggerState) + }() + + for { + select { + case <-s.ctx.Done(): + return + case j, more := <-s.jobChan: + // The channel was closed + if !more { + return + } + + if s.state.RateLimit != nil { + s.state.RateLimit.Wait(s.ctx) + slog.Info("LimiterAllowed", "worker", s.i, "state", s.state.TriggerState, "job", j.Id) + } + // Execute the job + rtn := Return[JC]{ + PriorState: j.State, + } + slog.Info("Executing job", "job", j.Id, "state", s.state.TriggerState) + j.C, j.State, rtn.KickRequests, rtn.Error = s.state.Exec(s.ctx, s.ac, s.oc, j.C) + slog.Info("Execution complete", "job", j.Id, "state", s.state.TriggerState, "newState", j.State, "error", rtn.Error, "kickRequests", len(rtn.KickRequests)) + + rtn.Job = j + slog.Info("Returning job", "job", j.Id, "newState", j.State) + s.returnChan <- rtn + slog.Info("Returned job", "job", j.Id, "newState", j.State) } - // Execute the job - rtn := Return[JC]{} - slog.Info("Executing job", "job", j.Id, "state", s.State.TriggerState) - j.C, j.State, rtn.KickRequests, rtn.Error = s.State.Exec(s.ctx, s.ac, s.Overall, j.C) - slog.Info("Execution complete", "job", j.Id, "state", s.State.TriggerState, "newState", j.State, "error", rtn.Error, "kickRequests", len(rtn.KickRequests)) - - rtn.Job = j - //go func() { - slog.Info("Returning job", "job", j.Id, "newState", j.State) - s.returnChan <- rtn - slog.Info("Returned job", "job", j.Id, "newState", j.State) - //}() } - s.wg.Done() - slog.Info("Stopped worker", "worker", s.i, "state", s.State.TriggerState) } -func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, r *Run[OC, JC], s State[AC, OC, JC], i int, wg sync.WaitGroup) func() { - wg.Add(1) +func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, i int, wg *sync.WaitGroup) func() { e := &StateExec[AC, OC, JC]{ - State: s, - i: i, - wg: wg, - c: p.stateChan[s.TriggerState], ctx: ctx, - returnChan: p.returnChan, ac: p.appContext, + oc: overallContext, + state: state, + jobChan: p.stateChan[state.TriggerState], + 
returnChan: p.returnChan, + i: i, + wg: wg, } return e.Run } - -func (p *Processor[AC, OC, JC]) invalidStateError(s string) error { - return fmt.Errorf("State [%s] has no executor, valid state names: %s", s, strings.Join(p.stateNames, ", ")) -} diff --git a/run.go b/run.go index 2af13e8..aeccec2 100644 --- a/run.go +++ b/run.go @@ -3,7 +3,6 @@ package jorb import ( "fmt" "log/slog" - "sort" "sync" ) @@ -11,13 +10,10 @@ import ( // it's meant to be re-entrant, eg if you kill the processor and you have a serializaer, you can // restart using it at any time type Run[OC any, JC any] struct { - Name string // Name of the run - Jobs map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states - Overall OC // Overall overall state that is usful to all jobs, basically context for the overall batch - m sync.Mutex // Mutex used for indexing operations - lockedJobsById map[string]string // If a job is locked and what state it was locked into - lockedJobsStateCount map[string]int // How many jobs are locked for each state (for status) - stateCount map[string][]Job[JC] // Job Queues for Speed + Name string // Name of the run + Jobs map[string]Job[JC] // Map of jobs, where keys are job ids and values are Job states + Overall OC // Overall overall state that is usful to all jobs, basically context for the overall batch + m sync.Mutex // Mutex used for indexing operations } // NewRun creates a new Run instance with the given name and overall context @@ -38,37 +34,22 @@ func NewRun[OC any, JC any](name string, oc OC) *Run[OC, JC] { func (r *Run[OC, JC]) Init() { r.m.Lock() defer r.m.Unlock() - r.lockedJobsById = map[string]string{} - r.lockedJobsStateCount = map[string]int{} - r.stateCount = map[string][]Job[JC]{} - // initialize the state counts for _, j := range r.Jobs { - s := j.State - if _, ok := r.stateCount[s]; !ok { - r.stateCount[s] = []Job[JC]{} - } // if it doesn't have a last event, give it one if j.LastUpdate == nil { j.UpdateLastEvent() } - r.stateCount[s] = append(r.stateCount[s], j) - } - // now that they're all added we need to sort them - for state, list := range r.stateCount { - sort.Slice(list, func(i, j int) bool { - if list[i].LastUpdate == nil { - return false - } - if list[j].LastUpdate == nil { - return true - } - return list[i].LastUpdate.Before(*list[j].LastUpdate) - }) - r.stateCount[state] = list } } +func (r *Run[OC, JC]) UpdateJob(j Job[JC]) { + r.m.Lock() + defer r.m.Unlock() + + r.Jobs[j.Id] = j.UpdateLastEvent() +} + func (r *Run[OC, JC]) AddJobWithState(jc JC, state string) { r.m.Lock() defer r.m.Unlock() @@ -81,80 +62,12 @@ func (r *Run[OC, JC]) AddJobWithState(jc JC, state string) { State: state, StateErrors: map[string][]string{}, } - j.UpdateLastEvent() - // Pop it onto the end of the appropriate queue - r.stateCount[j.State] = append(r.stateCount[j.State], j) slog.Info("AddJob", "run", r.Name, "job", j, "totalJobs", len(r.Jobs)) - r.Jobs[id] = j + r.Jobs[id] = j.UpdateLastEvent() } // Add a job to the pool, this shouldn't be called once it's running func (r *Run[OC, JC]) AddJob(jc JC) { r.AddJobWithState(jc, TRIGGER_STATE_NEW) } - -func (r *Run[OC, JC]) NextJobForState(state string) (Job[JC], bool) { - r.m.Lock() - defer r.m.Unlock() - - if len(r.stateCount[state]) == 0 { - return Job[JC]{}, false - } - - // pop the item off the front of the queue - minJob := r.stateCount[state][0] - r.stateCount[state] = r.stateCount[state][1:] - - // lock it - r.lockedJobsById[minJob.Id] = minJob.State - r.lockedJobsStateCount[minJob.State]++ - - // update 
it's last event - r.Jobs[minJob.Id] = minJob.UpdateLastEvent() - - // return it - return r.Jobs[minJob.Id], true -} - -func (r *Run[OC, JC]) Return(j Job[JC]) { - r.m.Lock() - defer r.m.Unlock() - - r.Jobs[j.Id] = j.UpdateLastEvent() - - // decremnt the previous state - prevState := r.lockedJobsById[j.Id] - r.lockedJobsStateCount[prevState]-- - - // unlock it - delete(r.lockedJobsById, j.Id) - - // push it to the back of the new state - r.stateCount[j.State] = append(r.stateCount[j.State], j) -} - -func (r *Run[OC, JC]) JobsInFlight() bool { - r.m.Lock() - defer r.m.Unlock() - // if any of the jobs are in flight, return true - - if len(r.lockedJobsById) > 0 { - return true - } - return false -} - -func (r *Run[OC, JC]) StatusCounts() map[string]StatusCount { - ret := map[string]StatusCount{} - - for k, v := range r.stateCount { - ret[k] = StatusCount{ - State: k, - Count: len(v) + r.lockedJobsStateCount[k], - Executing: r.lockedJobsStateCount[k], - } - } - - return ret -} From 6627f9f35f58d61e368541d64d299e8d5cad2e1c Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Sat, 20 Jul 2024 20:25:37 -0700 Subject: [PATCH 2/6] v2 pass on concurrency fix --- internal/examples/simplejob/main.go | 6 +- processor.go | 331 +++++++++++++++------------- 2 files changed, 180 insertions(+), 157 deletions(-) diff --git a/internal/examples/simplejob/main.go b/internal/examples/simplejob/main.go index c241383..626f7dd 100644 --- a/internal/examples/simplejob/main.go +++ b/internal/examples/simplejob/main.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "io" "log" "log/slog" "math/rand" @@ -79,8 +78,5 @@ func (f *fileListener) StatusUpdate(status []jorb.StatusCount) { encoder.SetIndent("", " ") _ = encoder.Encode(status) - file, _ := os.Create(f.fileName) - defer file.Close() - - _, _ = io.Copy(file, buf) + _ = os.WriteFile(f.fileName, buf.Bytes(), 0644) } diff --git a/processor.go b/processor.go index 9b8e51b..db1c3ce 100644 --- a/processor.go +++ b/processor.go @@ -55,41 +55,55 @@ type StatusCount struct { Terminal bool } -// Processor executes a job -type Processor[AC any, OC any, JC any] struct { - appContext AC - states []State[AC, OC, JC] - serializer Serializer[OC, JC] - statusListener StatusListener - sortedStateNames []string +type state struct { +} + +type stateThing[AC any, OC any, JC any] struct { + // This is fine for package-internal use cases to directly iterate over + states []State[AC, OC, JC] + + // These shouldn't be used outside stateThing's methods stateMap map[string]State[AC, OC, JC] stateStatusMap map[string]*StatusCount stateWaitingJobsMap map[string][]Job[JC] stateChan map[string]chan Job[JC] - returnChan chan Return[JC] - wg sync.WaitGroup + sortedStateNames []string } -// Return is a struct that contains a job and a list of kick requests -// that is used for returning job updates to the system -type Return[JC any] struct { - PriorState string - Job Job[JC] - KickRequests []KickRequest[JC] - Error error -} +func newStateThingFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) stateThing[AC, OC, JC] { + st := stateThing[AC, OC, JC]{ + states: states, + stateMap: map[string]State[AC, OC, JC]{}, + stateStatusMap: map[string]*StatusCount{}, + stateWaitingJobsMap: map[string][]Job[JC]{}, + stateChan: map[string]chan Job[JC]{}, + sortedStateNames: []string{}, + } -func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] { - return &Processor[AC, OC, JC]{ - 
appContext: ac, - states: states, - serializer: serializer, - statusListener: statusListener, + for _, s := range states { + stateName := s.TriggerState + + st.sortedStateNames = append(st.sortedStateNames, stateName) + st.stateMap[stateName] = s + st.stateStatusMap[stateName] = &StatusCount{ + State: stateName, + Terminal: s.Terminal, + } + // This is by-design unbuffered + st.stateChan[stateName] = make(chan Job[JC]) } + + sort.Strings(st.sortedStateNames) + + return st } -func (p *Processor[AC, OC, JC]) validate(states []State[AC, OC, JC]) error { - for _, state := range states { +func (s stateThing[AC, OC, JC]) getJobChannelForState(stateName string) chan Job[JC] { + return s.stateChan[stateName] +} + +func (s stateThing[AC, OC, JC]) validate() error { + for _, state := range s.states { if state.Terminal { if state.Concurrency < 0 { return fmt.Errorf("terminal state %s has negative concurrency", state.TriggerState) @@ -107,6 +121,114 @@ func (p *Processor[AC, OC, JC]) validate(states []State[AC, OC, JC]) error { return nil } +func (s stateThing[AC, OC, JC]) runJob(job Job[JC]) { + s.stateStatusMap[job.State].Executing += 1 + s.stateChan[job.State] <- job +} + +func (s stateThing[AC, OC, JC]) queueJob(job Job[JC]) { + s.stateStatusMap[job.State].Waiting += 1 + s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job) +} + +func (s stateThing[AC, OC, JC]) completeJob(job Job[JC]) { + s.stateStatusMap[job.State].Completed += 1 +} + +func (s stateThing[AC, OC, JC]) processJob(job Job[JC]) { + if s.isTerminal(job) { + s.completeJob(job) + } else { + if s.canRunJobForState(job.State) { + s.runJob(job) + } else { + s.queueJob(job) + } + } +} + +func (s stateThing[AC, OC, JC]) isTerminal(job Job[JC]) bool { + return s.stateMap[job.State].Terminal +} + +func (s stateThing[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { + for _, job := range r.Jobs { + if !s.isTerminal(job) { + return false + } + } + return true +} + +func (s stateThing[AC, OC, JC]) runNextWaitingJob(state string) { + // One less job is executing for the prior state + s.stateStatusMap[state].Executing -= 1 + + // There are no waiting jobs for the state, so we have nothing to queue + waitingJobCount := len(s.stateWaitingJobsMap[state]) + if waitingJobCount == 0 { + return + } + + job := s.stateWaitingJobsMap[state][waitingJobCount-1] + s.stateWaitingJobsMap[state] = s.stateWaitingJobsMap[state][0 : waitingJobCount-1] + s.stateStatusMap[job.State].Waiting -= 1 + + s.runJob(job) +} + +func (s stateThing[AC, OC, JC]) canRunJobForState(state string) bool { + return s.stateStatusMap[state].Executing < s.stateMap[state].Concurrency +} + +func (s stateThing[AC, OC, JC]) hasExecutingJobs() bool { + for _, value := range s.stateStatusMap { + if value.Executing > 0 { + return true + } + } + + return false +} + +func (s stateThing[AC, OC, JC]) getStatusCounts() []StatusCount { + ret := make([]StatusCount, 0) + for _, name := range s.sortedStateNames { + ret = append(ret, *s.stateStatusMap[name]) + } + return ret +} + +// Serializer is an interface that defines how to serialize and deserialize job contexts. 
+ +// Processor executes a job +type Processor[AC any, OC any, JC any] struct { + appContext AC + serializer Serializer[OC, JC] + stateThing stateThing[AC, OC, JC] + statusListener StatusListener + returnChan chan Return[JC] + wg sync.WaitGroup +} + +// Return is a struct that contains a job and a list of kick requests +// that is used for returning job updates to the system +type Return[JC any] struct { + PriorState string + Job Job[JC] + KickRequests []KickRequest[JC] + Error error +} + +func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] { + return &Processor[AC, OC, JC]{ + appContext: ac, + stateThing: newStateThingFromStates(states), + serializer: serializer, + statusListener: statusListener, + } +} + func (p *Processor[AC, OC, JC]) init() { if p.serializer == nil { p.serializer = &NilSerializer[OC, JC]{} @@ -114,27 +236,6 @@ func (p *Processor[AC, OC, JC]) init() { if p.statusListener == nil { p.statusListener = &NilStatusListener{} } - // Make a map of triggers to states so we can easily reference it - p.stateMap = map[string]State[AC, OC, JC]{} - p.stateStatusMap = map[string]*StatusCount{} - p.stateWaitingJobsMap = map[string][]Job[JC]{} - p.stateChan = map[string]chan Job[JC]{} - p.sortedStateNames = []string{} - - for _, s := range p.states { - stateName := s.TriggerState - - p.sortedStateNames = append(p.sortedStateNames, stateName) - p.stateMap[stateName] = s - p.stateStatusMap[stateName] = &StatusCount{ - State: stateName, - Terminal: s.Terminal, - } - // This is by-design unbuffered - p.stateChan[stateName] = make(chan Job[JC]) - } - - sort.Strings(p.sortedStateNames) // This is by-design unbuffered p.returnChan = make(chan Return[JC]) @@ -142,31 +243,25 @@ func (p *Processor[AC, OC, JC]) init() { // Exec this big work function, this does all the crunching func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { - if err := p.validate(p.states); err != nil { + if err := p.stateThing.validate(); err != nil { return err } p.init() - if p.allJobsAreTerminal(r) { + if p.stateThing.allJobsAreTerminal(r) { slog.Info("AllJobsTerminal") return nil } // create the workers - for _, s := range p.states { + for _, s := range p.stateThing.states { // Terminal states don't need to recieve jobs, they're just done if s.Terminal { continue } - // Make workers for each, they just process and fire back to the central channel - for i := 0; i < s.Concurrency; i++ { - p.wg.Add(1) - pprof.Do(ctx, pprof.Labels("type", "worker", "state", s.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) { - go p.execFunc(ctx, s, r.Overall, i, &p.wg)() - }) - } + p.execFunc(ctx, s, r.Overall, &p.wg) } pprof.Do(ctx, pprof.Labels("type", "main"), func(ctx context.Context) { @@ -178,58 +273,15 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error return nil } -func (p *Processor[AC, OC, JC]) runJob(job Job[JC]) { - p.stateStatusMap[job.State].Executing += 1 - p.stateChan[job.State] <- job -} - -func (p *Processor[AC, OC, JC]) queueJob(job Job[JC]) { - p.stateStatusMap[job.State].Waiting += 1 - p.stateWaitingJobsMap[job.State] = append(p.stateWaitingJobsMap[job.State], job) -} - -func (p *Processor[AC, OC, JC]) completeJob(job Job[JC]) { - p.stateStatusMap[job.State].Completed += 1 -} - -func (p *Processor[AC, OC, JC]) processJob(job Job[JC]) { - if p.isTerminal(job) { - p.completeJob(job) - } else { - if p.canRunJobForState(job.State) { - 
p.runJob(job) - } else { - p.queueJob(job) - } - } -} - -func (p *Processor[AC, OC, JC]) runNextWaitingJob(state string) { - // One less job is executing for the prior state - p.stateStatusMap[state].Executing -= 1 - - // There are no waiting jobs for the state, so we have nothing to queue - waitingJobCount := len(p.stateWaitingJobsMap[state]) - if waitingJobCount == 0 { - return - } - - job := p.stateWaitingJobsMap[state][waitingJobCount-1] - p.stateWaitingJobsMap[state] = p.stateWaitingJobsMap[state][0 : waitingJobCount-1] - p.stateStatusMap[job.State].Waiting -= 1 - - p.runJob(job) -} - func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg *sync.WaitGroup) { defer func() { p.shutdown() - wg.Add(-1) + wg.Done() }() // Enqueue the jobs to start for _, job := range r.Jobs { - p.processJob(job) + p.stateThing.processJob(job) } for { @@ -238,11 +290,11 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg return case completedJob := <-p.returnChan: // If the prior state of the completed job was at capacity, we now have space for one more - p.runNextWaitingJob(completedJob.PriorState) + p.stateThing.runNextWaitingJob(completedJob.PriorState) // Update the run with the new state r.UpdateJob(completedJob.Job) - p.processJob(completedJob.Job) + p.stateThing.processJob(completedJob.Job) // Start any of the new jobs that need kicking for idx, kickRequest := range completedJob.KickRequests { @@ -253,7 +305,7 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg StateErrors: map[string][]string{}, } r.UpdateJob(job) - p.processJob(job) + p.stateThing.processJob(job) } if err := p.serializer.Serialize(*r); err != nil { @@ -262,53 +314,21 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg p.updateStatus() - if p.allJobsAreTerminal(r) && !p.hasExecutingJobs() { + if p.stateThing.allJobsAreTerminal(r) && !p.stateThing.hasExecutingJobs() { return } } } } -func (p *Processor[AC, OC, JC]) canRunJobForState(state string) bool { - return p.stateStatusMap[state].Executing < p.stateMap[state].Concurrency -} - -func (p *Processor[AC, OC, JC]) hasExecutingJobs() bool { - for _, value := range p.stateStatusMap { - if value.Executing > 0 { - return true - } - } - - return false -} - func (p *Processor[AC, OC, JC]) updateStatus() { - ret := make([]StatusCount, 0) - for _, name := range p.sortedStateNames { - ret = append(ret, *p.stateStatusMap[name]) - } - - p.statusListener.StatusUpdate(ret) -} - -func (p *Processor[AC, OC, JC]) isTerminal(job Job[JC]) bool { - return p.stateMap[job.State].Terminal -} - -func (p *Processor[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { - for _, job := range r.Jobs { - if !p.isTerminal(job) { - return false - } - } - return true + p.statusListener.StatusUpdate(p.stateThing.getStatusCounts()) } func (p *Processor[AC, OC, JC]) shutdown() { // close all of the channels - for _, c := range p.stateChan { - close(c) + for _, state := range p.stateThing.states { + close(p.stateThing.getJobChannelForState(state.TriggerState)) } // close ourselves down close(p.returnChan) @@ -328,7 +348,7 @@ type StateExec[AC any, OC any, JC any] struct { func (s *StateExec[AC, OC, JC]) Run() { slog.Info("Starting worker", "worker", s.i, "state", s.state.TriggerState) defer func() { - s.wg.Add(-1) + s.wg.Done() slog.Info("Stopped worker", "worker", s.i, "state", s.state.TriggerState) }() @@ -362,16 +382,23 @@ func (s *StateExec[AC, OC, JC]) Run() { } } -func (p *Processor[AC, OC, JC]) 
execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, i int, wg *sync.WaitGroup) func() { - e := &StateExec[AC, OC, JC]{ - ctx: ctx, - ac: p.appContext, - oc: overallContext, - state: state, - jobChan: p.stateChan[state.TriggerState], - returnChan: p.returnChan, - i: i, - wg: wg, +func (p *Processor[AC, OC, JC]) execFunc(ctx context.Context, state State[AC, OC, JC], overallContext OC, wg *sync.WaitGroup) { + // Make workers for each, they just process and fire back to the central channel + for i := 0; i < state.Concurrency; i++ { + p.wg.Add(1) + stateExec := StateExec[AC, OC, JC]{ + ctx: ctx, + ac: p.appContext, + oc: overallContext, + state: state, + jobChan: p.stateThing.getJobChannelForState(state.TriggerState), + returnChan: p.returnChan, + i: i, + wg: wg, + } + + pprof.Do(ctx, pprof.Labels("type", "worker", "state", state.TriggerState, "id", fmt.Sprintf("%d", i)), func(ctx context.Context) { + go stateExec.Run() + }) } - return e.Run } From d3d0b8c65d8ccd743d2574ae3c57a9f818f803bf Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Mon, 22 Jul 2024 22:39:09 -0700 Subject: [PATCH 3/6] add testing --- internal/examples/simplejob/main.go | 6 +- job.go | 6 +- processor.go | 73 +++++---- processor_test.go | 222 ++++++++++++++++++++-------- run_test.go | 171 +++------------------ serializer_test.go | 12 +- 6 files changed, 234 insertions(+), 256 deletions(-) diff --git a/internal/examples/simplejob/main.go b/internal/examples/simplejob/main.go index 626f7dd..40a02d0 100644 --- a/internal/examples/simplejob/main.go +++ b/internal/examples/simplejob/main.go @@ -60,7 +60,11 @@ func main() { serial := jorb.NewJsonSerializer[oc, jc]("example.state") listener := &fileListener{fileName: "example.status"} - p := jorb.NewProcessor[ac, oc, jc](a, states, serial, listener) + p, err := jorb.NewProcessor[ac, oc, jc](a, states, serial, listener) + if err != nil { + log.Fatal(err) + } + if err := p.Exec(context.Background(), r); err != nil { log.Fatal(err) } diff --git a/job.go b/job.go index f991333..bce59a7 100644 --- a/job.go +++ b/job.go @@ -13,7 +13,11 @@ type Job[JC any] struct { // UpdateLastEvent updates the LastUpdate field of the Job struct to the current time. func (j Job[JC]) UpdateLastEvent() Job[JC] { - t := time.Now() + // Removes the monotonic clock portion of the timestamp which is only useful for measuring time + // https://pkg.go.dev/time#hdr-Monotonic_Clocks + // The monotonic clock information will not be marshalled, and thus cause tests which Marshal / Unmarshal job state + // and expect the results to be the same to fail. 
+ t := time.Now().Truncate(0) // Set the LastUpdate field to the current time j.LastUpdate = &t return j diff --git a/processor.go b/processor.go index db1c3ce..7773ac0 100644 --- a/processor.go +++ b/processor.go @@ -58,11 +58,11 @@ type StatusCount struct { type state struct { } -type stateThing[AC any, OC any, JC any] struct { +type stateStorage[AC any, OC any, JC any] struct { // This is fine for package-internal use cases to directly iterate over states []State[AC, OC, JC] - // These shouldn't be used outside stateThing's methods + // These shouldn't be used outside stateStorage's methods stateMap map[string]State[AC, OC, JC] stateStatusMap map[string]*StatusCount stateWaitingJobsMap map[string][]Job[JC] @@ -70,8 +70,8 @@ type stateThing[AC any, OC any, JC any] struct { sortedStateNames []string } -func newStateThingFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) stateThing[AC, OC, JC] { - st := stateThing[AC, OC, JC]{ +func newStateStorageFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) stateStorage[AC, OC, JC] { + st := stateStorage[AC, OC, JC]{ states: states, stateMap: map[string]State[AC, OC, JC]{}, stateStatusMap: map[string]*StatusCount{}, @@ -98,11 +98,15 @@ func newStateThingFromStates[AC any, OC any, JC any](states []State[AC, OC, JC]) return st } -func (s stateThing[AC, OC, JC]) getJobChannelForState(stateName string) chan Job[JC] { +func (s stateStorage[AC, OC, JC]) getJobChannelForState(stateName string) chan Job[JC] { return s.stateChan[stateName] } -func (s stateThing[AC, OC, JC]) validate() error { +func (s stateStorage[AC, OC, JC]) closeJobChannelForState(stateName string) { + close(s.stateChan[stateName]) +} + +func (s stateStorage[AC, OC, JC]) validate() error { for _, state := range s.states { if state.Terminal { if state.Concurrency < 0 { @@ -121,37 +125,39 @@ func (s stateThing[AC, OC, JC]) validate() error { return nil } -func (s stateThing[AC, OC, JC]) runJob(job Job[JC]) { +func (s stateStorage[AC, OC, JC]) runJob(job Job[JC]) { s.stateStatusMap[job.State].Executing += 1 s.stateChan[job.State] <- job } -func (s stateThing[AC, OC, JC]) queueJob(job Job[JC]) { +func (s stateStorage[AC, OC, JC]) queueJob(job Job[JC]) { s.stateStatusMap[job.State].Waiting += 1 s.stateWaitingJobsMap[job.State] = append(s.stateWaitingJobsMap[job.State], job) } -func (s stateThing[AC, OC, JC]) completeJob(job Job[JC]) { +func (s stateStorage[AC, OC, JC]) completeJob(job Job[JC]) { s.stateStatusMap[job.State].Completed += 1 } -func (s stateThing[AC, OC, JC]) processJob(job Job[JC]) { +func (s stateStorage[AC, OC, JC]) processJob(job Job[JC]) { if s.isTerminal(job) { s.completeJob(job) - } else { - if s.canRunJobForState(job.State) { - s.runJob(job) - } else { - s.queueJob(job) - } + return } + + if s.canRunJobForState(job.State) { + s.runJob(job) + return + } + + s.queueJob(job) } -func (s stateThing[AC, OC, JC]) isTerminal(job Job[JC]) bool { +func (s stateStorage[AC, OC, JC]) isTerminal(job Job[JC]) bool { return s.stateMap[job.State].Terminal } -func (s stateThing[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { +func (s stateStorage[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { for _, job := range r.Jobs { if !s.isTerminal(job) { return false @@ -160,7 +166,7 @@ func (s stateThing[AC, OC, JC]) allJobsAreTerminal(r *Run[OC, JC]) bool { return true } -func (s stateThing[AC, OC, JC]) runNextWaitingJob(state string) { +func (s stateStorage[AC, OC, JC]) runNextWaitingJob(state string) { // One less job is executing for the prior state 
s.stateStatusMap[state].Executing -= 1 @@ -177,11 +183,11 @@ func (s stateThing[AC, OC, JC]) runNextWaitingJob(state string) { s.runJob(job) } -func (s stateThing[AC, OC, JC]) canRunJobForState(state string) bool { +func (s stateStorage[AC, OC, JC]) canRunJobForState(state string) bool { return s.stateStatusMap[state].Executing < s.stateMap[state].Concurrency } -func (s stateThing[AC, OC, JC]) hasExecutingJobs() bool { +func (s stateStorage[AC, OC, JC]) hasExecutingJobs() bool { for _, value := range s.stateStatusMap { if value.Executing > 0 { return true @@ -191,7 +197,7 @@ func (s stateThing[AC, OC, JC]) hasExecutingJobs() bool { return false } -func (s stateThing[AC, OC, JC]) getStatusCounts() []StatusCount { +func (s stateStorage[AC, OC, JC]) getStatusCounts() []StatusCount { ret := make([]StatusCount, 0) for _, name := range s.sortedStateNames { ret = append(ret, *s.stateStatusMap[name]) @@ -205,7 +211,7 @@ func (s stateThing[AC, OC, JC]) getStatusCounts() []StatusCount { type Processor[AC any, OC any, JC any] struct { appContext AC serializer Serializer[OC, JC] - stateThing stateThing[AC, OC, JC] + stateThing stateStorage[AC, OC, JC] statusListener StatusListener returnChan chan Return[JC] wg sync.WaitGroup @@ -220,13 +226,19 @@ type Return[JC any] struct { Error error } -func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) *Processor[AC, OC, JC] { - return &Processor[AC, OC, JC]{ +func NewProcessor[AC any, OC any, JC any](ac AC, states []State[AC, OC, JC], serializer Serializer[OC, JC], statusListener StatusListener) (*Processor[AC, OC, JC], error) { + p := &Processor[AC, OC, JC]{ appContext: ac, - stateThing: newStateThingFromStates(states), + stateThing: newStateStorageFromStates(states), serializer: serializer, statusListener: statusListener, } + + if err := p.stateThing.validate(); err != nil { + return nil, err + } + + return p, nil } func (p *Processor[AC, OC, JC]) init() { @@ -243,10 +255,6 @@ func (p *Processor[AC, OC, JC]) init() { // Exec this big work function, this does all the crunching func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error { - if err := p.stateThing.validate(); err != nil { - return err - } - p.init() if p.stateThing.allJobsAreTerminal(r) { @@ -284,6 +292,9 @@ func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg p.stateThing.processJob(job) } + // Send the initial status update with the state of all the jobs + p.updateStatus() + for { select { case <-ctx.Done(): @@ -328,7 +339,7 @@ func (p *Processor[AC, OC, JC]) updateStatus() { func (p *Processor[AC, OC, JC]) shutdown() { // close all of the channels for _, state := range p.stateThing.states { - close(p.stateThing.getJobChannelForState(state.TriggerState)) + p.stateThing.closeJobChannelForState(state.TriggerState) } // close ourselves down close(p.returnChan) diff --git a/processor_test.go b/processor_test.go index 2ebcb11..334ac6a 100644 --- a/processor_test.go +++ b/processor_test.go @@ -41,6 +41,111 @@ const ( STATE_DONE_TWO = "done_two" ) +func createJob(state string) Job[MyJobContext] { + return Job[MyJobContext]{ + Id: "", + C: MyJobContext{}, + State: state, + } +} + +func TestStateStorage(t *testing.T) { + concurrency := 5 + stateS := newStateStorageFromStates([]State[MyAppContext, MyOverallContext, MyJobContext]{ + { + TriggerState: TRIGGER_STATE_NEW, + Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, 
string, []KickRequest[MyJobContext], error) { + return jc, STATE_DONE, nil, nil + }, + Terminal: false, + Concurrency: concurrency, + }, + { + TriggerState: STATE_DONE, + Terminal: true, + }, + }) + + // Fake processor that just takes jobs and throws them away, as the StateStorage doesn't actually care about + // Any of the actual processing + go func() { + for true { + select { + case <-stateS.stateChan[TRIGGER_STATE_NEW]: + continue + } + } + }() + + for i := 0; i < concurrency*2; i++ { + stateS.processJob(createJob(TRIGGER_STATE_NEW)) + } + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: concurrency, + }, + }, stateS.getStatusCounts()) + for i := 0; i < 2; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: 2, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: concurrency - 2, + }, + }, stateS.getStatusCounts()) + + for i := 0; i < concurrency-2; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: concurrency, + }, + { + State: TRIGGER_STATE_NEW, + Executing: concurrency, + Waiting: 0, + }, + }, stateS.getStatusCounts()) + + for i := 0; i < concurrency; i++ { + stateS.runNextWaitingJob(TRIGGER_STATE_NEW) + stateS.processJob(createJob(STATE_DONE)) + } + + assert.Equal(t, []StatusCount{ + { + State: STATE_DONE, + Terminal: true, + Completed: concurrency * 2, + }, + { + State: TRIGGER_STATE_NEW, + Executing: 0, + Waiting: 0, + }, + }, stateS.getStatusCounts()) +} + func TestProcessorOneJob(t *testing.T) { t.Parallel() oc := MyOverallContext{} @@ -69,10 +174,11 @@ func TestProcessorOneJob(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -99,10 +205,11 @@ func TestProcessorAllTerminal(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -144,10 +251,11 @@ func TestProcessorTwoSequentialJobs(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -216,7 +324,8 @@ func TestProcessor_TwoTerminal(t *testing.T) { }, } - p := NewProcessor[MyAppContext, 
MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() err = p.Exec(context.Background(), r) @@ -268,7 +377,7 @@ func TestProcessor_StateCallback(t *testing.T) { oc := MyOverallContext{} ac := MyAppContext{} r := NewRun[MyOverallContext, MyJobContext]("job", oc) - for i := 0; i < 1; i++ { + for i := 0; i < 11; i++ { r.AddJob(MyJobContext{ Count: 0, }) @@ -277,52 +386,38 @@ func TestProcessor_StateCallback(t *testing.T) { tl := &testStatusListener{ t: t, } - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 1, - }, - { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 1, - Executing: 1, - }, - { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) tl.ExpectStatus([]StatusCount{ { State: TRIGGER_STATE_NEW, - Count: 1, - Executing: 1, + Waiting: 1, + Executing: 10, + Completed: 0, }, { - State: STATE_DONE, - Count: 0, - Terminal: true, - }, - }) - tl.ExpectStatus([]StatusCount{ - { - State: TRIGGER_STATE_NEW, - Count: 0, - }, - { - State: STATE_DONE, - Count: 1, - Terminal: true, + State: STATE_DONE, + Waiting: 0, + Executing: 0, + Completed: 0, + Terminal: true, }, }) + for i := 0; i <= 10; i++ { + tl.ExpectStatus([]StatusCount{ + { + State: TRIGGER_STATE_NEW, + Waiting: 0, + Executing: 10 - i, + Completed: 0, + }, + { + State: STATE_DONE, + Waiting: 0, + Executing: 0, + Completed: 1 + i, + Terminal: true, + }, + }) + } states := []State[MyAppContext, MyOverallContext, MyJobContext]{ State[MyAppContext, MyOverallContext, MyJobContext]{ @@ -343,10 +438,11 @@ func TestProcessor_StateCallback(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, tl) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, tl) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -386,10 +482,11 @@ func TestProcessor_Retries(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -451,10 +548,11 @@ func TestProcessor_RateLimiter(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*4) @@ -494,10 +592,11 @@ func TestProcessor_RateLimiterSlows(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := 
p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) jobCount := len(r.Jobs) @@ -548,10 +647,11 @@ func TestProcessor_LoopWithExit(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") @@ -602,7 +702,8 @@ func TestProcessor_Serialization(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, serialzer, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, serialzer, nil) + assert.NoError(t, err) start := time.Now() err = p.Exec(context.Background(), r) @@ -687,10 +788,11 @@ func TestProcessor_FirstStepExpands(t *testing.T) { }, } - p := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, states, nil, nil) + assert.NoError(t, err) start := time.Now() - err := p.Exec(context.Background(), r) + err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) assert.Less(t, delta, time.Second*2, "Should take less than 2 seconds when run in parallel") diff --git a/run_test.go b/run_test.go index f1df6b7..cc6937e 100644 --- a/run_test.go +++ b/run_test.go @@ -2,168 +2,31 @@ package jorb import ( "testing" + "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -func TestStatusCounts(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 3, - Executing: 0, - }, - }, r.StatusCounts()) - - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 3, - Executing: 1, - }, - }, r.StatusCounts()) - - j.State = STATE_MIDDLE - r.Return(j) - - assert.Equal(t, map[string]StatusCount{ - TRIGGER_STATE_NEW: { - State: TRIGGER_STATE_NEW, - Count: 2, - Executing: 0, - }, - STATE_MIDDLE: { - State: STATE_MIDDLE, - Count: 1, - Executing: 0, - Terminal: false, - }, - }, r.StatusCounts()) -} - -func TestRun_NextForStatus_NoJobs(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - - _, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.False(t, ok) -} - -func TestRun_GetsNextRunOnSecondCall(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.NotEmpty(t, j.Id) - - // Now I should get the same job back on the second call - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.NotEqual(t, j.Id, j2.Id) -} - -func TestRun_GetNextRun_Returning(t *testing.T) { - t.Parallel() - r := 
NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j1, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - - // Now I should get the same job back on the second call - _, ok = r.NextJobForState(TRIGGER_STATE_NEW) - assert.False(t, ok) - - // If I return the first job I should then get it again, but the last event should be updated - r.Return(j1) - returnDate := r.Jobs[j1.Id].LastUpdate - assert.NotEqual(t, returnDate, j1.LastUpdate) - - // Verify that we get the same id back but the returnDate has been updated again - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - assert.True(t, ok) - assert.Equal(t, j1.Id, j2.Id) - - // The date should be updated on the next fetch as well - assert.NotEqual(t, j1.LastUpdate, j2.LastUpdate) - assert.NotEqual(t, returnDate, j2.LastUpdate) -} - -func TestRun_GetNextRun_GetsInOrder(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - // Add 2 jobs - r.AddJob(MyJobContext{Count: 0}) - r.AddJob(MyJobContext{Count: 0}) - - // Given they're all even, I expect one job to come out - j1, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - j2, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - // Now I return them in reverse order - r.Return(j2) - r.Return(j1) - - // When I get them again, I should get them in order j2, then j1 because I should be getting the one that has - // be updated the earliest - - // If I return the first job I should then get it again, but the last event should be updated - j3, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - j4, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - - // Now validate the ids - // The date should be updated on the next fetch as well - assert.Equal(t, j2.Id, j3.Id) - assert.Equal(t, j1.Id, j4.Id) -} - -func TestRun_JobsInFlight2(t *testing.T) { - t.Parallel() - r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) - r.AddJob(MyJobContext{Count: 0}) - - // no jobs checked out, there should be no jobs in flight - require.False(t, r.JobsInFlight()) - - // check out a job - j, ok := r.NextJobForState(TRIGGER_STATE_NEW) - require.True(t, ok) - require.True(t, r.JobsInFlight()) - - r.Return(j) - require.False(t, r.JobsInFlight()) -} - func Test_AddJobWithState(t *testing.T) { t.Parallel() r := NewRun[MyOverallContext, MyJobContext]("job", MyOverallContext{}) r.AddJobWithState(MyJobContext{Count: 0}, "other_state") - assert.Equal(t, map[string]StatusCount{ - "other_state": { - State: "other_state", - Count: 1, - Executing: 0, - }, - }, r.StatusCounts()) assert.Equal(t, 1, len(r.Jobs)) assert.Equal(t, "other_state", r.Jobs["0"].State) + currentTime := time.Now() + assert.Less(t, *r.Jobs["0"].LastUpdate, currentTime) + + r.UpdateJob(Job[MyJobContext]{ + Id: "0", + C: MyJobContext{ + Count: 1, + }, + State: "other_state_2", + }) + // Number of jobs has not changed + assert.Equal(t, 1, len(r.Jobs)) + // Job's state has been updated + assert.Equal(t, "other_state_2", r.Jobs["0"].State) + // Job's time has been updated + assert.Greater(t, *r.Jobs["0"].LastUpdate, currentTime) } diff --git a/serializer_test.go b/serializer_test.go index 595f7c9..5a73d0a 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -12,12 +12,6 @@ import ( func TestJsonSerializer_SaveLoad(t *testing.T) { t.Parallel() - // Create a temporary directory for testing - tempDir, err := 
os.MkdirTemp("", "test") - if err != nil { - t.Fatalf("Failed to create temporary directory: %v", err) - } - defer os.RemoveAll(tempDir) // Create a test run run := NewRun[MyOverallContext, MyJobContext]("test", MyOverallContext{Name: "overall"}) @@ -28,11 +22,11 @@ func TestJsonSerializer_SaveLoad(t *testing.T) { } // Create a JsonSerializer with a temporary file - tempFile := filepath.Join(tempDir, "test.json") + tempFile := filepath.Join(t.TempDir(), "test.json") serializer := &JsonSerializer[MyOverallContext, MyJobContext]{File: tempFile} // Serialize the run - err = serializer.Serialize(*run) + err := serializer.Serialize(*run) require.NoError(t, err) require.FileExists(t, tempFile) @@ -56,7 +50,7 @@ func Test_SerializeWithError(t *testing.T) { defer os.RemoveAll(tempDir) r := NewRun[MyOverallContext, MyJobContext]("test", MyOverallContext{Name: "overall"}) - r.Return(Job[MyJobContext]{ + r.UpdateJob(Job[MyJobContext]{ C: MyJobContext{Count: 0, Name: "job-0"}, StateErrors: map[string][]string{ "key": []string{ From 39c3f27bd16ed5a965993591f0643b8dce893ecf Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Tue, 23 Jul 2024 10:47:54 -0700 Subject: [PATCH 4/6] always use UTC + truncate to something sane --- job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job.go b/job.go index bce59a7..6d559ca 100644 --- a/job.go +++ b/job.go @@ -17,7 +17,7 @@ func (j Job[JC]) UpdateLastEvent() Job[JC] { // https://pkg.go.dev/time#hdr-Monotonic_Clocks // The monotonic clock information will not be marshalled, and thus cause tests which Marshal / Unmarshal job state // and expect the results to be the same to fail. - t := time.Now().Truncate(0) + t := time.Now().UTC().Truncate(time.Millisecond) // Set the LastUpdate field to the current time j.LastUpdate = &t return j From 3d95441e8fd0f9c5458e616304851997599c3012 Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Tue, 23 Jul 2024 11:21:02 -0700 Subject: [PATCH 5/6] more time fixup --- job.go | 2 +- processor_test.go | 6 +++--- run.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++ run_test.go | 8 +++++--- 4 files changed, 61 insertions(+), 7 deletions(-) diff --git a/job.go b/job.go index 6d559ca..204d2df 100644 --- a/job.go +++ b/job.go @@ -17,7 +17,7 @@ func (j Job[JC]) UpdateLastEvent() Job[JC] { // https://pkg.go.dev/time#hdr-Monotonic_Clocks // The monotonic clock information will not be marshalled, and thus cause tests which Marshal / Unmarshal job state // and expect the results to be the same to fail. 
- t := time.Now().UTC().Truncate(time.Millisecond) + t := time.Now().Truncate(time.Millisecond) // Set the LastUpdate field to the current time j.LastUpdate = &t return j diff --git a/processor_test.go b/processor_test.go index 334ac6a..6fb0ea2 100644 --- a/processor_test.go +++ b/processor_test.go @@ -292,7 +292,7 @@ func TestProcessor_TwoTerminal(t *testing.T) { oc := MyOverallContext{} ac := MyAppContext{} r := NewRun[MyOverallContext, MyJobContext]("job", oc) - for i := 0; i < 30_000; i++ { + for i := 0; i < 40; i++ { r.AddJob(MyJobContext{ Count: 0, }) @@ -310,7 +310,7 @@ func TestProcessor_TwoTerminal(t *testing.T) { return jc, STATE_DONE_TWO, nil, nil }, Terminal: false, - Concurrency: 1000, + Concurrency: 10, }, State[MyAppContext, MyOverallContext, MyJobContext]{ TriggerState: STATE_DONE_TWO, @@ -331,7 +331,7 @@ func TestProcessor_TwoTerminal(t *testing.T) { err = p.Exec(context.Background(), r) delta := time.Since(start) require.NoError(t, err) - assert.Less(t, delta, time.Second*16, "Should take less than 9 seconds when run in parallel") + assert.Less(t, delta, time.Second*10, "Should take less than 10 seconds when run in parallel") stateCount := map[string]int{} for _, j := range r.Jobs { diff --git a/run.go b/run.go index aeccec2..e16ffa9 100644 --- a/run.go +++ b/run.go @@ -3,7 +3,9 @@ package jorb import ( "fmt" "log/slog" + "reflect" "sync" + "time" ) // Run is basically the overall state of a given run (batch) in the processing framework @@ -71,3 +73,53 @@ func (r *Run[OC, JC]) AddJobWithState(jc JC, state string) { func (r *Run[OC, JC]) AddJob(jc JC) { r.AddJobWithState(jc, TRIGGER_STATE_NEW) } + +func (r *Run[OC, JC]) Equal(r2 *Run[OC, JC]) bool { + if r.Name != r2.Name { + return false + } + + if len(r.Jobs) != len(r2.Jobs) { + return false + } + + for rKey, rValue := range r.Jobs { + r2Value, ok := r2.Jobs[rKey] + if !ok { + return false + } + + if rValue.Id != r2Value.Id { + return false + } + + if !reflect.DeepEqual(rValue.C, r2Value.C) { + return false + } + + if rValue.State != r2Value.State { + return false + } + + if !reflect.DeepEqual(rValue.StateErrors, r2Value.StateErrors) { + return false + } + + if (rValue.LastUpdate == nil && r2Value.LastUpdate != nil) || (rValue.LastUpdate != nil && r2Value.LastUpdate == nil) { + return false + } + + timeDiff := rValue.LastUpdate.Sub(*r2Value.LastUpdate) + if timeDiff > 0 { + if timeDiff > time.Millisecond { + return false + } + } else { + if timeDiff > -time.Millisecond { + return false + } + } + } + + return true +} diff --git a/run_test.go b/run_test.go index cc6937e..3e3364d 100644 --- a/run_test.go +++ b/run_test.go @@ -13,8 +13,8 @@ func Test_AddJobWithState(t *testing.T) { r.AddJobWithState(MyJobContext{Count: 0}, "other_state") assert.Equal(t, 1, len(r.Jobs)) assert.Equal(t, "other_state", r.Jobs["0"].State) - currentTime := time.Now() - assert.Less(t, *r.Jobs["0"].LastUpdate, currentTime) + originalTime := r.Jobs["0"].LastUpdate + time.Sleep(1 * time.Second) r.UpdateJob(Job[MyJobContext]{ Id: "0", @@ -23,10 +23,12 @@ func Test_AddJobWithState(t *testing.T) { }, State: "other_state_2", }) + + time.Sleep(1 * time.Second) // Number of jobs has not changed assert.Equal(t, 1, len(r.Jobs)) // Job's state has been updated assert.Equal(t, "other_state_2", r.Jobs["0"].State) // Job's time has been updated - assert.Greater(t, *r.Jobs["0"].LastUpdate, currentTime) + assert.NotEqual(t, originalTime, r.Jobs["0"].LastUpdate) } From 80e2f517a4e956bd55382ce75a9eb02f3b78c6e3 Mon Sep 17 00:00:00 2001 From: Avery Gnolek Date: Tue, 
23 Jul 2024 12:06:27 -0700 Subject: [PATCH 6/6] more test fixes --- run.go | 6 +++++- serializer_test.go | 8 ++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/run.go b/run.go index e16ffa9..4227e35 100644 --- a/run.go +++ b/run.go @@ -105,6 +105,10 @@ func (r *Run[OC, JC]) Equal(r2 *Run[OC, JC]) bool { return false } + if rValue.LastUpdate == nil && r2Value.LastUpdate == nil { + continue + } + if (rValue.LastUpdate == nil && r2Value.LastUpdate != nil) || (rValue.LastUpdate != nil && r2Value.LastUpdate == nil) { return false } @@ -115,7 +119,7 @@ func (r *Run[OC, JC]) Equal(r2 *Run[OC, JC]) bool { return false } } else { - if timeDiff > -time.Millisecond { + if timeDiff < -time.Millisecond { return false } } diff --git a/serializer_test.go b/serializer_test.go index 5a73d0a..51f834e 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -35,9 +35,7 @@ func TestJsonSerializer_SaveLoad(t *testing.T) { require.NoError(t, err) // Check that the run is the same - assert.Equal(t, run.Overall, actualRun.Overall) - assert.Equal(t, run.Name, actualRun.Name) - assert.Equal(t, run.Jobs, actualRun.Jobs) + assert.True(t, run.Equal(actualRun)) } func Test_SerializeWithError(t *testing.T) { @@ -78,7 +76,5 @@ func Test_SerializeWithError(t *testing.T) { j.LastUpdate = nil actualRun.Jobs[k] = j } - assert.Equal(t, r.Overall, actualRun.Overall) - assert.Equal(t, r.Name, actualRun.Name) - assert.Equal(t, r.Jobs, actualRun.Jobs) + assert.True(t, r.Equal(actualRun)) }
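
Note (illustrative, not part of the patch series): patches 4 through 6 converge on two complementary decisions — UpdateLastEvent truncates timestamps to millisecond precision, and the new Run.Equal tolerates up to a millisecond of drift on LastUpdate instead of demanding reflect.DeepEqual on times. The standalone sketch below shows the drift check that the corrected comparison in run.go amounts to, assuming a JSON round trip like the one JsonSerializer performs; withinMillisecond is a hypothetical helper written for illustration, not a jorb API.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// withinMillisecond is the inverse of the bail-out check in the corrected
// Run.Equal: it accepts any drift of at most one millisecond in either direction.
func withinMillisecond(a, b time.Time) bool {
	d := a.Sub(b)
	return d <= time.Millisecond && d >= -time.Millisecond
}

func main() {
	// Truncate strips the monotonic clock reading and caps precision at the
	// granularity the serializer is expected to preserve, matching what
	// UpdateLastEvent now does.
	before := time.Now().Truncate(time.Millisecond)

	raw, _ := json.Marshal(before)
	var after time.Time
	_ = json.Unmarshal(raw, &after)

	fmt.Println(withinMillisecond(before, after)) // true
}

Truncating on write and tolerating a millisecond on compare are two halves of the same assumption: the serializer only needs to preserve wall-clock instants at millisecond granularity, so equality checks on a restored Run should not ask for more than that.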