Skip to content

Commit

Permalink
Test task failure killing TG and fix setting the task as received on …
Browse files Browse the repository at this point in the history
…a restore
  • Loading branch information
dadgar committed Mar 25, 2016
1 parent 73419c2 commit ff6028b
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 8 deletions.
6 changes: 6 additions & 0 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,16 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv

// If the task failed, we should kill all the other tasks in the task group.
if state == structs.TaskStateDead && taskState.Failed() {
var destroyingTasks []string
for task, tr := range r.tasks {
if task != taskName {
destroyingTasks = append(destroyingTasks, task)
tr.Destroy()
}
}
if len(destroyingTasks) > 0 {
r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
}
}

select {
Expand Down Expand Up @@ -388,6 +393,7 @@ func (r *AllocRunner) Run() {
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.Alloc(),
task.Copy())
r.tasks[task.Name] = tr
tr.MarkReceived()
go tr.Run()
}
r.taskLock.Unlock()
Expand Down
56 changes: 55 additions & 1 deletion client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
ar.logger = prefixedTestLogger("ar1: ")

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand All @@ -291,7 +292,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus == structs.AllocClientStatusRunning {
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
Expand Down Expand Up @@ -322,11 +323,13 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID})
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()
ar2.logger.Println("[TESTING] starting second alloc runner")

testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
Expand All @@ -345,6 +348,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
})

// Send the destroy signal and ensure the AllocRunner cleans up.
ar2.logger.Println("[TESTING] destroying second alloc runner")
ar2.Destroy()

testutil.WaitForResult(func() (bool, error) {
Expand Down Expand Up @@ -377,3 +381,53 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)

// Create two tasks in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"1000"}

task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task 2"
task2.Config = map[string]interface{}{"command": "invalidBinaryToFail"}
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources
//t.Logf("%#v", ar.alloc.Job.TaskGroups[0])
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusFailed {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
}

// Task One should be killed
state1 := last.TaskStates[task.Name]
if state1.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
}
if lastE := state1.Events[len(state1.Events)-1]; lastE.Type != structs.TaskKilled {
return false, fmt.Errorf("got last event %v; want %v", lastE.Type, structs.TaskKilled)
}

// Task Two should be failed
state2 := last.TaskStates[task2.Name]
if state2.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
}
if !state2.Failed() {
return false, fmt.Errorf("task2 should have failed")
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
7 changes: 3 additions & 4 deletions client/driver/logging/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ type FileRotator struct {
logger *log.Logger
purgeCh chan struct{}
doneCh chan struct{}
closed bool
closedLock sync.Mutex

closed bool
closedLock sync.Mutex
}

// NewFileRotator returns a new file rotator
Expand Down Expand Up @@ -207,8 +208,6 @@ func (f *FileRotator) Close() {
}

// Stop the purge go routine
f.closedLock.Lock()
defer f.closedLock.Unlock()
if !f.closed {
f.doneCh <- struct{}{}
close(f.purgeCh)
Expand Down
7 changes: 5 additions & 2 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
waitCh: make(chan struct{}),
}

// Set the state to pending.
tc.updater(task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
return tc
}

// MarkReceived marks the task as received.
func (r *TaskRunner) MarkReceived() {
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
}

// WaitCh returns a channel to wait for termination
func (r *TaskRunner) WaitCh() <-chan struct{} {
return r.waitCh
Expand Down
10 changes: 9 additions & 1 deletion client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ import (
)

func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
return prefixedTestLogger("")
}

func prefixedTestLogger(prefix string) *log.Logger {
return log.New(os.Stderr, prefix, log.LstdFlags)
}

type MockTaskStateUpdater struct {
Expand Down Expand Up @@ -64,6 +68,7 @@ func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTas
func TestTaskRunner_SimpleRun(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, tr := testTaskRunner(false)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down Expand Up @@ -98,6 +103,7 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
func TestTaskRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, tr := testTaskRunner(true)
tr.MarkReceived()
defer tr.ctx.AllocDir.Destroy()

// Change command to ensure we run for a bit
Expand Down Expand Up @@ -253,6 +259,7 @@ func TestTaskRunner_Download_List(t *testing.T) {
task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2}

upd, tr := testTaskRunnerFromAlloc(false, alloc)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down Expand Up @@ -317,6 +324,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) {
}

upd, tr := testTaskRunnerFromAlloc(true, alloc)
tr.MarkReceived()
go tr.Run()
defer tr.Destroy()
defer tr.ctx.AllocDir.Destroy()
Expand Down

0 comments on commit ff6028b

Please sign in to comment.