Skip to content

Commit

Permalink
client: update alloc status when terminating
Browse files Browse the repository at this point in the history
Defensively update alloc status whenever killing all tasks.
  • Loading branch information
schmichael committed Nov 5, 2018
1 parent a22205c commit e58a91b
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,18 @@ func (ar *allocRunner) Run() {
// destroyed and exit to wait for the AR to be GC'd by the client.
if !ar.shouldRun() {
ar.logger.Debug("not running terminal alloc")
ar.killTasks()

// Cleanup and sync state
states := ar.killTasks()

// Get the client allocation
calloc := ar.clientAlloc(states)

// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)

// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
return
}

Expand Down Expand Up @@ -387,7 +398,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

ar.killTasks()
states = ar.killTasks()
}

// Get the client allocation
Expand All @@ -402,8 +413,12 @@ func (ar *allocRunner) handleTaskStateUpdates() {
}

// killTasks kills all task runners, leader (if there is one) first. Errors are
// logged except taskrunner.ErrTaskNotRunning which is ignored.
func (ar *allocRunner) killTasks() {
// logged except taskrunner.ErrTaskNotRunning which is ignored. Task states
// after Kill has been called are returned.
func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))

// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
Expand All @@ -414,6 +429,9 @@ func (ar *allocRunner) killTasks() {
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}

state := tr.TaskState()
states[name] = state
break
}

Expand All @@ -431,9 +449,16 @@ func (ar *allocRunner) killTasks() {
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}

state := tr.TaskState()
mu.Lock()
states[name] = state
mu.Unlock()
}(name, tr)
}
wg.Wait()

return states
}

// clientAlloc takes in the task states and returns an Allocation populated
Expand Down Expand Up @@ -602,8 +627,11 @@ func (ar *allocRunner) Destroy() {
}
defer ar.destroyedLock.Unlock()

// Stop any running tasks
ar.killTasks()
// Stop any running tasks and persist states in case the client is
// shutdown before Destroy finishes.
states := ar.killTasks()
calloc := ar.clientAlloc(states)
ar.stateUpdater.AllocStateUpdated(calloc)

// Wait for tasks to exit and postrun hooks to finish (if they ran at all)
if ar.runLaunched {
Expand Down

0 comments on commit e58a91b

Please sign in to comment.