Merge pull request #4828 from hashicorp/b-restore
Implement client agent restarting
schmichael authored Nov 6, 2018
2 parents ee81352 + e58a91b commit 8122c76
Showing 20 changed files with 514 additions and 135 deletions.
85 changes: 76 additions & 9 deletions client/allocrunner/alloc_runner.go
@@ -157,7 +157,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
VaultClient: ar.vaultClient,
Vault: ar.vaultClient,
PluginSingletonLoader: ar.pluginSingletonLoader,
}

@@ -181,17 +181,61 @@ func (ar *allocRunner) Run() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()

// Run should not be called after Destroy is called. This is a
// programming error.
if ar.destroyed {
// Run should not be called after Destroy is called. This is a
// programming error.
ar.logger.Error("alloc destroyed; cannot run")
return
}
ar.runLaunched = true

// If an alloc should not be run, ensure any restored task handles are
// destroyed and exit to wait for the AR to be GC'd by the client.
if !ar.shouldRun() {
ar.logger.Debug("not running terminal alloc")

// Cleanup and sync state
states := ar.killTasks()

// Get the client allocation
calloc := ar.clientAlloc(states)

// Update the server
ar.stateUpdater.AllocStateUpdated(calloc)

// Broadcast client alloc to listeners
ar.allocBroadcaster.Send(calloc)
return
}

// Run! (and mark as having been run to ensure Destroy cleans up properly)
ar.runLaunched = true
go ar.runImpl()
}

// shouldRun returns true if the alloc is in a state in which the alloc
// runner should run it.
func (ar *allocRunner) shouldRun() bool {
// Do not run allocs that are terminal
if ar.Alloc().TerminalStatus() {
ar.logger.Trace("alloc terminal; not running",
"desired_status", ar.Alloc().DesiredStatus,
"client_status", ar.Alloc().ClientStatus,
)
return false
}

// It's possible that the alloc local state was marked terminal before
// the server copy of the alloc (checked above) was marked as terminal,
// so check the local state as well.
switch clientStatus := ar.AllocState().ClientStatus; clientStatus {
case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed, structs.AllocClientStatusLost:
ar.logger.Trace("alloc terminal; updating server and not running", "status", clientStatus)
return false
}

return true
}

func (ar *allocRunner) runImpl() {
// Close the wait channel on return
defer close(ar.waitCh)
@@ -354,7 +398,7 @@ func (ar *allocRunner) handleTaskStateUpdates() {
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
}

ar.killTasks()
states = ar.killTasks()
}

// Get the client allocation
@@ -369,8 +413,12 @@ }
}

// killTasks kills all task runners, leader (if there is one) first. Errors are
// logged except taskrunner.ErrTaskNotRunning which is ignored.
func (ar *allocRunner) killTasks() {
// logged except taskrunner.ErrTaskNotRunning which is ignored. Task states
// after Kill has been called are returned.
func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
var mu sync.Mutex
states := make(map[string]*structs.TaskState, len(ar.tasks))

// Kill leader first, synchronously
for name, tr := range ar.tasks {
if !tr.IsLeader() {
@@ -381,6 +429,9 @@ func (ar *allocRunner) killTasks() {
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}

state := tr.TaskState()
states[name] = state
break
}

@@ -398,9 +449,16 @@ func (ar *allocRunner) killTasks() {
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}

state := tr.TaskState()
mu.Lock()
states[name] = state
mu.Unlock()
}(name, tr)
}
wg.Wait()

return states
}
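
As an illustration of the pattern above (leader killed first and synchronously, then the remaining tasks concurrently with their final states collected under a mutex), here is a minimal, self-contained sketch; the task type and kill helper are hypothetical stand-ins, not code from this commit.

package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a task runner.
type task struct {
	name   string
	leader bool
}

// kill is a hypothetical stand-in for TaskRunner.Kill; it returns the
// task's final state.
func kill(t *task) string {
	return "dead"
}

// killAll kills the leader first, then the rest in parallel, and returns
// every task's final state, mirroring the shape of allocRunner.killTasks.
func killAll(tasks []*task) map[string]string {
	var mu sync.Mutex
	states := make(map[string]string, len(tasks))

	// Kill the leader first, synchronously, so follower tasks can observe
	// it stopping before they are killed themselves.
	for _, t := range tasks {
		if t.leader {
			states[t.name] = kill(t)
			break
		}
	}

	// Kill the remaining (non-leader) tasks concurrently, collecting their
	// final states under the mutex.
	var wg sync.WaitGroup
	for _, t := range tasks {
		if t.leader {
			continue
		}
		wg.Add(1)
		go func(t *task) {
			defer wg.Done()
			s := kill(t)
			mu.Lock()
			states[t.name] = s
			mu.Unlock()
		}(t)
	}
	wg.Wait()

	return states
}

func main() {
	tasks := []*task{{name: "web"}, {name: "log-shipper", leader: true}}
	fmt.Println(killAll(tasks)) // map[log-shipper:dead web:dead]
}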

// clientAlloc takes in the task states and returns an Allocation populated
@@ -510,6 +568,12 @@ func (ar *allocRunner) AllocState() *state.State {
}
}

// Generate alloc to get other state fields
alloc := ar.clientAlloc(state.TaskStates)
state.ClientStatus = alloc.ClientStatus
state.ClientDescription = alloc.ClientDescription
state.DeploymentStatus = alloc.DeploymentStatus

return state
}

@@ -563,8 +627,11 @@ func (ar *allocRunner) Destroy() {
}
defer ar.destroyedLock.Unlock()

// Stop any running tasks
ar.killTasks()
// Stop any running tasks and persist states in case the client is
// shutdown before Destroy finishes.
states := ar.killTasks()
calloc := ar.clientAlloc(states)
ar.stateUpdater.AllocStateUpdated(calloc)

// Wait for tasks to exit and postrun hooks to finish (if they ran at all)
if ar.runLaunched {
8 changes: 3 additions & 5 deletions client/allocrunner/alloc_runner_test.go
@@ -11,7 +11,6 @@ import (
consulapi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/catalog"
@@ -57,20 +56,19 @@ func (m *MockStateUpdater) Reset() {
// testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop
// versions of dependencies along with a cleanup func.
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
logger := testlog.HCLogger(t)
pluginLoader := catalog.TestPluginLoader(t)
clientConf, cleanup := config.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: logger,
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Consul: consulapi.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader),
PluginSingletonLoader: singleton.NewSingletonLoader(clientConf.Logger, pluginLoader),
}
return conf, cleanup
}
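
For context, a test would typically consume this helper roughly as sketched below; the sketch assumes NewAllocRunner(conf) returns (*allocRunner, error) as used elsewhere in this package, and the assertions are placeholders rather than code from this commit.

package allocrunner

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
)

// TestAllocRunner_Sketch is illustrative only: it shows how the config
// helper above is meant to be consumed.
func TestAllocRunner_Sketch(t *testing.T) {
	alloc := mock.Alloc()

	conf, cleanup := testAllocRunnerConfig(t, alloc)
	defer cleanup()

	// Assumed constructor shape; see NewAllocRunner in this package.
	ar, err := NewAllocRunner(conf)
	if err != nil {
		t.Fatalf("NewAllocRunner failed: %v", err)
	}

	go ar.Run()
	defer ar.Destroy()

	// Real tests would assert on ar.AllocState() or on the MockStateUpdater
	// wired into conf.StateUpdater.
}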
4 changes: 2 additions & 2 deletions client/allocrunner/interfaces/task_lifecycle.go
@@ -109,8 +109,8 @@ type TaskKillResponse struct{}
type TaskKillHook interface {
TaskHook

// Kill is called when a task is going to be killed.
Kill(context.Context, *TaskKillRequest, *TaskKillResponse) error
// Killing is called when a task is going to be Killed or Restarted.
Killing(context.Context, *TaskKillRequest, *TaskKillResponse) error
}

type TaskExitedRequest struct{}
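
With the rename, a minimal hook satisfying TaskKillHook would look roughly like the sketch below; the logKillHook type is hypothetical, and only the interface method names come from this diff.

package taskrunner

import (
	"context"

	log "github.com/hashicorp/go-hclog"

	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
)

// logKillHook is a hypothetical hook that only logs when its task is being
// killed or restarted; it exists to show the renamed Killing signature and
// is intended to satisfy interfaces.TaskKillHook.
type logKillHook struct {
	logger log.Logger
}

func (h *logKillHook) Name() string { return "log_kill" }

// Killing runs before the task process is stopped, for both kills and
// restarts.
func (h *logKillHook) Killing(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
	h.logger.Debug("task is being killed or restarted")
	return nil
}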
6 changes: 6 additions & 0 deletions client/allocrunner/taskrunner/interfaces/lifecycle.go
@@ -7,7 +7,13 @@ import (
)

type TaskLifecycle interface {
// Restart a task in place. If failure=false then the restart does not
// count as an attempt in the restart policy.
Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error

// Sends a signal to a task.
Signal(event *structs.TaskEvent, signal string) error

// Kill a task permanently.
Kill(ctx context.Context, event *structs.TaskEvent) error
}
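
As a usage sketch of this interface, a component holding a TaskLifecycle could trigger a non-failure restart as shown below; the restartOnRender hook is hypothetical, and the task event helpers are assumed from Nomad's structs package.

package taskrunner

import (
	"context"

	"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
	"github.com/hashicorp/nomad/nomad/structs"
)

// restartOnRender is a hypothetical hook that restarts its task when a
// watched template re-renders; only the TaskLifecycle method comes from
// this diff.
type restartOnRender struct {
	lifecycle interfaces.TaskLifecycle
}

func (r *restartOnRender) onRender(ctx context.Context) error {
	event := structs.NewTaskEvent(structs.TaskRestartSignal).
		SetDisplayMessage("template re-rendered")

	// failure=false: this restart is template driven and should not count
	// as an attempt against the task's restart policy.
	return r.lifecycle.Restart(ctx, event, false)
}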
36 changes: 23 additions & 13 deletions client/allocrunner/taskrunner/lifecycle.go
@@ -12,6 +12,7 @@ import (
func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
// Grab the handle
handle := tr.getDriverHandle()

// Check it is running
if handle == nil {
return ErrTaskNotRunning
@@ -20,12 +21,14 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
// Emit the event since it may take a long time to kill
tr.EmitEvent(event)

// Run the hooks prior to restarting the task
tr.killing()

// Tell the restart tracker that a restart triggered the exit
tr.restartTracker.SetRestartTriggered(failure)

// Kill the task using an exponential backoff in-case of failures.
destroySuccess, err := tr.handleDestroy(handle)
if !destroySuccess {
if err := tr.killTask(handle); err != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", err)
}
@@ -36,7 +39,10 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
return err
}

<-waitCh
select {
case <-waitCh:
case <-ctx.Done():
}
return nil
}

@@ -61,7 +67,7 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
// Cancel the task runner to break out of restart delay or the main run
// loop.
tr.ctxCancel()
tr.killCtxCancel()

// Grab the handle
handle := tr.getDriverHandle()
@@ -75,16 +81,17 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
tr.EmitEvent(event)

// Run the hooks prior to killing the task
tr.kill()
tr.killing()

// Tell the restart tracker that the task has been killed
// Tell the restart tracker that the task has been killed so it doesn't
// attempt to restart it.
tr.restartTracker.SetKilled()

// Kill the task using an exponential backoff in-case of failures.
destroySuccess, destroyErr := tr.handleDestroy(handle)
if !destroySuccess {
killErr := tr.killTask(handle)
if killErr != nil {
// We couldn't successfully destroy the resource created.
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", destroyErr)
tr.logger.Error("failed to kill task. Resources may have been leaked", "error", killErr)
}

// Block until task has exited.
@@ -100,13 +107,16 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
return err
}

<-waitCh
select {
case <-waitCh:
case <-ctx.Done():
}

// Store that the task has been destroyed and any associated error.
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(killErr))

if destroyErr != nil {
return destroyErr
if killErr != nil {
return killErr
} else if err := ctx.Err(); err != nil {
return err
}
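
The switch from a bare <-waitCh to a select in both Restart and Kill lets the caller's context abandon the wait if the task never exits. In isolation the pattern looks like the self-contained sketch below, which is illustrative rather than code from this commit.

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForExit blocks until the task's wait channel closes or the caller's
// context is cancelled, whichever happens first.
func waitForExit(ctx context.Context, waitCh <-chan struct{}) error {
	select {
	case <-waitCh:
		return nil
	case <-ctx.Done():
		// The caller gave up waiting; surface the context error.
		return ctx.Err()
	}
}

func main() {
	waitCh := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated task exit
		close(waitCh)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	fmt.Println(waitForExit(ctx, waitCh)) // <nil>
}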
30 changes: 29 additions & 1 deletion client/allocrunner/taskrunner/service_hook.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
@@ -34,6 +35,7 @@ type serviceHook struct {
logger log.Logger

// The following fields may be updated
delay time.Duration
driverExec tinterfaces.ScriptExecutor
driverNet *cstructs.DriverNetwork
canary bool
@@ -53,6 +55,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
taskName: c.task.Name,
services: c.task.Services,
restarter: c.restarter,
delay: c.task.ShutdownDelay,
}

if res := c.alloc.TaskResources[c.task.Name]; res != nil {
@@ -111,6 +114,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
}

// Update service hook fields
h.delay = task.ShutdownDelay
h.taskEnv = req.TaskEnv
h.services = task.Services
h.networks = networks
@@ -122,10 +126,35 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ
return h.consul.UpdateTask(oldTaskServices, newTaskServices)
}

func (h *serviceHook) Killing(ctx context.Context, req *interfaces.TaskKillRequest, resp *interfaces.TaskKillResponse) error {
h.mu.Lock()
defer h.mu.Unlock()

// Deregister before killing task
h.deregister()

// If there's no shutdown delay, exit early
if h.delay == 0 {
return nil
}

h.logger.Debug("waiting before killing task", "shutdown_delay", h.delay)
select {
case <-ctx.Done():
case <-time.After(h.delay):
}
return nil
}

func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *interfaces.TaskExitedResponse) error {
h.mu.Lock()
defer h.mu.Unlock()
h.deregister()
return nil
}

// deregister services from Consul.
func (h *serviceHook) deregister() {
taskServices := h.getTaskServices()
h.consul.RemoveTask(taskServices)

@@ -134,7 +163,6 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in
taskServices.Canary = !taskServices.Canary
h.consul.RemoveTask(taskServices)

return nil
}

func (h *serviceHook) getTaskServices() *agentconsul.TaskServices {
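
The new Killing hook deregisters services before the task process is stopped and then honors the task's ShutdownDelay, so Consul stops routing traffic before the task dies. The delay-versus-cancellation interaction, shown in isolation as a self-contained sketch (not code from this commit):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitShutdownDelay waits for the configured shutdown delay, returning early
// if the kill context is cancelled. A zero delay returns immediately.
func waitShutdownDelay(ctx context.Context, delay time.Duration) {
	if delay == 0 {
		return
	}
	select {
	case <-ctx.Done():
		// The kill was cancelled or forced; stop waiting.
	case <-time.After(delay):
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	start := time.Now()
	waitShutdownDelay(ctx, 100*time.Millisecond)
	fmt.Println("waited", time.Since(start).Round(10*time.Millisecond))
}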
(Diffs for the remaining 14 changed files in this commit are not shown here.)
