diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 69cdb1d590b..584a031e146 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -40,12 +40,11 @@ type AllocRunner struct { dirtyCh chan struct{} - ctx *driver.ExecContext - tasks map[string]*TaskRunner - taskStates map[string]*structs.TaskState - restored map[string]struct{} - RestartPolicy *structs.RestartPolicy - taskLock sync.RWMutex + ctx *driver.ExecContext + tasks map[string]*TaskRunner + taskStates map[string]*structs.TaskState + restored map[string]struct{} + taskLock sync.RWMutex taskStatusLock sync.RWMutex @@ -62,7 +61,6 @@ type allocRunnerState struct { Alloc *structs.Allocation AllocClientStatus string AllocClientDescription string - RestartPolicy *structs.RestartPolicy TaskStates map[string]*structs.TaskState Context *driver.ExecContext } @@ -102,7 +100,6 @@ func (r *AllocRunner) RestoreState() error { // Restore fields r.alloc = snap.Alloc - r.RestartPolicy = snap.RestartPolicy r.ctx = snap.Context r.allocClientStatus = snap.AllocClientStatus r.allocClientDescription = snap.AllocClientDescription @@ -115,9 +112,8 @@ func (r *AllocRunner) RestoreState() error { r.restored[name] = struct{}{} task := &structs.Task{Name: name} - restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type) - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc, task, restartTracker, r.consulService) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc, + task, r.consulService) r.tasks[name] = tr // Skip tasks in terminal states. 
@@ -163,7 +159,6 @@ func (r *AllocRunner) saveAllocRunnerState() error { defer r.allocLock.Unlock() snap := allocRunnerState{ Alloc: r.alloc, - RestartPolicy: r.RestartPolicy, Context: r.ctx, AllocClientStatus: r.allocClientStatus, AllocClientDescription: r.allocClientDescription, @@ -347,9 +342,6 @@ func (r *AllocRunner) Run() { return } - // Extract the RestartPolicy from the TG and set it on the alloc - r.RestartPolicy = tg.RestartPolicy - // Create the execution context if r.ctx == nil { allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID)) @@ -370,9 +362,8 @@ func (r *AllocRunner) Run() { // Merge in the task resources task.Resources = alloc.TaskResources[task.Name] - restartTracker := newRestartTracker(r.RestartPolicy, r.alloc.Job.Type) - tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, - r.alloc, task, restartTracker, r.consulService) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, r.alloc, + task, r.consulService) r.tasks[task.Name] = tr go tr.Run() } @@ -413,7 +404,7 @@ OUTER: break FOUND } } - tr.Update(task) + tr.Update(update) } r.taskLock.RUnlock() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 6b808b0e694..5d68e013cc5 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -1,6 +1,7 @@ package client import ( + "fmt" "os" "testing" "time" @@ -49,10 +50,14 @@ func TestAllocRunner_SimpleRun(t *testing.T) { testutil.WaitForResult(func() (bool, error) { if upd.Count == 0 { - return false, nil + return false, fmt.Errorf("No updates") } last := upd.Allocs[upd.Count-1] - return last.ClientStatus == structs.AllocClientStatusDead, nil + // Keep polling until the most recent update reports the alloc as dead. + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + return true, nil }, func(err error) { t.Fatalf("err: %v", err) }) diff --git a/client/driver/docker.go b/client/driver/docker.go index 
631babdfd1f..ba2d494074e 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -614,6 +614,9 @@ func (h *DockerHandle) WaitCh() chan *cstructs.WaitResult { } func (h *DockerHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/driver.go b/client/driver/driver.go index 5fd1238b380..f92b84fad8d 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -105,7 +105,8 @@ type DriverHandle interface { // WaitCh is used to return a channel used wait for task completion WaitCh() chan *cstructs.WaitResult - // Update is used to update the task if possible + // Update is used to update the task if possible and update task related + // configurations. Update(task *structs.Task) error // Kill is used to stop the task diff --git a/client/driver/exec.go b/client/driver/exec.go index efa846619a8..fb24f021fef 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -173,6 +173,9 @@ func (h *execHandle) WaitCh() chan *cstructs.WaitResult { } func (h *execHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/java.go b/client/driver/java.go index 2a7cb5a3002..3cb32679c38 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -221,6 +221,9 @@ func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { } func (h *javaHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. 
+ h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 76cd5aa42b2..df36b4cd320 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -262,6 +262,9 @@ func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { } func (h *qemuHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 376d0b626fd..5c870e80fa2 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -170,6 +170,9 @@ func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { } func (h *rawExecHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 764e17fb195..2b0f6218ab2 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -278,6 +278,9 @@ func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { } func (h *rktHandle) Update(task *structs.Task) error { + // Store the updated kill timeout. + h.killTimeout = task.KillTimeout + // Update is not possible return nil } diff --git a/client/restarts.go b/client/restarts.go index a7ac78f345d..8e6af3bf4d0 100644 --- a/client/restarts.go +++ b/client/restarts.go @@ -2,6 +2,7 @@ package client import ( "math/rand" + "sync" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -29,9 +30,22 @@ type RestartTracker struct { startTime time.Time // When the interval began policy *structs.RestartPolicy rand *rand.Rand + lock sync.Mutex } +// SetPolicy updates the policy used to determine restarts. 
+func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) { + r.lock.Lock() + defer r.lock.Unlock() + r.policy = policy +} + +// NextRestart takes the exit code from the last attempt and returns whether the +// task should be restarted and the duration to wait. func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) { + r.lock.Lock() + defer r.lock.Unlock() + // Hot path if no attempts are expected if r.policy.Attempts == 0 { return false, 0 diff --git a/client/task_runner.go b/client/task_runner.go index 8eac8e23e13..3d2b5c7c228 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -28,7 +28,7 @@ type TaskRunner struct { consulService *ConsulService task *structs.Task - updateCh chan *structs.Task + updateCh chan *structs.Allocation handle driver.DriverHandle destroy bool @@ -52,7 +52,15 @@ type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent) func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, alloc *structs.Allocation, task *structs.Task, - restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner { + consulService *ConsulService) *TaskRunner { + + // Build the restart tracker. 
+ tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) + return nil + } + restartTracker := newRestartTracker(tg.RestartPolicy, alloc.Job.Type) tc := &TaskRunner{ config: config, @@ -63,7 +71,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, alloc: alloc, task: task, - updateCh: make(chan *structs.Task, 8), + updateCh: make(chan *structs.Allocation, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), } @@ -231,10 +239,8 @@ func (r *TaskRunner) run() { case waitRes = <-r.handle.WaitCh(): break OUTER case update := <-r.updateCh: - // Update - r.task = update - if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) + if err := r.handleUpdate(update); err != nil { + r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err) } case <-r.destroyCh: // Avoid destroying twice @@ -303,6 +309,49 @@ func (r *TaskRunner) run() { return } +// handleUpdate takes an updated allocation and updates internal state to +// reflect the new config for the task. +func (r *TaskRunner) handleUpdate(update *structs.Allocation) error { + // Extract the task group from the alloc. + tg := update.Job.LookupTaskGroup(update.TaskGroup) + if tg == nil { + return fmt.Errorf("alloc '%s' missing task group '%s'", update.ID, update.TaskGroup) + } + + // Extract the task. + var task *structs.Task + for _, t := range tg.Tasks { + if t.Name == r.task.Name { + task = t + } + } + if task == nil { + return fmt.Errorf("task group %q doesn't contain task %q", tg.Name, r.task.Name) + } + r.task = task + + // Update will update resources and store the new kill timeout. 
+ if r.handle != nil { + if err := r.handle.Update(task); err != nil { + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, err) + } + } + + // Update the restart policy. + if r.restartTracker != nil { + r.restartTracker.SetPolicy(tg.RestartPolicy) + } + + /* TODO + // Re-register the task to consul and store the updated alloc. + r.consulService.Deregister(r.task, r.alloc) + r.alloc = update + r.consulService.Register(r.task, r.alloc) + */ + + return nil +} + // Helper function for converting a WaitResult into a TaskTerminated event. func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { return structs.NewTaskEvent(structs.TaskTerminated). @@ -312,12 +361,12 @@ func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEve } // Update is used to update the task of the context -func (r *TaskRunner) Update(update *structs.Task) { +func (r *TaskRunner) Update(update *structs.Allocation) { select { case r.updateCh <- update: default: r.logger.Printf("[ERR] client: dropping task update '%s' (alloc '%s')", - update.Name, r.alloc.ID) + r.task.Name, r.alloc.ID) } } diff --git a/client/task_runner_test.go b/client/task_runner_test.go index f0f6c92e24e..c757c5893bd 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -48,13 +48,10 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { allocDir.Build([]*structs.Task{task}) ctx := driver.NewExecContext(allocDir, alloc.ID) - rp := structs.NewRestartPolicy(structs.JobTypeService) - restartTracker := newRestartTracker(rp, alloc.Job.Type) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, consulClient) if !restarts { - restartTracker = noRestartsTracker() + tr.restartTracker = noRestartsTracker() } - - tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, restartTracker, consulClient) return upd, tr } @@ -134,20 +131,49 @@ func 
TestTaskRunner_Update(t *testing.T) { // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" - tr.task.Config["args"] = []string{"10"} + tr.task.Config["args"] = []string{"100"} go tr.Run() defer tr.Destroy() defer tr.ctx.AllocDir.Destroy() // Update the task definition - newTask := new(structs.Task) - *newTask = *tr.task + updateAlloc := tr.alloc.Copy() + + // Update the restart policy + newTG := updateAlloc.Job.TaskGroups[0] + newMode := "foo" + newTG.RestartPolicy.Mode = newMode + + newTask := updateAlloc.Job.TaskGroups[0].Tasks[0] newTask.Driver = "foobar" - tr.Update(newTask) + + // Update the kill timeout + testutil.WaitForResult(func() (bool, error) { + if tr.handle == nil { + return false, fmt.Errorf("task not started") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + oldHandle := tr.handle.ID() + newTask.KillTimeout = time.Hour + + tr.Update(updateAlloc) // Wait for update to take place testutil.WaitForResult(func() (bool, error) { - return tr.task == newTask, nil + if tr.task != newTask { + return false, fmt.Errorf("task not updated") + } + if tr.restartTracker.policy.Mode != newMode { + return false, fmt.Errorf("restart policy not updated") + } + if tr.handle.ID() == oldHandle { + return false, fmt.Errorf("handle not updated") + } + return true, nil }, func(err error) { t.Fatalf("err: %v", err) }) @@ -172,8 +198,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Create a new task runner consulClient, _ := NewConsulService(&consulServiceConfig{tr.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, tr.restartTracker, - consulClient) + tr.ctx, tr.alloc, &structs.Task{Name: tr.task.Name}, consulClient) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) }