Skip to content

Commit

Permalink
allocrunner: terminate sidecars in the end
Browse files Browse the repository at this point in the history
This fixes a bug where a batch allocation fails to complete if it has
sidecars.

If the only remaining running tasks in an allocations are sidecars - we
must kill them and mark the allocation as complete.
  • Loading branch information
Mahmood Ali committed Jun 29, 2020
1 parent e608883 commit 73f19eb
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 9 deletions.
20 changes: 12 additions & 8 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ func (ar *allocRunner) TaskStateUpdated() {
func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh)

hasSidecars := hasSidecarTasks(ar.tasks)

for done := false; !done; {
select {
case <-ar.taskStateUpdatedCh:
Expand All @@ -462,10 +464,6 @@ func (ar *allocRunner) handleTaskStateUpdates() {
// name whose fault it is.
killTask := ""

// True if task runners should be killed because a leader
// failed (informational).
leaderFailed := false

// Task state has been updated; gather the state of the other tasks
trNum := len(ar.tasks)
liveRunners := make([]*taskrunner.TaskRunner, 0, trNum)
Expand All @@ -492,18 +490,24 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}
} else if tr.IsLeader() {
killEvent = structs.NewTaskEvent(structs.TaskLeaderDead)
leaderFailed = true
killTask = name
}
}

// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}

// If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 {

// Log kill reason
if leaderFailed {
switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
} else {
case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

Expand Down
123 changes: 122 additions & 1 deletion client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
found := false
killingMsg := ""
for _, e := range state1.Events {
if e.Type != structs.TaskLeaderDead {
if e.Type == structs.TaskLeaderDead {
found = true
}
if e.Type == structs.TaskKilling {
Expand Down Expand Up @@ -142,6 +142,127 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
})
}

// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
t.Parallel()

alloc := mock.BatchAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0

// Create three tasks in the task group
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
sidecar.Name = "sidecar"
sidecar.Driver = "mock_driver"
sidecar.KillTimeout = 10 * time.Millisecond
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
}

sidecar.Config = map[string]interface{}{
"run_for": "100s",
}

main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main1.Name = "task2"
main1.Driver = "mock_driver"
main1.Config = map[string]interface{}{
"run_for": "1s",
}

main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main2.Name = "task2"
main2.Driver = "mock_driver"
main2.Config = map[string]interface{}{
"run_for": "2s",
}

alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
sidecar.Name: tr,
main1.Name: tr,
main2.Name: tr,
}

conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()

hasTaskMainEvent := func(state *structs.TaskState) bool {
for _, e := range state.Events {
if e.Type == structs.TaskMainDead {
return true
}
}

return false
}

// Wait for all tasks to be killed
upd := conf.StateUpdater.(*MockStateUpdater)
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

var state *structs.TaskState

// Task1 should be killed because Task2 exited
state = last.TaskStates[sidecar.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

if !hasTaskMainEvent(state) {
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
}

// main tasks should die naturely
state = last.TaskStates[main1.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events)
}

state = last.TaskStates[main2.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel()

Expand Down
25 changes: 25 additions & 0 deletions client/allocrunner/task_hook_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -108,3 +109,27 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
c.mainTaskCtxCancel()
}
}

// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
return true
}
}

return false
}

// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
return true
}
}

return false
}
89 changes: 89 additions & 0 deletions client/allocrunner/task_hook_coordinator_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package allocrunner

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/helper/testlog"
Expand Down Expand Up @@ -230,3 +232,90 @@ func isChannelClosed(ch <-chan struct{}) bool {
return false
}
}

func TestHasSidecarTasks(t *testing.T) {

falseV, trueV := false, true

cases := []struct {
name string
// nil if main task, false if non-sidecar hook, true if sidecar hook
indicators []*bool

hasSidecars bool
hasNonsidecars bool
}{
{
name: "all sidecar - one",
indicators: []*bool{&trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "all sidecar - multiple",
indicators: []*bool{&trueV, &trueV, &trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "some sidecars, some others",
indicators: []*bool{nil, &falseV, &trueV},
hasSidecars: true,
hasNonsidecars: true,
},
{
name: "no sidecars",
indicators: []*bool{nil, &falseV, nil},
hasSidecars: false,
hasNonsidecars: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := allocWithSidecarIndicators(c.indicators)
arConf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()

ar, err := NewAllocRunner(arConf)
require.NoError(t, err)

require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars")

runners := []*taskrunner.TaskRunner{}
for _, r := range ar.tasks {
runners = append(runners, r)
}
require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars")

})
}
}

func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation {
alloc := mock.BatchAlloc()

tasks := []*structs.Task{}
resources := map[string]*structs.AllocatedTaskResources{}

tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]

for i, indicator := range indicators {
task := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task.Name = fmt.Sprintf("task%d", i)
if indicator != nil {
task.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: *indicator,
}
}
tasks = append(tasks, task)
resources[task.Name] = tr
}

alloc.Job.TaskGroups[0].Tasks = tasks

alloc.AllocatedResources.Tasks = resources
return alloc

}
5 changes: 5 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6996,6 +6996,9 @@ const (
// TaskLeaderDead indicates that the leader task within the has finished.
TaskLeaderDead = "Leader Task Dead"

// TaskMainDead indicates that the main tasks have dead
TaskMainDead = "Main Tasks Dead"

// TaskHookFailed indicates that one of the hooks for a task failed.
TaskHookFailed = "Task hook failed"

Expand Down Expand Up @@ -7217,6 +7220,8 @@ func (event *TaskEvent) PopulateEventDisplayMessage() {
desc = event.DriverMessage
case TaskLeaderDead:
desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
default:
desc = event.Message
}
Expand Down

0 comments on commit 73f19eb

Please sign in to comment.