diff --git a/client/alloc_runner.go b/client/alloc_runner.go index dc76a0cd686..b45a553a617 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -932,6 +932,7 @@ OUTER: r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v", r.allocID, err) } + case <-r.ctx.Done(): taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled) break OUTER @@ -967,10 +968,17 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) { tr := r.tasks[leader] r.taskLock.RUnlock() - r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first", - r.allocID, leader, r.alloc.TaskGroup) - tr.Destroy(destroyEvent) - <-tr.WaitCh() + // Dead tasks don't have a task runner created so guard against + // the leader being dead when this AR was saved. + if tr == nil { + r.logger.Printf("[DEBUG] client: alloc %q leader task %q of task group %q already stopped", + r.allocID, leader, r.alloc.TaskGroup) + } else { + r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first", + r.allocID, leader, r.alloc.TaskGroup) + tr.Destroy(destroyEvent) + <-tr.WaitCh() + } } // Then destroy non-leader tasks concurrently diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index f53b01ef7ad..47348e29bd0 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -1365,6 +1365,97 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { }) } +// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a +// restored task group with a leader that failed before restoring the leader is +// not stopped as it does not exist. +// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932 +func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { + t.Parallel() + _, ar := testAllocRunner(false) + defer ar.Destroy() + + // Create a leader and follower task in the task group + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Name = "follower1" + task.Driver = "mock_driver" + task.KillTimeout = 10 * time.Second + task.Config = map[string]interface{}{ + "run_for": "10s", + } + + task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "leader" + task2.Driver = "mock_driver" + task2.Leader = true + task2.KillTimeout = 10 * time.Millisecond + task2.Config = map[string]interface{}{ + "run_for": "0s", + } + + ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2) + ar.alloc.TaskResources[task2.Name] = task2.Resources + + // Mimic Nomad exiting before the leader stopping is able to stop other tasks. + ar.tasks = map[string]*TaskRunner{ + "leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, + ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(), + ar.vaultClient, ar.consulClient), + "follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState, + ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(), + ar.vaultClient, ar.consulClient), + } + ar.taskStates = map[string]*structs.TaskState{ + "leader": {State: structs.TaskStateDead}, + "follower1": {State: structs.TaskStateRunning}, + } + if err := ar.SaveState(); err != nil { + t.Fatalf("error saving state: %v", err) + } + + // Create a new AllocRunner to test RestoreState and Run + upd2 := &MockAllocStateUpdater{} + ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd2.Update, ar.alloc, + ar.vaultClient, ar.consulClient, ar.prevAlloc) + defer ar2.Destroy() + + if err := ar2.RestoreState(); err != nil { + t.Fatalf("error restoring state: %v", err) + } + go ar2.Run() + + // Wait for tasks to be stopped because leader is dead + testutil.WaitForResult(func() (bool, error) { + _, last := upd2.Last() + if last == nil { + return false, fmt.Errorf("no updates yet") + } + if actual := last.TaskStates["leader"].State; actual != structs.TaskStateDead { + return false, fmt.Errorf("Task leader is not dead yet (it's %q)", actual) + } + if actual := last.TaskStates["follower1"].State; actual != structs.TaskStateDead { + return false, fmt.Errorf("Task follower1 is not dead yet (it's %q)", actual) + } + return true, nil + }, func(err error) { + count, last := upd2.Last() + t.Logf("Updates: %d", count) + for name, state := range last.TaskStates { + t.Logf("%s: %s", name, state.State) + } + t.Fatalf("err: %v", err) + }) + + // Make sure it GCs properly + ar2.Destroy() + + select { + case <-ar2.WaitCh(): + // exited as expected + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for AR to GC") + } +} + // TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's // local/ dir will be moved to a replacement alloc's local/ dir if sticky // volumes is on. diff --git a/client/task_runner.go b/client/task_runner.go index f7f2c043b07..f79bb87e774 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -237,7 +237,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, // Build the restart tracker. tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { - logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) + logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup) return nil } restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)