From fa42e12b18006f5921b1d70705fd7ead4d69044b Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Tue, 22 Mar 2016 13:49:52 -0700
Subject: [PATCH] Test task failure killing TG and fix setting the task as received on a restore

---
 client/alloc_runner.go           |  6 ++++
 client/alloc_runner_test.go      | 56 +++++++++++++++++++++++++++++++-
 client/driver/logging/rotator.go |  7 ++--
 client/task_runner.go            |  7 ++--
 client/task_runner_test.go       | 10 +++++-
 5 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index c320986573a..7f54fdc8848 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -311,11 +311,16 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
 
 	// If the task failed, we should kill all the other tasks in the task group.
 	if state == structs.TaskStateDead && taskState.Failed() {
+		var destroyingTasks []string
 		for task, tr := range r.tasks {
 			if task != taskName {
+				destroyingTasks = append(destroyingTasks, task)
 				tr.Destroy()
 			}
 		}
+		if len(destroyingTasks) > 0 {
+			r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
+		}
 	}
 
 	select {
@@ -390,6 +395,7 @@ func (r *AllocRunner) Run() {
 
 		tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(), task.Copy(), r.consulService)
 		r.tasks[task.Name] = tr
+		tr.MarkReceived()
 		go tr.Run()
 	}
 	r.taskLock.Unlock()
diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go
index 2af177d058a..33c6ca113a9 100644
--- a/client/alloc_runner_test.go
+++ b/client/alloc_runner_test.go
@@ -281,6 +281,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
 func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, ar := testAllocRunner(false)
+	ar.logger = prefixedTestLogger("ar1: ")
 
 	// Ensure task takes some time
 	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
@@ -293,7 +294,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 			return false, fmt.Errorf("No updates")
 		}
 		last := upd.Allocs[upd.Count-1]
-		if last.ClientStatus == structs.AllocClientStatusRunning {
+		if last.ClientStatus != structs.AllocClientStatusRunning {
 			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
 		}
 		return true, nil
@@ -325,11 +326,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 
 	consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
 	ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}, consulClient)
+	ar2.logger = prefixedTestLogger("ar2: ")
 	err = ar2.RestoreState()
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	go ar2.Run()
+	ar2.logger.Println("[TESTING] starting second alloc runner")
 
 	testutil.WaitForResult(func() (bool, error) {
 		// Check the state still exists
@@ -348,6 +351,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 	})
 
 	// Send the destroy signal and ensure the AllocRunner cleans up.
+	ar2.logger.Println("[TESTING] destroying second alloc runner")
 	ar2.Destroy()
 
 	testutil.WaitForResult(func() (bool, error) {
@@ -380,3 +384,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	})
 }
+
+func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
+	ctestutil.ExecCompatible(t)
+	upd, ar := testAllocRunner(false)
+
+	// Create two tasks in the task group
+	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
+	task.Config["command"] = "/bin/sleep"
+	task.Config["args"] = []string{"1000"}
+
+	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
+	task2.Name = "task 2"
+	task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"}
+	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
+	ar.alloc.TaskResources[task2.Name] = task2.Resources
+	//t.Logf("%#v", ar.alloc.Job.TaskGroups[0])
+	go ar.Run()
+
+	testutil.WaitForResult(func() (bool, error) {
+		if upd.Count == 0 {
+			return false, fmt.Errorf("No updates")
+		}
+		last := upd.Allocs[upd.Count-1]
+		if last.ClientStatus != structs.AllocClientStatusFailed {
+			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
+		}
+
+		// Task One should be killed
+		state1 := last.TaskStates[task.Name]
+		if state1.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
+		}
+		if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled {
+			return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled)
+		}
+
+		// Task Two should be failed
+		state2 := last.TaskStates[task2.Name]
+		if state2.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
+		}
+		if !state2.Failed() {
+			return false, fmt.Errorf("task2 should have failed")
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+}
diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go
index af8beaa0c69..93a4be4f0e6 100644
--- a/client/driver/logging/rotator.go
+++ b/client/driver/logging/rotator.go
@@ -36,8 +36,9 @@ type FileRotator struct {
 	logger  *log.Logger
 	purgeCh chan struct{}
 	doneCh  chan struct{}
-	closed     bool
-	closedLock sync.Mutex
+
+	closed     bool
+	closedLock sync.Mutex
 }
 
 // NewFileRotator returns a new file rotator
@@ -207,8 +208,6 @@ func (f *FileRotator) Close() {
 	}
 
 	// Stop the purge go routine
-	f.closedLock.Lock()
-	defer f.closedLock.Unlock()
 	if !f.closed {
 		f.doneCh <- struct{}{}
 		close(f.purgeCh)
diff --git a/client/task_runner.go b/client/task_runner.go
index fa051119537..4ddb6614b6c 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -100,11 +100,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		waitCh: make(chan struct{}),
 	}
 
-	// Set the state to pending.
-	tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
 	return tc
 }
 
+// MarkReceived marks the task as received.
+func (r *TaskRunner) MarkReceived() {
+	r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
+}
+
 // WaitCh returns a channel to wait for termination
 func (r *TaskRunner) WaitCh() <-chan struct{} {
 	return r.waitCh
diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index c73a57140eb..8bfd47c15a8 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -20,7 +20,11 @@ import (
 )
 
 func testLogger() *log.Logger {
-	return log.New(os.Stderr, "", log.LstdFlags)
+	return prefixedTestLogger("")
+}
+
+func prefixedTestLogger(prefix string) *log.Logger {
+	return log.New(os.Stderr, prefix, log.LstdFlags)
 }
 
 type MockTaskStateUpdater struct {
@@ -65,6 +69,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
 func TestTaskRunner_SimpleRun(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, tr := testTaskRunner(false)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()
@@ -99,6 +104,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
 func TestTaskRunner_Destroy(t *testing.T) {
 	ctestutil.ExecCompatible(t)
 	upd, tr := testTaskRunner(true)
+	tr.MarkReceived()
 	defer tr.ctx.AllocDir.Destroy()
 
 	// Change command to ensure we run for a bit
@@ -255,6 +261,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
 	task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}
 
 	upd, tr := testTaskRunnerFromAlloc(false, alloc)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()
@@ -319,6 +326,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
 	}
 
 	upd, tr := testTaskRunnerFromAlloc(true, alloc)
+	tr.MarkReceived()
 	go tr.Run()
 	defer tr.Destroy()
 	defer tr.ctx.AllocDir.Destroy()
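
Note (illustration, not part of the patch): the behavioral fix above is that NewTaskRunner no longer pushes the task to pending with a TaskReceived event from its constructor; AllocRunner.Run calls the new TaskRunner.MarkReceived only for runners it creates fresh, so the restore path does not re-mark an already-started task as received. The standalone Go program below sketches that "move the side effect out of the constructor" pattern; every name in it (taskRunner, markReceived, stateUpdater) is hypothetical and simplified, not Nomad's actual API.

package main

import "fmt"

// taskEvent and stateUpdater stand in for the state-update callback the real
// task runner receives; they are illustrative only.
type taskEvent struct{ Type string }

type stateUpdater func(taskName, state string, event *taskEvent)

type taskRunner struct {
	taskName string
	updater  stateUpdater
}

// newTaskRunner deliberately has no side effects on task state, mirroring the
// removal of the updater call from the constructor in the patch.
func newTaskRunner(name string, updater stateUpdater) *taskRunner {
	return &taskRunner{taskName: name, updater: updater}
}

// markReceived plays the role of MarkReceived: push the task to "pending"
// with a "received" event, but only when the caller asks for it.
func (r *taskRunner) markReceived() {
	r.updater(r.taskName, "pending", &taskEvent{Type: "Received"})
}

func main() {
	updater := func(taskName, state string, event *taskEvent) {
		fmt.Printf("task %q -> %s (%s)\n", taskName, state, event.Type)
	}

	// Fresh task: created and explicitly marked received, as the alloc
	// runner's Run path now does right after constructing the runner.
	fresh := newTaskRunner("web", updater)
	fresh.markReceived()

	// Restored task: the restore path constructs the runner but never calls
	// markReceived, so a task that already ran is not reset to "pending".
	restored := newTaskRunner("web", updater)
	_ = restored
}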