Skip to content

Commit

Permalink
Merge pull request #3502 from hashicorp/b-3420-restore-dead-leader
Browse files Browse the repository at this point in the history
Handle leader task being dead in RestoreState
  • Loading branch information
schmichael authored Nov 15, 2017
2 parents 5d58f7b + 0de0e1d commit 0cdc12b
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ BUG FIXES:
explicitly [GH-3520]
* cli: Fix passing Consul address via flags [GH-3504]
* cli: Fix panic when running `keyring` commands [GH-3509]
* client: Fix a panic when restoring an allocation with a dead leader task
[GH-3502]
* client: Fix allocation accounting in GC and trigger GCs on allocation
updates [GH-3445]
* core: Fixes an issue with jobs that have `auto_revert` set to true, where reverting
Expand Down
16 changes: 12 additions & 4 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ OUTER:
r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
r.allocID, err)
}

case <-r.ctx.Done():
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER
Expand Down Expand Up @@ -967,10 +968,17 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
tr := r.tasks[leader]
r.taskLock.RUnlock()

r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
r.allocID, leader, r.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
// Dead tasks don't have a task runner created so guard against
// the leader being dead when this AR was saved.
if tr == nil {
r.logger.Printf("[DEBUG] client: alloc %q leader task %q of task group %q already stopped",
r.allocID, leader, r.alloc.TaskGroup)
} else {
r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
r.allocID, leader, r.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
}
}

// Then destroy non-leader tasks concurrently
Expand Down
91 changes: 91 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,97 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
})
}

// TestAllocRunner_TaskLeader_StopRestoredTG asserts that when stopping a
// restored task group with a leader that failed before restoring the leader is
// not stopped as it does not exist.
// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
t.Parallel()
_, ar := testAllocRunner(false)
defer ar.Destroy()

// Create a leader and follower task in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "follower1"
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Second
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "leader"
task2.Driver = "mock_driver"
task2.Leader = true
task2.KillTimeout = 10 * time.Millisecond
task2.Config = map[string]interface{}{
"run_for": "0s",
}

ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources

// Mimic Nomad exiting before the leader stopping is able to stop other tasks.
ar.tasks = map[string]*TaskRunner{
"leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(),
ar.vaultClient, ar.consulClient),
"follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(),
ar.vaultClient, ar.consulClient),
}
ar.taskStates = map[string]*structs.TaskState{
"leader": {State: structs.TaskStateDead},
"follower1": {State: structs.TaskStateRunning},
}
if err := ar.SaveState(); err != nil {
t.Fatalf("error saving state: %v", err)
}

// Create a new AllocRunner to test RestoreState and Run
upd2 := &MockAllocStateUpdater{}
ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd2.Update, ar.alloc,
ar.vaultClient, ar.consulClient, ar.prevAlloc)
defer ar2.Destroy()

if err := ar2.RestoreState(); err != nil {
t.Fatalf("error restoring state: %v", err)
}
go ar2.Run()

// Wait for tasks to be stopped because leader is dead
testutil.WaitForResult(func() (bool, error) {
_, last := upd2.Last()
if last == nil {
return false, fmt.Errorf("no updates yet")
}
if actual := last.TaskStates["leader"].State; actual != structs.TaskStateDead {
return false, fmt.Errorf("Task leader is not dead yet (it's %q)", actual)
}
if actual := last.TaskStates["follower1"].State; actual != structs.TaskStateDead {
return false, fmt.Errorf("Task follower1 is not dead yet (it's %q)", actual)
}
return true, nil
}, func(err error) {
count, last := upd2.Last()
t.Logf("Updates: %d", count)
for name, state := range last.TaskStates {
t.Logf("%s: %s", name, state.State)
}
t.Fatalf("err: %v", err)
})

// Make sure it GCs properly
ar2.Destroy()

select {
case <-ar2.WaitCh():
// exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for AR to GC")
}
}

// TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
// local/ dir will be moved to a replacement alloc's local/ dir if sticky
// volumes is on.
Expand Down
2 changes: 1 addition & 1 deletion client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup)
return nil
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
Expand Down

0 comments on commit 0cdc12b

Please sign in to comment.