From ff6028bd56560918a4358ad9fe4a8df32766d349 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 22 Mar 2016 13:49:52 -0700 Subject: [PATCH] Test task failure killing TG and fix setting the task as received on a restore --- client/alloc_runner.go | 6 ++++ client/alloc_runner_test.go | 56 +++++++++++++++++++++++++++++++- client/driver/logging/rotator.go | 7 ++-- client/task_runner.go | 7 ++-- client/task_runner_test.go | 10 +++++- 5 files changed, 78 insertions(+), 8 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3794277441c..60e7ae66eb0 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -309,11 +309,16 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv // If the task failed, we should kill all the other tasks in the task group. if state == structs.TaskStateDead && taskState.Failed() { + var destroyingTasks []string for task, tr := range r.tasks { if task != taskName { + destroyingTasks = append(destroyingTasks, task) tr.Destroy() } } + if len(destroyingTasks) > 0 { + r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks) + } } select { @@ -388,6 +393,7 @@ func (r *AllocRunner) Run() { tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(), task.Copy()) r.tasks[task.Name] = tr + tr.MarkReceived() go tr.Run() } r.taskLock.Unlock() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index b0df609dd13..97b3dc34844 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -279,6 +279,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) + ar.logger = prefixedTestLogger("ar1: ") // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -291,7 +292,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { return false, fmt.Errorf("No updates") } last := upd.Allocs[upd.Count-1] - if last.ClientStatus == structs.AllocClientStatusRunning { + if last.ClientStatus != structs.AllocClientStatusRunning { return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) } return true, nil @@ -322,11 +323,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Create a new alloc runner ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, &structs.Allocation{ID: ar.alloc.ID}) + ar2.logger = prefixedTestLogger("ar2: ") err = ar2.RestoreState() if err != nil { t.Fatalf("err: %v", err) } go ar2.Run() + ar2.logger.Println("[TESTING] starting second alloc runner") testutil.WaitForResult(func() (bool, error) { // Check the state still exists @@ -345,6 +348,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { }) // Send the destroy signal and ensure the AllocRunner cleans up. + ar2.logger.Println("[TESTING] destroying second alloc runner") ar2.Destroy() testutil.WaitForResult(func() (bool, error) { @@ -377,3 +381,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Create two tasks in the task group + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"1000"} + + task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "task 2" + task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"} + ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2) + ar.alloc.TaskResources[task2.Name] = task2.Resources + //t.Logf("%#v", ar.alloc.Job.TaskGroups[0]) + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusFailed { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed) + } + + // Task One should be killed + state1 := last.TaskStates[task.Name] + if state1.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead) + } + if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled { + return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled) + } + + // Task Two should be failed + state2 := last.TaskStates[task2.Name] + if state2.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead) + } + if !state2.Failed() { + return false, fmt.Errorf("task2 should have failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index af8beaa0c69..93a4be4f0e6 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -36,8 +36,9 @@ type FileRotator struct { logger *log.Logger purgeCh chan struct{} doneCh chan struct{} - closed bool - closedLock sync.Mutex + + closed bool + closedLock sync.Mutex } // NewFileRotator returns a new file rotator @@ -207,8 +208,6 @@ func (f *FileRotator) Close() { } // Stop the purge go routine - f.closedLock.Lock() - defer f.closedLock.Unlock() if !f.closed { f.doneCh <- struct{}{} close(f.purgeCh) diff --git a/client/task_runner.go b/client/task_runner.go index c435922532c..5955376e608 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -96,11 +96,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, waitCh: make(chan struct{}), } - // Set the state to pending. - tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) return tc } +// MarkReceived marks the task as received. +func (r *TaskRunner) MarkReceived() { + r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) +} + // WaitCh returns a channel to wait for termination func (r *TaskRunner) WaitCh() <-chan struct{} { return r.waitCh diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 908834fcc9c..af695d93591 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -20,7 +20,11 @@ import ( ) func testLogger() *log.Logger { - return log.New(os.Stderr, "", log.LstdFlags) + return prefixedTestLogger("") +} + +func prefixedTestLogger(prefix string) *log.Logger { + return log.New(os.Stderr, prefix, log.LstdFlags) } type MockTaskStateUpdater struct { @@ -64,6 +68,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas func TestTaskRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) upd, tr := testTaskRunner(false) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -98,6 +103,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) { func TestTaskRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) upd, tr := testTaskRunner(true) + tr.MarkReceived() defer tr.ctx.AllocDir.Destroy() // Change command to ensure we run for a bit @@ -253,6 +259,7 @@ func TestTaskRunner_Download_List(t *testing.T) { task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2} upd, tr := testTaskRunnerFromAlloc(false, alloc) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() @@ -317,6 +324,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { } upd, tr := testTaskRunnerFromAlloc(true, alloc) + tr.MarkReceived() go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy()