Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle leader task being dead in RestoreState #3502

Merged
merged 1 commit into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ BUG FIXES:
explicitly [GH-3520]
* cli: Fix passing Consul address via flags [GH-3504]
* cli: Fix panic when running `keyring` commands [GH-3509]
* client: Fix a panic when restoring an allocation with a dead leader task
[GH-3502]
* client: Fix allocation accounting in GC and trigger GCs on allocation
updates [GH-3445]
* core: Fixes an issue with jobs that have `auto_revert` set to true, where reverting
Expand Down
16 changes: 12 additions & 4 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ OUTER:
r.logger.Printf("[WARN] client: failed to sync alloc %q status upon receiving alloc update: %v",
r.allocID, err)
}

case <-r.ctx.Done():
taskDestroyEvent = structs.NewTaskEvent(structs.TaskKilled)
break OUTER
Expand Down Expand Up @@ -967,10 +968,17 @@ func (r *AllocRunner) destroyTaskRunners(destroyEvent *structs.TaskEvent) {
tr := r.tasks[leader]
r.taskLock.RUnlock()

r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
r.allocID, leader, r.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
// Dead tasks don't have a task runner created so guard against
// the leader being dead when this AR was saved.
if tr == nil {
r.logger.Printf("[DEBUG] client: alloc %q leader task %q of task group %q already stopped",
r.allocID, leader, r.alloc.TaskGroup)
} else {
r.logger.Printf("[DEBUG] client: alloc %q destroying leader task %q of task group %q first",
r.allocID, leader, r.alloc.TaskGroup)
tr.Destroy(destroyEvent)
<-tr.WaitCh()
}
}

// Then destroy non-leader tasks concurrently
Expand Down
91 changes: 91 additions & 0 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,97 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) {
})
}

// TestAllocRunner_TaskLeader_StopRestoredTG asserts that a restored task group
// whose leader task was already dead when the state was saved can still be
// stopped: the follower must be killed, and the missing leader task runner must
// not be destroyed (it does not exist after the restore).
// See https://github.com/hashicorp/nomad/issues/3420#issuecomment-341666932
func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
	t.Parallel()
	_, ar := testAllocRunner(false)
	defer ar.Destroy()

	// Create a leader and follower task in the task group
	task := ar.alloc.Job.TaskGroups[0].Tasks[0]
	task.Name = "follower1"
	task.Driver = "mock_driver"
	task.KillTimeout = 10 * time.Second
	task.Config = map[string]interface{}{
		"run_for": "10s",
	}

	task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
	task2.Name = "leader"
	task2.Driver = "mock_driver"
	task2.Leader = true
	task2.KillTimeout = 10 * time.Millisecond
	task2.Config = map[string]interface{}{
		"run_for": "0s",
	}

	ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
	ar.alloc.TaskResources[task2.Name] = task2.Resources

	// Mimic Nomad exiting before the leader stopping is able to stop other tasks.
	ar.tasks = map[string]*TaskRunner{
		"leader": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
			ar.allocDir.NewTaskDir(task2.Name), ar.Alloc(), task2.Copy(),
			ar.vaultClient, ar.consulClient),
		"follower1": NewTaskRunner(ar.logger, ar.config, ar.stateDB, ar.setTaskState,
			ar.allocDir.NewTaskDir(task.Name), ar.Alloc(), task.Copy(),
			ar.vaultClient, ar.consulClient),
	}
	// Persist a state in which the leader is already dead while the follower
	// is still running.
	ar.taskStates = map[string]*structs.TaskState{
		"leader":    {State: structs.TaskStateDead},
		"follower1": {State: structs.TaskStateRunning},
	}
	if err := ar.SaveState(); err != nil {
		t.Fatalf("error saving state: %v", err)
	}

	// Create a new AllocRunner to test RestoreState and Run
	upd2 := &MockAllocStateUpdater{}
	ar2 := NewAllocRunner(ar.logger, ar.config, ar.stateDB, upd2.Update, ar.alloc,
		ar.vaultClient, ar.consulClient, ar.prevAlloc)
	defer ar2.Destroy()

	if err := ar2.RestoreState(); err != nil {
		t.Fatalf("error restoring state: %v", err)
	}
	go ar2.Run()

	// Wait for tasks to be stopped because leader is dead
	testutil.WaitForResult(func() (bool, error) {
		_, last := upd2.Last()
		if last == nil {
			return false, fmt.Errorf("no updates yet")
		}
		// Check the leader first, then the follower. A task without a
		// reported state is treated as "not dead yet" rather than
		// dereferencing a nil *structs.TaskState.
		for _, name := range []string{"leader", "follower1"} {
			state := last.TaskStates[name]
			if state == nil {
				return false, fmt.Errorf("Task %s has no state yet", name)
			}
			if actual := state.State; actual != structs.TaskStateDead {
				return false, fmt.Errorf("Task %s is not dead yet (it's %q)", name, actual)
			}
		}
		return true, nil
	}, func(err error) {
		count, last := upd2.Last()
		t.Logf("Updates: %d", count)
		// last is nil when no update was ever received; guard so the
		// original error is reported instead of a nil-pointer panic.
		if last != nil {
			for name, state := range last.TaskStates {
				if state == nil {
					t.Logf("%s: <no state>", name)
					continue
				}
				t.Logf("%s: %s", name, state.State)
			}
		}
		t.Fatalf("err: %v", err)
	})

	// Make sure it GCs properly
	ar2.Destroy()

	select {
	case <-ar2.WaitCh():
		// exited as expected
	case <-time.After(10 * time.Second):
		t.Fatalf("timed out waiting for AR to GC")
	}
}

// TestAllocRunner_MoveAllocDir asserts that a file written to an alloc's
// local/ dir will be moved to a replacement alloc's local/ dir if sticky
// volumes is on.
Expand Down
2 changes: 1 addition & 1 deletion client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
// Build the restart tracker.
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
logger.Printf("[ERR] client: alloc %q for missing task group %q", alloc.ID, alloc.TaskGroup)
return nil
}
restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type)
Expand Down