Skip to content

Commit

Permalink
task runner to avoid running task if terminal
Browse files Browse the repository at this point in the history
This change fixes a bug where nomad would re-run alloc tasks if
the alloc is client terminal but the server copy on the client isn't
marked as terminal.

Here, we fix the case by having the task runner use
allocRunner.shouldRun() instead of only checking the server-updated
alloc.

Here, we preserve most of the invariants, such that `tr.Run()` is always
run, and we don't change the overall alloc runner and task runner
lifecycles.

Fixes #5883
  • Loading branch information
Mahmood Ali committed Jun 26, 2019
1 parent 72b9b87 commit 9ea8e1d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
config := &taskrunner.Config{
Alloc: ar.alloc,
ClientConfig: ar.clientConfig,
ShouldRun: ar.shouldRun,
Task: task,
TaskDir: ar.allocDir.NewTaskDir(task.Name),
Logger: ar.logger,
Expand Down
84 changes: 84 additions & 0 deletions client/allocrunner/alloc_runner_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,87 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
require.Equal(t, events[2].Type, structs.TaskStarted)
require.Equal(t, events[3].Type, structs.TaskTerminated)
}

// TestAllocRunner_Restore_CompletedBatch asserts that restoring a completed
// batch alloc doesn't run it again.
func TestAllocRunner_Restore_CompletedBatch(t *testing.T) {
	t.Parallel()

	// 1. Run task and wait for it to complete
	// 2. Start new alloc runner
	// 3. Assert task didn't run again

	alloc := mock.Alloc()
	alloc.Job.Type = structs.JobTypeBatch
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"run_for": "2s",
	}

	// The first runner gets a copy of the alloc; the original is handed to
	// the second runner below.
	conf, cleanup := testAllocRunnerConfig(t, alloc.Copy())
	defer cleanup()

	// Maintain state for subsequent run
	conf.StateDB = state.NewMemDB(conf.Logger)

	// Start the alloc runner and wait for the alloc to complete
	ar, err := NewAllocRunner(conf)
	require.NoError(t, err)
	go ar.Run()
	defer destroy(ar)

	testutil.WaitForResult(func() (bool, error) {
		s := ar.AllocState()
		if s.ClientStatus != structs.AllocClientStatusComplete {
			return false, fmt.Errorf("expected complete, got %s", s.ClientStatus)
		}
		return true, nil
	}, func(err error) {
		require.NoError(t, err)
	})

	// once job finishes, it shouldn't run again
	require.False(t, ar.shouldRun())
	initialRunEvents := ar.AllocState().TaskStates[task.Name].Events
	require.Len(t, initialRunEvents, 4)

	// The persisted task runner state must exist and be marked dead. Check
	// both returned values for nil before dereferencing so that a missing
	// state fails the test cleanly instead of panicking.
	ls, ts, err := conf.StateDB.GetTaskRunnerState(alloc.ID, task.Name)
	require.NoError(t, err)
	require.NotNil(t, ls)
	require.NotNil(t, ts)
	require.Equal(t, structs.TaskStateDead, ts.State)

	// Start a new alloc runner for the same alloc
	conf2, cleanup2 := testAllocRunnerConfig(t, alloc)
	defer cleanup2()

	// Use original statedb to maintain hook state
	conf2.StateDB = conf.StateDB

	// Restore and start the new runner; the task must not be run again
	ar2, err := NewAllocRunner(conf2)
	require.NoError(t, err)

	require.NoError(t, ar2.Restore())

	go ar2.Run()
	defer destroy(ar2)

	// AR waitCh must be closed even when task doesn't run again
	select {
	case <-ar2.WaitCh():
	case <-time.After(10 * time.Second):
		require.Fail(t, "alloc.waitCh wasn't closed")
	}

	// TR waitCh must be closed too!
	select {
	case <-ar2.tasks[task.Name].WaitCh():
	case <-time.After(10 * time.Second):
		require.Fail(t, "tr.waitCh wasn't closed")
	}

	// Assert that events are unmodified, which they would not be if the
	// task had re-run
	events := ar2.AllocState().TaskStates[task.Name].Events
	require.Equal(t, initialRunEvents, events)
}
15 changes: 13 additions & 2 deletions client/allocrunner/taskrunner/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type TaskRunner struct {
alloc *structs.Allocation
allocLock sync.Mutex

// shouldRun is a function that returns true if the alloc isn't in a terminal status
shouldRun func() bool

clientConfig *config.Config

// stateUpdater is used to emit updated task state
Expand Down Expand Up @@ -212,6 +215,9 @@ type Config struct {
TaskDir *allocdir.TaskDir
Logger log.Logger

// ShouldRun is a function that returns true if the alloc is not in a terminal status
ShouldRun func() bool

// Vault is the client to use to derive and renew Vault tokens
Vault vaultclient.VaultClient

Expand Down Expand Up @@ -261,6 +267,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
shouldRun: config.ShouldRun,
clientConfig: config.ClientConfig,
task: config.Task,
taskDir: config.TaskDir,
Expand Down Expand Up @@ -394,6 +401,10 @@ func (tr *TaskRunner) Run() {
defer close(tr.waitCh)
var result *drivers.ExitResult

if !tr.shouldRun() {
return
}

// Updates are handled asynchronously with the other hooks but each
// triggered update - whether due to alloc updates or a new vault token
// - should be handled serially.
Expand All @@ -413,7 +424,7 @@ func (tr *TaskRunner) Run() {
}

MAIN:
for !tr.Alloc().TerminalStatus() {
for tr.shouldRun() {
select {
case <-tr.killCtx.Done():
break MAIN
Expand Down Expand Up @@ -899,7 +910,7 @@ func (tr *TaskRunner) Restore() error {
}

alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
if !tr.shouldRun() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}

Expand Down

0 comments on commit 9ea8e1d

Please sign in to comment.