Merge pull request #5168 from hashicorp/b-kill-race
Improve Kill handling on task runner
dadgar authored Jan 9, 2019
2 parents a646833 + d916c0d commit c7fc39d
Showing 5 changed files with 129 additions and 57 deletions.
59 changes: 16 additions & 43 deletions client/allocrunner/taskrunner/lifecycle.go
@@ -4,12 +4,13 @@ import (
"context"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)

// Restart a task. Returns immediately if no task is running. Blocks until
// existing task exits or passed-in context is canceled.
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
tr.logger.Trace("Restart requested", "failure", failure)

// Grab the handle
handle := tr.getDriverHandle()

@@ -47,6 +48,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
}

func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
tr.logger.Trace("Signal requested", "signal", s)

// Grab the handle
handle := tr.getDriverHandle()

@@ -65,58 +68,28 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
// Kill a task. Blocks until task exits or context is canceled. State is set to
// dead.
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason)

// Cancel the task runner to break out of restart delay or the main run
// loop.
tr.killCtxCancel()

// Grab the handle
handle := tr.getDriverHandle()

// Check it is running
if handle == nil {
return ErrTaskNotRunning
}

// Emit the event since it may take a long time to kill
// Emit kill event
tr.EmitEvent(event)

// Run the hooks prior to killing the task
tr.killing()

// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()

// Kill the task using an exponential backoff in-case of failures.
killErr := tr.killTask(handle)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
}

// Block until task has exited.
waitCh, err := handle.WaitCh(ctx)

// The error should be nil or TaskNotFound, if it's something else then a
// failure in the driver or transport layer occurred
if err != nil {
if err == drivers.ErrTaskNotFound {
return nil
}
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
return err
// Check whether the Run method has started yet. If it hasn't, we return
// early, since the task hasn't even started and there is nothing to wait
// for. This is still correct because Run will no-op, given that the kill
// context has already been cancelled.
if !tr.hasRunLaunched() {
return nil
}

select {
case <-waitCh:
case <-tr.WaitCh():
case <-ctx.Done():
return ctx.Err()
}

if killErr != nil {
return killErr
} else if err := ctx.Err(); err != nil {
return err
}

return nil
return tr.getKillErr()
}
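To see the new control flow in one place: Kill now cancels the kill context and, only if the Run goroutine has actually been launched, waits on the task runner's own wait channel before reading the stored kill error. The sketch below is an illustrative, self-contained model of that pattern, not Nomad's actual implementation; the `runner` type and its fields are simplified stand-ins for `TaskRunner`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runner is a stripped-down stand-in for TaskRunner. It models the
// coordination this PR introduces: Kill cancels a kill context and waits on
// the runner's own wait channel, but only if Run has actually been launched.
type runner struct {
	killCtx       context.Context
	killCtxCancel context.CancelFunc

	waitCh chan struct{} // closed when Run exits

	mu          sync.Mutex
	runLaunched bool
	killErr     error
}

func newRunner() *runner {
	ctx, cancel := context.WithCancel(context.Background())
	return &runner{killCtx: ctx, killCtxCancel: cancel, waitCh: make(chan struct{})}
}

func (r *runner) setRunLaunched()      { r.mu.Lock(); r.runLaunched = true; r.mu.Unlock() }
func (r *runner) hasRunLaunched() bool { r.mu.Lock(); defer r.mu.Unlock(); return r.runLaunched }
func (r *runner) setKillErr(err error) { r.mu.Lock(); r.killErr = err; r.mu.Unlock() }
func (r *runner) getKillErr() error    { r.mu.Lock(); defer r.mu.Unlock(); return r.killErr }

// Run is the only place that reacts to the kill context; it "handles the
// kill" (here: pretends to stop a task) and then closes waitCh.
func (r *runner) Run() {
	r.setRunLaunched()
	defer close(r.waitCh)

	select {
	case <-r.killCtx.Done():
		// handleKill would run killing hooks and destroy the driver handle
		// here; any failure would be stored for Kill to return.
		r.setKillErr(nil)
	case <-time.After(10 * time.Second):
		// simulated task completion
	}
}

// Kill mirrors the new TaskRunner.Kill: cancel, then wait on Run rather than
// on the driver handle directly.
func (r *runner) Kill(ctx context.Context) error {
	r.killCtxCancel()

	// If Run never launched there is nothing to wait for; the cancelled kill
	// context guarantees Run will no-op if it starts later.
	if !r.hasRunLaunched() {
		return nil
	}

	select {
	case <-r.waitCh:
	case <-ctx.Done():
		return ctx.Err()
	}
	return r.getKillErr()
}

func main() {
	r := newRunner()
	go r.Run()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := r.Kill(ctx); err != nil {
		fmt.Println("kill failed:", err)
		return
	}
	fmt.Println("task killed cleanly")
}
```

Funneling all kill handling through Run (via handleKill in task_runner.go below) is what removes the race: Kill no longer touches the driver handle or runs the killing hooks itself.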
6 changes: 6 additions & 0 deletions client/allocrunner/taskrunner/service_hook.go
@@ -199,6 +199,12 @@ func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
// interpolateServices returns an interpolated copy of services and checks with
// values from the task's environment.
func interpolateServices(taskEnv *taskenv.TaskEnv, services []*structs.Service) []*structs.Service {
// Guard against not having a valid taskEnv. This can be the case if the
// Killing or Exited hook is run before post-run.
if taskEnv == nil || len(services) == 0 {
return nil
}

interpolated := make([]*structs.Service, len(services))

for i, origService := range services {
91 changes: 78 additions & 13 deletions client/allocrunner/taskrunner/task_runner.go
@@ -85,19 +85,24 @@ type TaskRunner struct {
// stateDB is for persisting localState and taskState
stateDB cstate.StateDB

// shutdownCtx is used to exit the TaskRunner *without* affecting task state.
shutdownCtx context.Context

// shutdownCtxCancel causes the TaskRunner to exit immediately without
// affecting task state. Useful for testing or graceful agent shutdown.
shutdownCtxCancel context.CancelFunc

// killCtx is the task runner's context representing the tasks's lifecycle.
// The context is canceled when the task is killed.
killCtx context.Context

// killCtxCancel is called when killing a task.
killCtxCancel context.CancelFunc

// ctx is used to exit the TaskRunner *without* affecting task state.
ctx context.Context

// ctxCancel causes the TaskRunner to exit immediately without
// affecting task state. Useful for testing or graceful agent shutdown.
ctxCancel context.CancelFunc
// killErr is populated when killing a task. Access should go through the
// getter/setter.
killErr error
killErrLock sync.Mutex

// Logger is the logger for the task runner.
logger log.Logger
@@ -181,6 +186,11 @@ type TaskRunner struct {
// driverManager is used to dispense driver plugins and register event
// handlers
driverManager drivermanager.Manager

// runLaunched marks whether the Run goroutine has been started. It should
// be accessed via its helpers.
runLaunched bool
runLaunchedLock sync.Mutex
}

type Config struct {
@@ -251,8 +261,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
deviceStatsReporter: config.DeviceStatsReporter,
killCtx: killCtx,
killCtxCancel: killCancel,
ctx: trCtx,
ctxCancel: trCancel,
shutdownCtx: trCtx,
shutdownCtxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
waitCh: make(chan struct{}),
devicemanager: config.DeviceManager,
@@ -360,6 +370,10 @@ func (tr *TaskRunner) initLabels() {
// Run the TaskRunner. Starts the user's task or reattaches to a restored task.
// Run closes WaitCh when it exits. Should be started in a goroutine.
func (tr *TaskRunner) Run() {
// Mark that the run routine has been launched so that other functions can
// decide to use the wait channel or not.
tr.setRunLaunched()

defer close(tr.waitCh)
var result *drivers.ExitResult

@@ -372,8 +386,9 @@
MAIN:
for {
select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
default:
Expand All @@ -388,8 +403,9 @@ MAIN:

select {
case <-tr.killCtx.Done():
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
default:
@@ -421,7 +437,11 @@
tr.logger.Error("wait task failed", "error", err)
} else {
select {
case <-tr.ctx.Done():
case <-tr.killCtx.Done():
// We can go through the normal should-restart check since
// the restart tracker knows it is killed
tr.handleKill()
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
case result = <-resultCh:
@@ -455,8 +475,9 @@
case <-time.After(restartDelay):
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
tr.handleKill()
break MAIN
case <-tr.ctx.Done():
case <-tr.shutdownCtx.Done():
// TaskRunner was told to exit immediately
return
}
@@ -682,6 +703,50 @@ func (tr *TaskRunner) initDriver() error {
return nil
}

// handleKill is used to handle a request to kill a task. It will store any
// error in the task runner's killErr value.
func (tr *TaskRunner) handleKill() {
// Run the hooks prior to killing the task
tr.killing()

// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()

// Check it is running
handle := tr.getDriverHandle()
if handle == nil {
return
}

// Kill the task using an exponential backoff in-case of failures.
killErr := tr.killTask(handle)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
tr.setKillErr(killErr)
}

// Block until task has exited.
waitCh, err := handle.WaitCh(tr.shutdownCtx)

// The error should be nil or TaskNotFound; if it's something else, then a
// failure in the driver or transport layer occurred
if err != nil {
if err == drivers.ErrTaskNotFound {
return
}
tr.logger.Error("failed to wait on task. Resources may have been leaked", "error", err)
tr.setKillErr(killErr)
return
}

select {
case <-waitCh:
case <-tr.shutdownCtx.Done():
}
}

// killTask kills the task handle. In the case that killing fails,
// killTask will retry with an exponential backoff and will give up at a
// given limit. Returns an error if the task could not be killed.
@@ -1009,7 +1074,7 @@ func (tr *TaskRunner) triggerUpdateHooks() {
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {
tr.logger.Trace("shutting down")
tr.ctxCancel()
tr.shutdownCtxCancel()

<-tr.WaitCh()

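killTask is called by handleKill above, and its doc comment at the end of this file's diff says it retries with an exponential backoff and gives up at a limit, but its body is not part of this diff. The following is a rough, self-contained sketch of that retry shape only; killWithBackoff, its parameters, and the constants used in main are illustrative and not taken from Nomad.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// killWithBackoff retries kill() with exponential backoff and gives up after
// maxAttempts, returning an error if the task could never be killed.
func killWithBackoff(kill func() error, maxAttempts int, base, maxBackoff time.Duration) error {
	var lastErr error
	backoff := base
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = kill(); lastErr == nil {
			return nil
		}
		// Wait before the next attempt, doubling the delay up to a cap.
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
	return fmt.Errorf("failed to kill task after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	attempts := 0
	err := killWithBackoff(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("driver busy") // simulate transient driver failure
		}
		return nil // third attempt succeeds
	}, 5, 50*time.Millisecond, time.Second)
	fmt.Println("result:", err) // result: <nil>
}
```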
28 changes: 28 additions & 0 deletions client/allocrunner/taskrunner/task_runner_getters.go
@@ -85,3 +85,31 @@ func (tr *TaskRunner) clearDriverHandle() {
}
tr.handle = nil
}

// setKillErr stores any error that arose while killing the task
func (tr *TaskRunner) setKillErr(err error) {
tr.killErrLock.Lock()
defer tr.killErrLock.Unlock()
tr.killErr = err
}

// getKillErr returns any error that arose while killing the task
func (tr *TaskRunner) getKillErr() error {
tr.killErrLock.Lock()
defer tr.killErrLock.Unlock()
return tr.killErr
}

// setRunLaunched marks the fact that the Run loop has been started
func (tr *TaskRunner) setRunLaunched() {
tr.runLaunchedLock.Lock()
defer tr.runLaunchedLock.Unlock()
tr.runLaunched = true
}

// hasRunLaunched returns whether the Run loop has been started
func (tr *TaskRunner) hasRunLaunched() bool {
tr.runLaunchedLock.Lock()
defer tr.runLaunchedLock.Unlock()
return tr.runLaunched
}
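These getters exist because runLaunched and killErr are written from one goroutine (Run and handleKill) and read from another (Kill), so every access has to go through a lock. A tiny self-contained illustration of that pattern follows; the flag type here is a made-up stand-in, not Nomad code, and running it with `go run -race` after removing the mutex would typically show the data race these helpers prevent.

```go
package main

import (
	"fmt"
	"sync"
)

// flag is a minimal stand-in for runLaunched/killErr: a value written by one
// goroutine (Run) and read by another (Kill), so it needs a lock to satisfy
// the race detector and the Go memory model.
type flag struct {
	mu  sync.Mutex
	set bool
}

func (f *flag) mark()        { f.mu.Lock(); f.set = true; f.mu.Unlock() }
func (f *flag) marked() bool { f.mu.Lock(); defer f.mu.Unlock(); return f.set }

func main() {
	var f flag
	done := make(chan struct{})

	go func() { // plays the role of Run()
		f.mark()
		close(done)
	}()

	// plays the role of Kill(): safe to read concurrently thanks to the mutex
	fmt.Println("launched yet?", f.marked())
	<-done
	fmt.Println("launched now?", f.marked())
}
```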
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
@@ -256,7 +256,7 @@ func (tr *TaskRunner) poststart() error {

// Pass the lazy handle to the hooks so even if the driver exits and we
// launch a new one (external plugin), the handle will refresh.
lazyHandle := NewLazyHandle(tr.ctx, tr.getDriverHandle, tr.logger)
lazyHandle := NewLazyHandle(tr.shutdownCtx, tr.getDriverHandle, tr.logger)

var merr multierror.Error
for _, hook := range tr.runnerHooks {
