fix(task): recover when call to execute fails
This was a missed case from #11817.

This case currently occurs when creating a task through the UI, using a
session rather than a full-fledged authorization. This commit doesn't fix
that case yet, but it at least logs an informative message.
mark-rushakoff committed Feb 15, 2019
1 parent 954cea9 commit 12292a4
Showing 3 changed files with 63 additions and 10 deletions.
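
In short, the fix makes the Execute error branch follow the same cleanup path that the wait-for-result branches already use: log the failure, release the run in desired state, return the runner to idle, and record the run as failed. A minimal, self-contained sketch of that recovery pattern follows; the types here are simplified stand-ins for the real ones in task/backend, and updateRunState is reduced to a comment (see the actual diff below for the real code).

package main

import (
	"context"
	"errors"
	"log"
	"sync/atomic"
)

const (
	runnerIdle uint32 = iota
	runnerWorking
)

// QueuedRun, Executor, and DesiredState are simplified stand-ins for the
// real types in task/backend; only what the failure path touches is kept.
type QueuedRun struct{ TaskID, RunID uint64 }

type Executor interface {
	Execute(ctx context.Context, qr QueuedRun) error
}

type DesiredState interface {
	FinishRun(ctx context.Context, taskID, runID uint64) error
}

type runner struct {
	ctx          context.Context
	state        *uint32
	executor     Executor
	desiredState DesiredState
}

// executeAndWait shows only the new error branch: if Execute fails, log it,
// release the run in desired state, mark the runner idle, and record the run
// as failed instead of leaving the runner stuck.
func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun) {
	if err := r.executor.Execute(ctx, qr); err != nil {
		log.Printf("Failed to begin run execution: %v", err)
		if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
			log.Printf("Beginning run execution failed, and desired state update failed: %v", err)
		}
		atomic.StoreUint32(r.state, runnerIdle)
		// The real scheduler also calls r.updateRunState(qr, RunFail, runLogger) here.
		return
	}
	// ... success path: wait on the run promise, then record the result ...
}

type failingExec struct{}

func (failingExec) Execute(context.Context, QueuedRun) error {
	return errors.New("forced failure on Execute")
}

type noopState struct{}

func (noopState) FinishRun(context.Context, uint64, uint64) error { return nil }

func main() {
	state := runnerWorking
	r := &runner{ctx: context.Background(), state: &state, executor: failingExec{}, desiredState: noopState{}}
	r.executeAndWait(context.Background(), QueuedRun{TaskID: 1, RunID: 2})
	log.Printf("runner state after failed Execute: %d (0 = idle)", atomic.LoadUint32(r.state))
}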
13 changes: 9 additions & 4 deletions task/backend/scheduler.go
@@ -634,9 +634,14 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
defer sp.Finish()

rp, err := r.executor.Execute(spCtx, qr)

if err != nil {
// TODO(mr): retry? and log error.
runLogger.Info("Failed to begin run execution", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Beginning run execution failed, and desired state update failed", zap.Error(err))
}

// TODO(mr): retry?
atomic.StoreUint32(r.state, runnerIdle)
r.updateRunState(qr, RunFail, runLogger)
return
@@ -675,7 +680,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
runLogger.Info("Failed to wait for execution result", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Waiting for execution result failed, and desired update failed", zap.Error(err))
runLogger.Error("Waiting for execution result failed, and desired state update failed", zap.Error(err))
}

// TODO(mr): retry?
@@ -687,7 +692,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
runLogger.Info("Run failed to execute", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Run failed to execute, and desired update failed", zap.Error(err))
runLogger.Error("Run failed to execute, and desired state update failed", zap.Error(err))
}
// TODO(mr): retry?
r.updateRunState(qr, RunFail, runLogger)
22 changes: 21 additions & 1 deletion task/backend/scheduler_test.go
@@ -585,8 +585,28 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
t.Fatal(err)
}

// One more tick just to ensure that we can keep going after this type of failure too.
// Fail to execute next run.
if n := d.TotalRunsCreatedForTask(task.ID); n != 2 {
t.Fatalf("should have created 2 runs so far, got %d", n)
}
e.FailNextCallToExecute(errors.New("forced failure on Execute"))
s.Tick(8)
// The execution happens in the background, so check a few times for 3 runs created.
const attempts = 50
for i := 0; i < attempts; i++ {
time.Sleep(2 * time.Millisecond)
n := d.TotalRunsCreatedForTask(task.ID)
if n == 3 {
break
}
if i == attempts-1 {
// Fail if we haven't seen the right count by the last attempt.
t.Fatalf("expected 3 runs created, got %d", n)
}
}

// One more tick just to ensure that we can keep going after this type of failure too.
s.Tick(9)
_, err = e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
38 changes: 33 additions & 5 deletions task/mock/scheduler.go
@@ -172,15 +172,19 @@ type DesiredState struct {

// Map of stringified task ID to task meta.
meta map[string]backend.StoreTaskMeta

// Map of task ID to total number of runs created for that task.
totalRunsCreated map[platform.ID]int
}

var _ backend.DesiredState = (*DesiredState)(nil)

func NewDesiredState() *DesiredState {
return &DesiredState{
runIDs: make(map[string]uint64),
created: make(map[string]backend.QueuedRun),
meta: make(map[string]backend.StoreTaskMeta),
runIDs: make(map[string]uint64),
created: make(map[string]backend.QueuedRun),
meta: make(map[string]backend.StoreTaskMeta),
totalRunsCreated: make(map[platform.ID]int),
}
}

@@ -221,6 +225,7 @@ func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now
d.meta[tid] = meta
rc.Created.TaskID = taskID
d.created[tid+rc.Created.RunID.String()] = rc.Created
d.totalRunsCreated[taskID]++
return rc, nil
}

@@ -257,7 +262,15 @@ func (d *DesiredState) CreatedFor(taskID platform.ID) []backend.QueuedRun {
return qrs
}

// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created runs for the given task ID.
// TotalRunsCreatedForTask returns the number of runs created for taskID.
func (d *DesiredState) TotalRunsCreatedForTask(taskID platform.ID) int {
d.mu.Lock()
defer d.mu.Unlock()

return d.totalRunsCreated[taskID]
}

// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created and unfinished runs for the given task ID.
// If the expected number isn't found in time, it returns an error.
//
// Because the scheduler and executor do a lot of state changes asynchronously, this is useful in test.
@@ -273,7 +286,7 @@ func (d *DesiredState) PollForNumberCreated(taskID platform.ID, count int) ([]sc
return created, nil
}
}
return created, fmt.Errorf("did not see count of %d created task(s) for ID %s in time, instead saw %d", count, taskID.String(), actualCount) // we return created anyways, to make it easier to debug
return created, fmt.Errorf("did not see count of %d created run(s) for task with ID %s in time, instead saw %d", count, taskID.String(), actualCount) // we return created anyways, to make it easier to debug
}

type Executor struct {
@@ -286,6 +299,9 @@ type Executor struct {
// Map of stringified, concatenated task and run ID, to results of runs that have executed and completed.
finished map[string]backend.RunResult

// Forced error for next call to Execute.
nextExecuteErr error

wg sync.WaitGroup
}

@@ -303,6 +319,11 @@ func (e *Executor) Execute(ctx context.Context, run backend.QueuedRun) (backend.
rp.WithHanging(ctx, e.hangingFor)
id := run.TaskID.String() + run.RunID.String()
e.mu.Lock()
if err := e.nextExecuteErr; err != nil {
e.nextExecuteErr = nil
e.mu.Unlock()
return nil, err
}
e.running[id] = rp
e.mu.Unlock()
e.wg.Add(1)
@@ -321,6 +342,13 @@ func (e *Executor) Wait() {
e.wg.Wait()
}

// FailNextCallToExecute causes the next call to e.Execute to unconditionally return err.
func (e *Executor) FailNextCallToExecute(err error) {
e.mu.Lock()
e.nextExecuteErr = err
e.mu.Unlock()
}

func (e *Executor) WithHanging(dt time.Duration) {
e.hangingFor = dt
}
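
The one-shot failure hook added to the mock Executor above follows a common pattern for flaky-dependency tests: the forced error is stored under the mutex, and Execute consumes and clears it before doing any real work, so exactly one call fails. A standalone sketch of that pattern, using a hypothetical FlakyExecutor rather than the influxdb mock:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// FlakyExecutor is a hypothetical stand-in for the mock Executor above:
// it succeeds unless a one-shot error has been queued.
type FlakyExecutor struct {
	mu          sync.Mutex
	nextExecErr error
}

// FailNextCallToExecute queues err to be returned by the next Execute call only.
func (e *FlakyExecutor) FailNextCallToExecute(err error) {
	e.mu.Lock()
	e.nextExecErr = err
	e.mu.Unlock()
}

// Execute consumes the queued error, if any, before starting real work.
func (e *FlakyExecutor) Execute(runID string) error {
	e.mu.Lock()
	if err := e.nextExecErr; err != nil {
		e.nextExecErr = nil // one-shot: later calls succeed again
		e.mu.Unlock()
		return err
	}
	e.mu.Unlock()
	// ... real execution would begin here ...
	return nil
}

func main() {
	e := &FlakyExecutor{}
	e.FailNextCallToExecute(errors.New("forced failure on Execute"))
	fmt.Println(e.Execute("run-1")) // forced failure on Execute
	fmt.Println(e.Execute("run-2")) // <nil>: the failure was consumed
}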
