Skip to content

Commit

Permalink
Merge pull request #9 from thesilentg/cancel
Browse files Browse the repository at this point in the history
Fix panic when context cancelled
  • Loading branch information
gaffo authored Aug 23, 2024
2 parents 82cc1ce + ed516e0 commit 02ba45f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
5 changes: 4 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (p *Processor[AC, OC, JC]) Exec(ctx context.Context, r *Run[OC, JC]) error
})

p.wg.Wait()
return nil
return ctx.Err()
}

func (p *Processor[AC, OC, JC]) process(ctx context.Context, r *Run[OC, JC], wg *sync.WaitGroup) {
Expand Down Expand Up @@ -407,6 +407,9 @@ func (s *StateExec[AC, OC, JC]) Run() {

rtn.Job = j
slog.Info("Returning job", "job", j.Id, "newState", j.State)
if s.ctx.Err() != nil {
return
}
s.returnChan <- rtn
slog.Info("Returned job", "job", j.Id, "newState", j.State)
}
Expand Down
65 changes: 65 additions & 0 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,71 @@ func TestFairness(t *testing.T) {
}
}

func TestClose(t *testing.T) {
oc := MyOverallContext{}
ac := MyAppContext{}
r := NewRun[MyOverallContext, MyJobContext]("job", oc)
r.AddJob(MyJobContext{
Count: 0,
})

for _, testCase := range []struct {
testName string
states []State[MyAppContext, MyOverallContext, MyJobContext]
shouldError bool
}{
{
testName: "keeps running",
states: []State[MyAppContext, MyOverallContext, MyJobContext]{
{
TriggerState: TRIGGER_STATE_NEW,
Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) {
time.Sleep(50 * time.Millisecond)
return jc, TRIGGER_STATE_NEW, nil, nil
},
Concurrency: 1,
},
},
shouldError: true,
},
{
testName: "completes",
states: []State[MyAppContext, MyOverallContext, MyJobContext]{
{
TriggerState: TRIGGER_STATE_NEW,
Exec: func(ctx context.Context, ac MyAppContext, oc MyOverallContext, jc MyJobContext) (MyJobContext, string, []KickRequest[MyJobContext], error) {
time.Sleep(50 * time.Millisecond)
return jc, "done", nil, nil
},
Concurrency: 1,
},
{
TriggerState: "done",
Terminal: true,
},
},
shouldError: false,
},
} {
t.Run(testCase.testName, func(t *testing.T) {
p, err := NewProcessor[MyAppContext, MyOverallContext, MyJobContext](ac, testCase.states, nil, nil)

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()

err = p.Exec(ctx, r)
if testCase.shouldError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestStatusCountDedup(t *testing.T) {
oc := MyOverallContext{}
ac := MyAppContext{}
Expand Down

0 comments on commit 02ba45f

Please sign in to comment.