From 2afcbbe0823695d22e6b388da9e036790801e1dc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 12 Nov 2015 15:28:22 -0800 Subject: [PATCH 1/7] Add TaskState to alloc --- api/allocations.go | 2 ++ api/tasks.go | 28 ++++++++++++++++++++ nomad/structs/structs.go | 56 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+) diff --git a/api/allocations.go b/api/allocations.go index 829c1030f97..00ab6984dde 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -52,6 +52,7 @@ type Allocation struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } @@ -83,6 +84,7 @@ type AllocationListStub struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } diff --git a/api/tasks.go b/api/tasks.go index 2535d5ec565..5c6dd600be0 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -110,3 +110,31 @@ func (t *Task) Constrain(c *Constraint) *Task { t.Constraints = append(t.Constraints, c) return t } + +// TaskState tracks the current state of a task and events that caused state +// transistions. +type TaskState struct { + State string + Events []*TaskEvent +} + +// TaskEventType is the set of events that effect the state of a task. +type TaskEventType int + +const ( + TaskDriverFailure TaskEventType = iota + TaskStarted + TaskTerminated + TaskKilled +) + +// TaskEvent is an event that effects the state of a task and contains meta-data +// appropriate to the events type. 
+type TaskEvent struct { + Type TaskEventType + Time int64 + DriverError error + ExitCode int + Signal int + Message string +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ce5007ac54c..a3d535e7207 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1053,6 +1053,57 @@ func (t *Task) GoString() string { return fmt.Sprintf("*%#v", *t) } +// Set of possible states for a task. +const ( + TaskStatePending = "pending" // The task is waiting to be run. + TaskStateRunning = "running" // The task is currently running. + TaskStateDead = "dead" // Terminal state of task. +) + +// TaskState tracks the current state of a task and events that caused state +// transistions. +type TaskState struct { + // The current state of the task. + State string + + // Series of task events that transistion the state of the task. + Events []*TaskEvent +} + +// TaskEventType is the set of events that effect the state of a task. +type TaskEventType int + +const ( + // A Driver failure indicates that the task could not be started due to a + // failure in the driver. + TaskDriverFailure TaskEventType = iota + + // Task Started signals that the task was started and its timestamp can be + // used to determine the running length of the task. + TaskStarted + + // Task terminated indicates that the task was started and exited. + TaskTerminated + + // Task Killed indicates a user has killed the task. + TaskKilled +) + +// TaskEvent is an event that effects the state of a task and contains meta-data +// appropriate to the events type. +type TaskEvent struct { + Type TaskEventType + Time int64 // Unix Nanosecond timestamp + + // Driver Failure fields. + DriverError error // A driver error occured while starting the task. + + // Task Terminated Fields. + ExitCode int // The exit code of the task. + Signal int // The signal that terminated the task. + Message string // A possible message explaining the termination of the task. 
+} + // Validate is used to sanity check a task group func (t *Task) Validate() error { var mErr multierror.Error @@ -1171,6 +1222,9 @@ type Allocation struct { // ClientStatusDescription is meant to provide more human useful information ClientDescription string + // TaskStates stores the state of each task, + TaskStates map[string]*TaskState + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -1200,6 +1254,7 @@ func (a *Allocation) Stub() *AllocListStub { DesiredDescription: a.DesiredDescription, ClientStatus: a.ClientStatus, ClientDescription: a.ClientDescription, + TaskStates: a.TaskStates, CreateIndex: a.CreateIndex, ModifyIndex: a.ModifyIndex, } @@ -1217,6 +1272,7 @@ type AllocListStub struct { DesiredDescription string ClientStatus string ClientDescription string + TaskStates map[string]*TaskState CreateIndex uint64 ModifyIndex uint64 } From 9681d09d561fb7e1cc3d7f56a30fec2dc362395b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 12 Nov 2015 17:47:51 -0800 Subject: [PATCH 2/7] Initialize task state in allocation sent by scheduler --- nomad/mock/mock.go | 8 +++++++- scheduler/generic_sched.go | 2 ++ scheduler/system_sched.go | 2 ++ scheduler/util.go | 8 ++++++++ scheduler/util_test.go | 23 +++++++++++++++++++++++ 5 files changed, 42 insertions(+), 1 deletion(-) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 329ecd87200..2ef5c834afd 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1,8 +1,9 @@ package mock import ( - "github.com/hashicorp/nomad/nomad/structs" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) func Node() *structs.Node { @@ -221,6 +222,11 @@ func Alloc() *structs.Allocation { }, }, }, + TaskStates: map[string]*structs.TaskState{ + "web": &structs.TaskState{ + State: structs.TaskStatePending, + }, + }, Job: Job(), DesiredStatus: structs.AllocDesiredStatusRun, ClientStatus: structs.AllocClientStatusPending, diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 7957f2360ff..b3b48665883 
100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -285,11 +285,13 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index d3f6fb27e60..d448642ff53 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -252,11 +252,13 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.TaskResources = option.TaskResources alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.ClientStatus = structs.AllocClientStatusPending + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending) s.plan.AppendAlloc(alloc) } else { alloc.DesiredStatus = structs.AllocDesiredStatusFailed alloc.DesiredDescription = "failed to find a node for placement" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead) s.plan.AppendFailed(alloc) failedTG[missing.TaskGroup] = alloc } diff --git a/scheduler/util.go b/scheduler/util.go index 43b4b0b0cfd..44bd2ae0851 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -445,3 +445,11 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple { return c } + +func initTaskState(tg *structs.TaskGroup, state string) map[string]*structs.TaskState { + states := make(map[string]*structs.TaskState, len(tg.Tasks)) + for _, task := 
range tg.Tasks { + states[task.Name] = &structs.TaskState{State: state} + } + return states +} diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 7873c55c582..275be30ba9a 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -648,3 +648,26 @@ func TestTaskGroupConstraints(t *testing.T) { } } + +func TestInitTaskState(t *testing.T) { + tg := &structs.TaskGroup{ + Tasks: []*structs.Task{ + &structs.Task{Name: "foo"}, + &structs.Task{Name: "bar"}, + }, + } + expPending := map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStatePending}, + "bar": &structs.TaskState{State: structs.TaskStatePending}, + } + expDead := map[string]*structs.TaskState{ + "foo": &structs.TaskState{State: structs.TaskStateDead}, + "bar": &structs.TaskState{State: structs.TaskStateDead}, + } + actPending := initTaskState(tg, structs.TaskStatePending) + actDead := initTaskState(tg, structs.TaskStateDead) + + if !(reflect.DeepEqual(expPending, actPending) && reflect.DeepEqual(expDead, actDead)) { + t.Fatal("Expected and actual not equal") + } +} From f4b461f97a9be0c2cd6990b5d3c94a82566c79c7 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 13 Nov 2015 22:07:13 -0800 Subject: [PATCH 3/7] Track Task State in the client and capture Wait results --- api/tasks.go | 1 + client/alloc_runner.go | 78 ++++++------ client/alloc_runner_test.go | 25 ++-- client/driver/docker.go | 17 ++- client/driver/docker_test.go | 21 ++-- client/driver/driver.go | 4 +- client/driver/exec.go | 16 +-- client/driver/exec_test.go | 28 ++--- client/driver/executor/exec.go | 4 +- client/driver/executor/exec_basic.go | 15 +-- client/driver/executor/exec_linux.go | 17 ++- client/driver/executor/test_harness.go | 8 +- client/driver/java.go | 16 +-- client/driver/java_test.go | 10 +- client/driver/qemu.go | 16 +-- client/driver/raw_exec.go | 16 +-- client/driver/raw_exec_test.go | 16 +-- client/driver/rkt.go | 19 +-- client/driver/rkt_test.go | 21 ++-- 
client/driver/spawn/spawn.go | 27 +++-- client/driver/spawn/spawn_test.go | 60 ++++------ client/driver/structs/structs.go | 27 +++++ client/restarts.go | 8 +- client/task_runner.go | 160 +++++++++++++++---------- client/task_runner_test.go | 83 ++++++------- nomad/structs/structs.go | 41 ++++++- 26 files changed, 418 insertions(+), 336 deletions(-) create mode 100644 client/driver/structs/structs.go diff --git a/api/tasks.go b/api/tasks.go index 5c6dd600be0..4a130546150 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -137,4 +137,5 @@ type TaskEvent struct { ExitCode int Signal int Message string + KillError string } diff --git a/client/alloc_runner.go b/client/alloc_runner.go index aa7cf79e593..ebbb0801c21 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -43,10 +43,10 @@ type AllocRunner struct { ctx *driver.ExecContext tasks map[string]*TaskRunner + restored map[string]struct{} RestartPolicy *structs.RestartPolicy taskLock sync.RWMutex - taskStatus map[string]taskStatus taskStatusLock sync.RWMutex updateCh chan *structs.Allocation @@ -68,16 +68,16 @@ type allocRunnerState struct { // NewAllocRunner is used to create a new allocation context func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner { ar := &AllocRunner{ - config: config, - updater: updater, - logger: logger, - alloc: alloc, - dirtyCh: make(chan struct{}, 1), - tasks: make(map[string]*TaskRunner), - taskStatus: make(map[string]taskStatus), - updateCh: make(chan *structs.Allocation, 8), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), + config: config, + updater: updater, + logger: logger, + alloc: alloc, + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 8), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), } return ar } @@ -98,20 +98,22 @@ func (r *AllocRunner) 
RestoreState() error { // Restore fields r.alloc = snap.Alloc r.RestartPolicy = snap.RestartPolicy - r.taskStatus = snap.TaskStatus r.ctx = snap.Context // Restore the task runners var mErr multierror.Error - for name, status := range r.taskStatus { + for name, state := range r.alloc.TaskStates { + // Mark the task as restored. + r.restored[name] = struct{}{} + task := &structs.Task{Name: name} restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) - tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) r.tasks[name] = tr // Skip tasks in terminal states. - if status.Status == structs.AllocClientStatusDead || - status.Status == structs.AllocClientStatusFailed { + if state.State == structs.TaskStateDead { continue } @@ -152,7 +154,6 @@ func (r *AllocRunner) saveAllocRunnerState() error { snap := allocRunnerState{ Alloc: r.alloc, RestartPolicy: r.RestartPolicy, - TaskStatus: r.taskStatus, Context: r.ctx, } return persistState(r.stateFilePath(), &snap) @@ -223,22 +224,26 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { // syncStatus is used to run and sync the status when it changes func (r *AllocRunner) syncStatus() error { - // Scan the task status to termine the status of the alloc + // Scan the task states to determine the status of the alloc var pending, running, dead, failed bool r.taskStatusLock.RLock() - pending = len(r.taskStatus) < len(r.tasks) - for _, status := range r.taskStatus { - switch status.Status { - case structs.AllocClientStatusRunning: + for _, state := range r.alloc.TaskStates { + switch state.State { + case structs.TaskStateRunning: running = true - case structs.AllocClientStatusDead: - dead = true - case structs.AllocClientStatusFailed: - failed = true + case structs.TaskStatePending: + pending = true + case structs.TaskStateDead: + last 
:= len(state.Events) - 1 + if state.Events[last].Type == structs.TaskDriverFailure { + failed = true + } else { + dead = true + } } } - if len(r.taskStatus) > 0 { - taskDesc, _ := json.Marshal(r.taskStatus) + if len(r.alloc.TaskStates) > 0 { + taskDesc, _ := json.Marshal(r.alloc.TaskStates) r.alloc.ClientDescription = string(taskDesc) } r.taskStatusLock.RUnlock() @@ -271,14 +276,8 @@ func (r *AllocRunner) setStatus(status, desc string) { } } -// setTaskStatus is used to set the status of a task -func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { - r.taskStatusLock.Lock() - r.taskStatus[taskName] = taskStatus{ - Status: status, - Description: desc, - } - r.taskStatusLock.Unlock() +// setTaskState is used to set the status of a task +func (r *AllocRunner) setTaskState(taskName string) { select { case r.dirtyCh <- struct{}{}: default: @@ -323,15 +322,15 @@ func (r *AllocRunner) Run() { // Start the task runners r.taskLock.Lock() for _, task := range tg.Tasks { - // Skip tasks that were restored - if _, ok := r.taskStatus[task.Name]; ok { + if _, ok := r.restored[task.Name]; ok { continue } // Merge in the task resources task.Resources = alloc.TaskResources[task.Name] restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy) - tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker) + tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx, + r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker) r.tasks[task.Name] = tr go tr.Run() } @@ -344,7 +343,6 @@ OUTER: case update := <-r.updateCh: // Check if we're in a terminal status if update.TerminalStatus() { - r.setAlloc(update) break OUTER } diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 281189fbcfb..9abe6b8a27a 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -24,20 +24,25 @@ func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error { return m.Err } -func 
testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) { +func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) { logger := testLogger() conf := DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() upd := &MockAllocStateUpdater{} alloc := mock.Alloc() + if !restarts { + alloc.Job.Type = structs.JobTypeBatch + *alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0} + } + ar := NewAllocRunner(logger, conf, upd.Update, alloc) return upd, ar } func TestAllocRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) go ar.Run() defer ar.Destroy() @@ -54,7 +59,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { func TestAllocRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -76,17 +81,17 @@ func TestAllocRunner_Destroy(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) - if time.Since(start) > time.Second { + if time.Since(start) > 8*time.Second { t.Fatalf("took too long to terminate") } } func TestAllocRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -109,17 +114,17 @@ func TestAllocRunner_Update(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) - if time.Since(start) > time.Second { + 
if time.Since(start) > 8*time.Second { t.Fatalf("took too long to terminate") } } func TestAllocRunner_SaveRestoreState(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner() + upd, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -156,7 +161,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { last := upd.Allocs[upd.Count-1] return last.ClientStatus == structs.AllocClientStatusDead, nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) if time.Since(start) > 15*time.Second { diff --git a/client/driver/docker.go b/client/driver/docker.go index b89f31eb32d..11bfe75bb5b 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -9,13 +9,14 @@ import ( "strconv" "strings" - docker "github.com/fsouza/go-dockerclient" - "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + docker "github.com/fsouza/go-dockerclient" + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) type DockerDriver struct { @@ -35,7 +36,7 @@ type dockerHandle struct { cleanupImage bool imageID string containerID string - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -414,7 +415,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle imageID: dockerImage.ID, containerID: container.ID, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -474,7 +475,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er imageID: pid.ImageID, containerID: pid.ContainerID, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan 
*cstructs.WaitResult, 1), } go h.run() return h, nil @@ -493,7 +494,7 @@ func (h *dockerHandle) ID() string { return fmt.Sprintf("DOCKER:%s", string(data)) } -func (h *dockerHandle) WaitCh() chan error { +func (h *dockerHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -565,8 +566,6 @@ func (h *dockerHandle) run() { } close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err) close(h.waitCh) } diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index 872c2419b72..da4eba72558 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -11,6 +11,7 @@ import ( docker "github.com/fsouza/go-dockerclient" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/environment" + cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -58,7 +59,7 @@ func TestDockerDriver_Handle(t *testing.T) { imageID: "imageid", containerID: "containerid", doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() @@ -163,9 +164,9 @@ func TestDockerDriver_Start_Wait(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -210,9 +211,9 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { defer handle.Kill() select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -268,9 +269,9 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) { }() select { - case err := <-handle.WaitCh(): - if err == nil { - t.Fatalf("should err: %v", err) + case res := <-handle.WaitCh(): + if 
res.Successful() { + t.Fatalf("should err: %v", res) } case <-time.After(10 * time.Second): t.Fatalf("timeout") diff --git a/client/driver/driver.go b/client/driver/driver.go index e2739e2b8a2..259c9df3608 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -11,6 +11,8 @@ import ( "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // BuiltinDrivers contains the built in registered drivers @@ -85,7 +87,7 @@ type DriverHandle interface { ID() string // WaitCh is used to return a channel used wait for task completion - WaitCh() chan error + WaitCh() chan *cstructs.WaitResult // Update is used to update the task if possible Update(task *structs.Task) error diff --git a/client/driver/exec.go b/client/driver/exec.go index 4de719c465a..f246dd48e45 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // ExecDriver fork/execs tasks using as many of the underlying OS's isolation @@ -25,7 +27,7 @@ type ExecDriver struct { // execHandle is returned from Start/Open as a handle to the PID type execHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -106,7 +108,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -123,7 +125,7 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: 
make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -134,7 +136,7 @@ func (h *execHandle) ID() string { return id } -func (h *execHandle) WaitCh() chan error { +func (h *execHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -154,10 +156,8 @@ func (h *execHandle) Kill() error { } func (h *execHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 06c7107456a..86e4320e673 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -99,9 +99,9 @@ func TestExecDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(4 * time.Second): t.Fatalf("timeout") @@ -143,9 +143,9 @@ func TestExecDriver_Start_Artifact_basic(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -187,9 +187,9 @@ func TestExecDriver_Start_Artifact_expanded(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -224,9 +224,9 @@ func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") 
@@ -278,8 +278,8 @@ func TestExecDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(8 * time.Second): diff --git a/client/driver/executor/exec.go b/client/driver/executor/exec.go index 8cf076bab25..c514890ef31 100644 --- a/client/driver/executor/exec.go +++ b/client/driver/executor/exec.go @@ -27,6 +27,8 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var errNoResources = fmt.Errorf("No resources are associated with this task") @@ -54,7 +56,7 @@ type Executor interface { Open(string) error // Wait waits till the user's command is completed. - Wait() error + Wait() *cstructs.WaitResult // Returns a handle that is executor specific for use in reopening. ID() (string, error) diff --git a/client/driver/executor/exec_basic.go b/client/driver/executor/exec_basic.go index 4b5f6a9f15f..438ae6b9212 100644 --- a/client/driver/executor/exec_basic.go +++ b/client/driver/executor/exec_basic.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/nomad/client/driver/environment" "github.com/hashicorp/nomad/client/driver/spawn" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // BasicExecutor should work everywhere, and as a result does not include @@ -92,17 +94,8 @@ func (e *BasicExecutor) Open(id string) error { return e.spawn.Valid() } -func (e *BasicExecutor) Wait() error { - code, err := e.spawn.Wait() - if err != nil { - return err - } - - if code != 0 { - return fmt.Errorf("Task exited with code: %d", code) - } - - return nil +func (e *BasicExecutor) Wait() *cstructs.WaitResult { + return e.spawn.Wait() } func (e *BasicExecutor) ID() (string, error) { diff --git a/client/driver/executor/exec_linux.go 
b/client/driver/executor/exec_linux.go index 0d1d033cad4..14f4f809ce5 100644 --- a/client/driver/executor/exec_linux.go +++ b/client/driver/executor/exec_linux.go @@ -24,6 +24,8 @@ import ( cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" "github.com/opencontainers/runc/libcontainer/cgroups/systemd" cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -199,15 +201,11 @@ func (e *LinuxExecutor) Start() error { // Wait waits til the user process exits and returns an error on non-zero exit // codes. Wait also cleans up the task directory and created cgroups. -func (e *LinuxExecutor) Wait() error { +func (e *LinuxExecutor) Wait() *cstructs.WaitResult { errs := new(multierror.Error) - code, err := e.spawn.Wait() - if err != nil { - errs = multierror.Append(errs, err) - } - - if code != 0 { - errs = multierror.Append(errs, fmt.Errorf("Task exited with code: %d", code)) + res := e.spawn.Wait() + if res.Err != nil { + errs = multierror.Append(errs, res.Err) } if err := e.destroyCgroup(); err != nil { @@ -218,7 +216,8 @@ func (e *LinuxExecutor) Wait() error { errs = multierror.Append(errs, err) } - return errs.ErrorOrNil() + res.Err = errs.ErrorOrNil() + return res } func (e *LinuxExecutor) Shutdown() error { diff --git a/client/driver/executor/test_harness.go b/client/driver/executor/test_harness.go index 1e37f8eff93..8ebd5434c9b 100644 --- a/client/driver/executor/test_harness.go +++ b/client/driver/executor/test_harness.go @@ -127,8 +127,8 @@ func Executor_Start_Wait(t *testing.T, command buildExecCommand) { log.Panicf("Start() failed: %v", err) } - if err := e.Wait(); err != nil { - log.Panicf("Wait() failed: %v", err) + if res := e.Wait(); !res.Successful() { + log.Panicf("Wait() failed: %v", res) } output, err := ioutil.ReadFile(absFilePath) @@ -215,8 +215,8 @@ func Executor_Open(t *testing.T, command buildExecCommand, newExecutor func() Ex log.Panicf("Open(%v) 
failed: %v", id, err) } - if err := e2.Wait(); err != nil { - log.Panicf("Wait() failed: %v", err) + if res := e2.Wait(); !res.Successful() { + log.Panicf("Wait() failed: %v", res) } output, err := ioutil.ReadFile(absFilePath) diff --git a/client/driver/java.go b/client/driver/java.go index 1aa2c6d3f94..5b4a8b5bd64 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // JavaDriver is a simple driver to execute applications packaged in Jars. @@ -28,7 +30,7 @@ type JavaDriver struct { // javaHandle is returned from Start/Open as a handle to the PID type javaHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -148,7 +150,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &javaHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -166,7 +168,7 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &javaHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -178,7 +180,7 @@ func (h *javaHandle) ID() string { return id } -func (h *javaHandle) WaitCh() chan error { +func (h *javaHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -198,10 +200,8 @@ func (h *javaHandle) Kill() error { } func (h *javaHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/java_test.go b/client/driver/java_test.go index 0c3490d0e15..b72c5899bdd 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ 
-118,9 +118,9 @@ func TestJavaDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): // expect the timeout b/c it's a long lived process @@ -171,8 +171,8 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(8 * time.Second): diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 79193a217ce..1b9835cdcfc 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -33,7 +35,7 @@ type QemuDriver struct { // qemuHandle is returned from Start/Open as a handle to the PID type qemuHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -193,7 +195,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, h := &qemuHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() @@ -211,7 +213,7 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -222,7 +224,7 @@ func (h *qemuHandle) ID() string { return id } -func (h *qemuHandle) WaitCh() chan error { +func (h *qemuHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -244,10 +246,8 @@ func (h *qemuHandle) 
Kill() error { } func (h *qemuHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 12e99b7f426..d3b78d04f47 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -12,6 +12,8 @@ import ( "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) const ( @@ -30,7 +32,7 @@ type RawExecDriver struct { // rawExecHandle is returned from Start/Open as a handle to the PID type rawExecHandle struct { cmd executor.Executor - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -115,7 +117,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -132,7 +134,7 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e h := &execHandle{ cmd: cmd, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -143,7 +145,7 @@ func (h *rawExecHandle) ID() string { return id } -func (h *rawExecHandle) WaitCh() chan error { +func (h *rawExecHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -163,10 +165,8 @@ func (h *rawExecHandle) Kill() error { } func (h *rawExecHandle) run() { - err := h.cmd.Wait() + res := h.cmd.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } + h.waitCh <- res close(h.waitCh) } diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 053f29337b0..f91425fc09d 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -216,9 +216,9 @@ func 
TestRawExecDriver_Start_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") @@ -252,9 +252,9 @@ func TestRawExecDriver_Start_Wait_AllocDir(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(2 * time.Second): t.Fatalf("timeout") @@ -305,8 +305,8 @@ func TestRawExecDriver_Start_Kill_Wait(t *testing.T) { // Task should terminate quickly select { - case err := <-handle.WaitCh(): - if err == nil { + case res := <-handle.WaitCh(): + if res.Successful() { t.Fatal("should err") } case <-time.After(2 * time.Second): diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 3f191253174..1463f78a425 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -19,6 +19,8 @@ import ( "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/fingerprint" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) var ( @@ -39,7 +41,7 @@ type rktHandle struct { proc *os.Process image string logger *log.Logger - waitCh chan error + waitCh chan *cstructs.WaitResult doneCh chan struct{} } @@ -183,7 +185,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e image: img, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } go h.run() return h, nil @@ -209,7 +211,7 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error image: qpid.Image, logger: d.logger, doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), 
} go h.run() @@ -229,7 +231,7 @@ func (h *rktHandle) ID() string { return fmt.Sprintf("Rkt:%s", string(data)) } -func (h *rktHandle) WaitCh() chan error { +func (h *rktHandle) WaitCh() chan *cstructs.WaitResult { return h.waitCh } @@ -253,10 +255,11 @@ func (h *rktHandle) Kill() error { func (h *rktHandle) run() { ps, err := h.proc.Wait() close(h.doneCh) - if err != nil { - h.waitCh <- err - } else if !ps.Success() { - h.waitCh <- fmt.Errorf("task exited with error") + code := 0 + if !ps.Success() { + // TODO: Better exit code parsing. + code = 1 } + h.waitCh <- cstructs.NewWaitResult(code, 0, err) close(h.waitCh) } diff --git a/client/driver/rkt_test.go b/client/driver/rkt_test.go index 53ea2a4277c..aeed4288bd4 100644 --- a/client/driver/rkt_test.go +++ b/client/driver/rkt_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" + cstructs "github.com/hashicorp/nomad/client/driver/structs" ctestutils "github.com/hashicorp/nomad/client/testutil" ) @@ -35,7 +36,7 @@ func TestRktDriver_Handle(t *testing.T) { proc: &os.Process{Pid: 123}, image: "foo", doneCh: make(chan struct{}), - waitCh: make(chan error, 1), + waitCh: make(chan *cstructs.WaitResult, 1), } actual := h.ID() @@ -143,9 +144,9 @@ func TestRktDriver_Start_Wait(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -184,9 +185,9 @@ func TestRktDriver_Start_Wait_Skip_Trust(t *testing.T) { } select { - case err := <-handle.WaitCh(): - if err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") @@ -220,9 +221,9 @@ func TestRktDriver_Start_Wait_Logs(t *testing.T) { defer handle.Kill() select { - case err := <-handle.WaitCh(): - if 
err != nil { - t.Fatalf("err: %v", err) + case res := <-handle.WaitCh(): + if !res.Successful() { + t.Fatalf("err: %v", res) } case <-time.After(5 * time.Second): t.Fatalf("timeout") diff --git a/client/driver/spawn/spawn.go b/client/driver/spawn/spawn.go index 9cf06e99091..32a27e11c57 100644 --- a/client/driver/spawn/spawn.go +++ b/client/driver/spawn/spawn.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/command" "github.com/hashicorp/nomad/helper/discover" ) @@ -203,7 +204,7 @@ func (s *Spawner) sendAbortCommand(w io.Writer) error { // Wait returns the exit code of the user process or an error if the wait // failed. -func (s *Spawner) Wait() (int, error) { +func (s *Spawner) Wait() *structs.WaitResult { if os.Getpid() == s.SpawnPpid { return s.waitAsParent() } @@ -212,9 +213,9 @@ func (s *Spawner) Wait() (int, error) { } // waitAsParent waits on the process if the current process was the spawner. -func (s *Spawner) waitAsParent() (int, error) { +func (s *Spawner) waitAsParent() *structs.WaitResult { if s.SpawnPpid != os.Getpid() { - return -1, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid()) + return structs.NewWaitResult(-1, 0, fmt.Errorf("not the parent. Spawner parent is %v; current pid is %v", s.SpawnPpid, os.Getpid())) } // Try to reattach to the spawn. @@ -228,7 +229,7 @@ func (s *Spawner) waitAsParent() (int, error) { } if _, err := s.spawn.Wait(); err != nil { - return -1, err + return structs.NewWaitResult(-1, 0, err) } return s.pollWait() @@ -237,11 +238,11 @@ func (s *Spawner) waitAsParent() (int, error) { // pollWait polls on the spawn daemon to determine when it exits. After it // exits, it reads the state file and returns the exit code and possibly an // error. 
-func (s *Spawner) pollWait() (int, error) { +func (s *Spawner) pollWait() *structs.WaitResult { // Stat to check if it is there to avoid a race condition. stat, err := os.Stat(s.StateFile) if err != nil { - return -1, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to Stat exit status file %v: %v", s.StateFile, err)) } // If there is data it means that the file has already been written. @@ -261,29 +262,29 @@ func (s *Spawner) pollWait() (int, error) { // readExitCode parses the state file and returns the exit code of the task. It // returns an error if the file can't be read. -func (s *Spawner) readExitCode() (int, error) { +func (s *Spawner) readExitCode() *structs.WaitResult { f, err := os.Open(s.StateFile) defer f.Close() if err != nil { - return -1, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to open %v to read exit code: %v", s.StateFile, err)) } stat, err := f.Stat() if err != nil { - return -1, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to stat file %v: %v", s.StateFile, err)) } if stat.Size() == 0 { - return -1, fmt.Errorf("Empty state file: %v", s.StateFile) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Empty state file: %v", s.StateFile)) } var exitStatus command.SpawnExitStatus dec := json.NewDecoder(f) if err := dec.Decode(&exitStatus); err != nil { - return -1, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err) + return structs.NewWaitResult(-1, 0, fmt.Errorf("Failed to parse exit status from %v: %v", s.StateFile, err)) } - return exitStatus.ExitCode, nil + return structs.NewWaitResult(exitStatus.ExitCode, 0, nil) } // Valid checks that the state of the Spawner is valid and that a subsequent @@ -297,7 +298,7 @@ func (s *Spawner) Valid() error { } // The task isn't alive so check that there is 
a valid exit code file. - if _, err := s.readExitCode(); err == nil { + if res := s.readExitCode(); res.Err == nil { return nil } diff --git a/client/driver/spawn/spawn_test.go b/client/driver/spawn/spawn_test.go index de5d8e97ce3..eb013db0de2 100644 --- a/client/driver/spawn/spawn_test.go +++ b/client/driver/spawn/spawn_test.go @@ -68,8 +68,8 @@ func TestSpawn_SetsLogs(t *testing.T) { t.Fatalf("Spawn() failed: %v", err) } - if code, err := spawn.Wait(); code != 0 && err != nil { - t.Fatalf("Wait() returned %v, %v; want 0, nil", code, err) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } stdout2, err := os.Open(stdout.Name()) @@ -129,13 +129,8 @@ func TestSpawn_ParentWaitExited(t *testing.T) { time.Sleep(1 * time.Second) - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -152,13 +147,8 @@ func TestSpawn_ParentWait(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -179,13 +169,8 @@ func TestSpawn_NonParentWaitExited(t *testing.T) { // Force the wait to assume non-parent. 
spawn.SpawnPpid = 0 - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -213,13 +198,8 @@ func TestSpawn_NonParentWait(t *testing.T) { // Force the wait to assume non-parent. spawn.SpawnPpid = 0 - code, err := spawn.Wait() - if err != nil { - t.Fatalf("Wait() failed %v", err) - } - - if code != 0 { - t.Fatalf("Wait() returned %v; want 0", code) + if res := spawn.Wait(); res.ExitCode != 0 && res.Err != nil { + t.Fatalf("Wait() returned %v, %v; want 0, nil", res.ExitCode, res.Err) } } @@ -255,8 +235,8 @@ func TestSpawn_DeadSpawnDaemon_Parent(t *testing.T) { t.FailNow() } - if _, err := spawn.Wait(); err == nil { - t.Fatalf("Wait() should have failed: %v", err) + if res := spawn.Wait(); res.Err == nil { + t.Fatalf("Wait() should have failed: %v", res.Err) } } @@ -294,8 +274,8 @@ func TestSpawn_DeadSpawnDaemon_NonParent(t *testing.T) { // Force the wait to assume non-parent. 
spawn.SpawnPpid = 0 - if _, err := spawn.Wait(); err == nil { - t.Fatalf("Wait() should have failed: %v", err) + if res := spawn.Wait(); res.Err == nil { + t.Fatalf("Wait() should have failed: %v", res.Err) } } @@ -316,8 +296,8 @@ func TestSpawn_Valid_TaskRunning(t *testing.T) { t.Fatalf("Valid() failed: %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } } @@ -334,8 +314,8 @@ func TestSpawn_Valid_TaskExit_ExitCode(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } if err := spawn.Valid(); err != nil { @@ -355,8 +335,8 @@ func TestSpawn_Valid_TaskExit_NoExitCode(t *testing.T) { t.Fatalf("Spawn() failed %v", err) } - if _, err := spawn.Wait(); err != nil { - t.Fatalf("Wait() failed %v", err) + if res := spawn.Wait(); res.Err != nil { + t.Fatalf("Wait() failed: %v", res.Err) } // Delete the file so that it can't find the exit code. diff --git a/client/driver/structs/structs.go b/client/driver/structs/structs.go new file mode 100644 index 00000000000..a60daf8df4a --- /dev/null +++ b/client/driver/structs/structs.go @@ -0,0 +1,27 @@ +package structs + +import "fmt" + +// WaitResult stores the result of a Wait operation. 
+type WaitResult struct { + ExitCode int + Signal int + Err error +} + +func NewWaitResult(code, signal int, err error) *WaitResult { + return &WaitResult{ + ExitCode: code, + Signal: signal, + Err: err, + } +} + +func (r *WaitResult) Successful() bool { + return r.ExitCode == 0 && r.Signal == 0 && r.Err == nil +} + +func (r *WaitResult) String() string { + return fmt.Sprintf("Wait returned exit code %v, signal %v, and error %v", + r.ExitCode, r.Signal, r.Err) +} diff --git a/client/restarts.go b/client/restarts.go index 4141405f8d6..ae940c2f1e1 100644 --- a/client/restarts.go +++ b/client/restarts.go @@ -1,8 +1,9 @@ package client import ( - "github.com/hashicorp/nomad/nomad/structs" "time" + + "github.com/hashicorp/nomad/nomad/structs" ) // The errorCounter keeps track of the number of times a process has exited @@ -30,6 +31,11 @@ func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) res } } +// noRestartsTracker returns a RestartTracker that never restarts. +func noRestartsTracker() restartTracker { + return &batchRestartTracker{maxAttempts: 0} +} + type batchRestartTracker struct { maxAttempts int delay time.Duration diff --git a/client/task_runner.go b/client/task_runner.go index c8a5a390aa1..b2a2f46d4ef 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -13,6 +13,8 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" "github.com/hashicorp/nomad/nomad/structs" + + cstructs "github.com/hashicorp/nomad/client/driver/structs" ) // TaskRunner is used to wrap a task within an allocation and provide the execution context. 
@@ -25,6 +27,7 @@ type TaskRunner struct { restartTracker restartTracker task *structs.Task + state *structs.TaskState updateCh chan *structs.Task handle driver.DriverHandle @@ -42,13 +45,14 @@ type taskRunnerState struct { HandleID string } -// TaskStateUpdater is used to update the status of a task -type TaskStateUpdater func(taskName, status, desc string) +// TaskStateUpdater is used to signal that tasks state has changed. +type TaskStateUpdater func(taskName string) // NewTaskRunner is used to create a new task context func NewTaskRunner(logger *log.Logger, config *config.Config, updater TaskStateUpdater, ctx *driver.ExecContext, - allocID string, task *structs.Task, restartTracker restartTracker) *TaskRunner { + allocID string, task *structs.Task, state *structs.TaskState, + restartTracker restartTracker) *TaskRunner { tc := &TaskRunner{ config: config, @@ -58,6 +62,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, ctx: ctx, allocID: allocID, task: task, + state: state, updateCh: make(chan *structs.Task, 8), destroyCh: make(chan struct{}), waitCh: make(chan struct{}), @@ -132,12 +137,19 @@ func (r *TaskRunner) DestroyState() error { return os.RemoveAll(r.stateFilePath()) } -// setStatus is used to update the status of the task runner -func (r *TaskRunner) setStatus(status, desc string) { +// setState is used to update the state of the task runner +func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { + // Update the task. + r.state.State = state + r.state.Events = append(r.state.Events, event) + + // Persist our state to disk. if err := r.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state of Task Runner: %v", r.task.Name) } - r.updater(r.task.Name, status, desc) + + // Indicate the task has been updated. 
+ r.updater(r.task.Name) } // createDriver makes a driver for the task @@ -157,7 +169,8 @@ func (r *TaskRunner) startTask() error { // Create a driver driver, err := r.createDriver() if err != nil { - r.setStatus(structs.AllocClientStatusFailed, err.Error()) + e := structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err) + r.setState(structs.TaskStateDead, e) return err } @@ -166,94 +179,115 @@ func (r *TaskRunner) startTask() error { if err != nil { r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) - r.setStatus(structs.AllocClientStatusFailed, - fmt.Sprintf("failed to start: %v", err)) + e := structs.NewTaskEvent(structs.TaskDriverFailure). + SetDriverError(fmt.Errorf("failed to start: %v", err)) + r.setState(structs.TaskStateDead, e) return err } r.handle = handle - r.setStatus(structs.AllocClientStatusRunning, "task started") + r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) return nil } // Run is a long running routine used to manage the task func (r *TaskRunner) Run() { - var err error defer close(r.waitCh) r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.allocID) - // Start the task if not yet started - if r.handle == nil { - if err := r.startTask(); err != nil { - return - } - } + r.run(false) + return +} - // Monitoring the Driver - defer r.DestroyState() - err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh) - for err != nil { - r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) - shouldRestart, when := r.restartTracker.nextRestart() - if !shouldRestart { - r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) - r.setStatus(structs.AllocClientStatusDead, fmt.Sprintf("task failed with: %v", err)) +func (r *TaskRunner) run(forceStart bool) { + // Start the task if not yet started or it is being 
forced. + if r.handle == nil || forceStart { + if err := r.startTask(); err != nil { return } - - r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) - r.setStatus(structs.AllocClientStatusPending, "Task Restarting") - r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) - select { - case <-time.After(when): - case <-r.destroyCh: - } - r.destroyLock.Lock() - if r.destroy { - r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) - break - } - if err = r.startTask(); err != nil { - r.destroyLock.Unlock() - continue - } - r.destroyLock.Unlock() - err = r.monitorDriver(r.handle.WaitCh(), r.updateCh, r.destroyCh) } - // Cleanup after ourselves - r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) - r.setStatus(structs.AllocClientStatusDead, "task completed") -} + // Store the errors that caused use to stop waiting for updates. + var waitRes *cstructs.WaitResult + var destroyErr error + destroyed := false -// This functions listens to messages from the driver and blocks until the -// driver exits -func (r *TaskRunner) monitorDriver(waitCh chan error, updateCh chan *structs.Task, destroyCh chan struct{}) error { - var err error OUTER: // Wait for updates for { select { - case err = <-waitCh: + case waitRes = <-r.handle.WaitCh(): break OUTER - case update := <-updateCh: + case update := <-r.updateCh: // Update r.task = update if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", - r.task.Name, r.allocID, err) + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) } - - case <-destroyCh: + case <-r.destroyCh: // Send the kill signal, and use the WaitCh to block until complete if err := r.handle.Kill(); err != nil { - r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", - 
r.task.Name, r.allocID, err) + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + destroyErr = err } + destroyed = true } } - return err + + // If the user destroyed the task, we do not attempt to do any restarts. + if destroyed { + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) + return + } + + // Log whether the task was successful or not. + if !waitRes.Successful() { + r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes) + } else { + r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) + } + + // Check if we should restart. If not mark task as dead and exit. + waitEvent := r.waitErrorToEvent(waitRes) + shouldRestart, when := r.restartTracker.nextRestart() + if !shouldRestart { + r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) + r.setState(structs.TaskStateDead, waitEvent) + return + } + + r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) + r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) + r.setState(structs.TaskStatePending, waitEvent) + + // Sleep but watch for destroy events. + select { + case <-time.After(when): + case <-r.destroyCh: + } + + // Destroyed while we were waiting to restart, so abort. + r.destroyLock.Lock() + destroyed = r.destroy + r.destroyLock.Unlock() + if destroyed { + r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + return + } + + // Recurse on ourselves and force the start since we are restarting the task. + r.run(true) + return +} + +// Helper function for converting a WaitResult into a TaskTerminated event. 
+func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { + e := structs.NewTaskEvent(structs.TaskTerminated).SetExitCode(res.ExitCode).SetSignal(res.Signal) + if res.Err != nil { + e.SetExitMessage(res.Err.Error()) + } + return e } // Update is used to update the task of the context diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 751d00fa9ec..c8f8697fa3a 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -4,7 +4,6 @@ import ( "log" "os" "path/filepath" - "strings" "testing" "time" @@ -21,21 +20,11 @@ func testLogger() *log.Logger { return log.New(os.Stderr, "", log.LstdFlags) } -type MockTaskStateUpdater struct { - Count int - Name []string - Status []string - Description []string -} +type MockTaskStateUpdater struct{} -func (m *MockTaskStateUpdater) Update(name, status, desc string) { - m.Count += 1 - m.Name = append(m.Name, name) - m.Status = append(m.Status, status) - m.Description = append(m.Description, desc) -} +func (m *MockTaskStateUpdater) Update(name string) {} -func testTaskRunner() (*MockTaskStateUpdater, *TaskRunner) { +func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { logger := testLogger() conf := DefaultConfig() conf.StateDir = os.TempDir() @@ -54,13 +43,18 @@ func testTaskRunner() (*MockTaskStateUpdater, *TaskRunner) { ctx := driver.NewExecContext(allocDir, alloc.ID) rp := structs.NewRestartPolicy(structs.JobTypeService) restartTracker := newRestartTracker(structs.JobTypeService, rp) - tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, restartTracker) + if !restarts { + restartTracker = noRestartsTracker() + } + + state := alloc.TaskStates[task.Name] + tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc.ID, task, state, restartTracker) return upd, tr } func TestTaskRunner_SimpleRun(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + _, tr := testTaskRunner(false) go tr.Run() defer tr.Destroy() defer 
tr.ctx.AllocDir.Destroy() @@ -71,33 +65,26 @@ func TestTaskRunner_SimpleRun(t *testing.T) { t.Fatalf("timeout") } - if upd.Count != 2 { - t.Fatalf("should have 2 updates: %#v", upd) - } - if upd.Name[0] != tr.task.Name { - t.Fatalf("bad: %#v", upd.Name) - } - if upd.Status[0] != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", upd.Status) - } - if upd.Description[0] != "task started" { - t.Fatalf("bad: %#v", upd.Description) + if len(tr.state.Events) != 2 { + t.Fatalf("should have 2 updates: %#v", tr.state.Events) } - if upd.Name[1] != tr.task.Name { - t.Fatalf("bad: %#v", upd.Name) + if tr.state.State != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) } - if upd.Status[1] != structs.AllocClientStatusDead { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.Events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) } - if upd.Description[1] != "task completed" { - t.Fatalf("bad: %#v", upd.Description) + + if tr.state.Events[1].Type != structs.TaskTerminated { + t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskTerminated) } } func TestTaskRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + _, tr := testTaskRunner(true) defer tr.ctx.AllocDir.Destroy() // Change command to ensure we run for a bit @@ -113,27 +100,31 @@ func TestTaskRunner_Destroy(t *testing.T) { select { case <-tr.WaitCh(): - case <-time.After(2 * time.Second): + case <-time.After(8 * time.Second): t.Fatalf("timeout") } - if upd.Count != 2 { - t.Fatalf("should have 2 updates: %#v", upd) + if len(tr.state.Events) != 2 { + t.Fatalf("should have 2 updates: %#v", tr.state.Events) } - if upd.Status[0] != structs.AllocClientStatusRunning { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.State != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", tr.state.State, structs.TaskStateDead) } - if upd.Status[1] 
!= structs.AllocClientStatusDead { - t.Fatalf("bad: %#v", upd.Status) + + if tr.state.Events[0].Type != structs.TaskStarted { + t.Fatalf("First Event was %v; want %v", tr.state.Events[0].Type, structs.TaskStarted) } - if !strings.Contains(upd.Description[1], "task failed") { - t.Fatalf("bad: %#v", upd.Description) + + if tr.state.Events[1].Type != structs.TaskKilled { + t.Fatalf("First Event was %v; want %v", tr.state.Events[1].Type, structs.TaskKilled) } + } func TestTaskRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - _, tr := testTaskRunner() + _, tr := testTaskRunner(false) // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" @@ -158,7 +149,7 @@ func TestTaskRunner_Update(t *testing.T) { func TestTaskRunner_SaveRestoreState(t *testing.T) { ctestutil.ExecCompatible(t) - upd, tr := testTaskRunner() + upd, tr := testTaskRunner(false) // Change command to ensure we run for a bit tr.task.Config["command"] = "/bin/sleep" @@ -174,7 +165,7 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { // Create a new task runner tr2 := NewTaskRunner(tr.logger, tr.config, upd.Update, - tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.restartTracker) + tr.ctx, tr.allocID, &structs.Task{Name: tr.task.Name}, tr.state, tr.restartTracker) if err := tr2.RestoreState(); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a3d535e7207..3cc7fdbac7d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1096,12 +1096,51 @@ type TaskEvent struct { Time int64 // Unix Nanosecond timestamp // Driver Failure fields. - DriverError error // A driver error occured while starting the task. + DriverError string // A driver error occured while starting the task. // Task Terminated Fields. ExitCode int // The exit code of the task. Signal int // The signal that terminated the task. Message string // A possible message explaining the termination of the task. 
+ + // Task Killed Fields. + KillError string // Error killing the task. +} + +func NewTaskEvent(event TaskEventType) *TaskEvent { + return &TaskEvent{ + Type: event, + Time: time.Now().UnixNano(), + } +} + +func (e *TaskEvent) SetDriverError(err error) *TaskEvent { + if err != nil { + e.DriverError = err.Error() + } + return e +} + +func (e *TaskEvent) SetExitCode(c int) *TaskEvent { + e.ExitCode = c + return e +} + +func (e *TaskEvent) SetSignal(s int) *TaskEvent { + e.Signal = s + return e +} + +func (e *TaskEvent) SetExitMessage(m string) *TaskEvent { + e.Message = m + return e +} + +func (e *TaskEvent) SetKillError(err error) *TaskEvent { + if err != nil { + e.KillError = err.Error() + } + return e } // Validate is used to sanity check a task group From fa68fb62d3cb52d8a3c581f6213b8339e2a431a5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 14 Nov 2015 14:13:32 -0800 Subject: [PATCH 4/7] Change event type to string --- api/tasks.go | 13 +++++-------- nomad/structs/structs.go | 15 ++++++--------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 4a130546150..548906ae2f3 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -118,20 +118,17 @@ type TaskState struct { Events []*TaskEvent } -// TaskEventType is the set of events that effect the state of a task. -type TaskEventType int - const ( - TaskDriverFailure TaskEventType = iota - TaskStarted - TaskTerminated - TaskKilled + TaskDriverFailure = "Driver Failure" + TaskStarted = "Started" + TaskTerminated = "Terminated" + TaskKilled = "Killed" ) // TaskEvent is an event that effects the state of a task and contains meta-data // appropriate to the events type. 
type TaskEvent struct { - Type TaskEventType + Type string Time int64 DriverError error ExitCode int diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3cc7fdbac7d..7c040e9d8de 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1070,29 +1070,26 @@ type TaskState struct { Events []*TaskEvent } -// TaskEventType is the set of events that effect the state of a task. -type TaskEventType int - const ( // A Driver failure indicates that the task could not be started due to a // failure in the driver. - TaskDriverFailure TaskEventType = iota + TaskDriverFailure = "Driver Failure" // Task Started signals that the task was started and its timestamp can be // used to determine the running length of the task. - TaskStarted + TaskStarted = "Started" // Task terminated indicates that the task was started and exited. - TaskTerminated + TaskTerminated = "Terminated" // Task Killed indicates a user has killed the task. - TaskKilled + TaskKilled = "Killed" ) // TaskEvent is an event that effects the state of a task and contains meta-data // appropriate to the events type. type TaskEvent struct { - Type TaskEventType + Type string Time int64 // Unix Nanosecond timestamp // Driver Failure fields. @@ -1107,7 +1104,7 @@ type TaskEvent struct { KillError string // Error killing the task. 
} -func NewTaskEvent(event TaskEventType) *TaskEvent { +func NewTaskEvent(event string) *TaskEvent { return &TaskEvent{ Type: event, Time: time.Now().UnixNano(), From b692bcd0b46cca0e22e643e1b486753fcab6ed7d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 16 Nov 2015 12:10:29 -0800 Subject: [PATCH 5/7] Fix the capacity --- api/tasks.go | 2 +- client/alloc_runner.go | 10 ---------- client/task_runner.go | 19 ++++++++++++++++++- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 548906ae2f3..2eb77f9351b 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -130,7 +130,7 @@ const ( type TaskEvent struct { Type string Time int64 - DriverError error + DriverError string ExitCode int Signal int Message string diff --git a/client/alloc_runner.go b/client/alloc_runner.go index ebbb0801c21..d723478dbe8 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -183,16 +183,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation { return r.alloc } -// setAlloc is used to update the allocation of the runner -// we preserve the existing client status and description -func (r *AllocRunner) setAlloc(alloc *structs.Allocation) { - if r.alloc != nil { - alloc.ClientStatus = r.alloc.ClientStatus - alloc.ClientDescription = r.alloc.ClientDescription - } - r.alloc = alloc -} - // dirtySyncState is used to watch for state being marked dirty to sync func (r *AllocRunner) dirtySyncState() { for { diff --git a/client/task_runner.go b/client/task_runner.go index b2a2f46d4ef..936ccecbf50 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -137,11 +137,27 @@ func (r *TaskRunner) DestroyState() error { return os.RemoveAll(r.stateFilePath()) } +func (r *TaskRunner) appendEvent(event *structs.TaskEvent) { + capacity := 10 + if r.state.Events == nil { + r.state.Events = make([]*structs.TaskEvent, 0, capacity) + } + + // If we hit capacity, then shift it. 
+ if len(r.state.Events) == capacity { + old := r.state.Events + r.state.Events = make([]*structs.TaskEvent, 0, capacity) + r.state.Events = append(r.state.Events, old[1:]...) + } + + r.state.Events = append(r.state.Events, event) +} + // setState is used to update the state of the task runner func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { // Update the task. r.state.State = state - r.state.Events = append(r.state.Events, event) + r.appendEvent(event) // Persist our state to disk. if err := r.SaveState(); err != nil { @@ -278,6 +294,7 @@ OUTER: // Recurse on ourselves and force the start since we are restarting the task. r.run(true) + // TODO: Alex return } From 9aa9a2244bc8d3cde55ca6cf025b635dfb3477d9 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 16 Nov 2015 12:26:11 -0800 Subject: [PATCH 6/7] Use loop not recursion --- client/task_runner.go | 143 +++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 70 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 936ccecbf50..8db5ffb0947 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -211,90 +211,93 @@ func (r *TaskRunner) Run() { r.logger.Printf("[DEBUG] client: starting task context for '%s' (alloc '%s')", r.task.Name, r.allocID) - r.run(false) + r.run() return } -func (r *TaskRunner) run(forceStart bool) { - // Start the task if not yet started or it is being forced. - if r.handle == nil || forceStart { - if err := r.startTask(); err != nil { - return - } - } - - // Store the errors that caused use to stop waiting for updates. 
- var waitRes *cstructs.WaitResult - var destroyErr error - destroyed := false - -OUTER: - // Wait for updates +func (r *TaskRunner) run() { + var forceStart bool for { - select { - case waitRes = <-r.handle.WaitCh(): - break OUTER - case update := <-r.updateCh: - // Update - r.task = update - if err := r.handle.Update(update); err != nil { - r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + // Start the task if not yet started or it is being forced. + if r.handle == nil || forceStart { + forceStart = false + if err := r.startTask(); err != nil { + return } - case <-r.destroyCh: - // Send the kill signal, and use the WaitCh to block until complete - if err := r.handle.Kill(); err != nil { - r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) - destroyErr = err + } + + // Store the errors that caused us to stop waiting for updates. + var waitRes *cstructs.WaitResult + var destroyErr error + destroyed := false + + OUTER: + // Wait for updates + for { + select { + case waitRes = <-r.handle.WaitCh(): + break OUTER + case update := <-r.updateCh: + // Update + r.task = update + if err := r.handle.Update(update); err != nil { + r.logger.Printf("[ERR] client: failed to update task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + } + case <-r.destroyCh: + // Send the kill signal, and use the WaitCh to block until complete + if err := r.handle.Kill(); err != nil { + r.logger.Printf("[ERR] client: failed to kill task '%s' for alloc '%s': %v", r.task.Name, r.allocID, err) + destroyErr = err + } + destroyed = true } - destroyed = true } - } - // If the user destroyed the task, we do not attempt to do any restarts. - if destroyed { - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) - return - } + // If the user destroyed the task, we do not attempt to do any restarts. 
+ if destroyed { + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) + return + } - // Log whether the task was successful or not. - if !waitRes.Successful() { - r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes) - } else { - r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) - } + // Log whether the task was successful or not. + if !waitRes.Successful() { + r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.allocID, waitRes) + } else { + r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.allocID) + } - // Check if we should restart. If not mark task as dead and exit. - waitEvent := r.waitErrorToEvent(waitRes) - shouldRestart, when := r.restartTracker.nextRestart() - if !shouldRestart { - r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) - r.setState(structs.TaskStateDead, waitEvent) - return - } + // Check if we should restart. If not mark task as dead and exit. + waitEvent := r.waitErrorToEvent(waitRes) + shouldRestart, when := r.restartTracker.nextRestart() + if !shouldRestart { + r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.allocID) + r.setState(structs.TaskStateDead, waitEvent) + return + } - r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) - r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) - r.setState(structs.TaskStatePending, waitEvent) + r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name) + r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name) + r.setState(structs.TaskStatePending, waitEvent) - // Sleep but watch for destroy events. 
- select { - case <-time.After(when): - case <-r.destroyCh: - } + // Sleep but watch for destroy events. + select { + case <-time.After(when): + case <-r.destroyCh: + } - // Destroyed while we were waiting to restart, so abort. - r.destroyLock.Lock() - destroyed = r.destroy - r.destroyLock.Unlock() - if destroyed { - r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) - return - } + // Destroyed while we were waiting to restart, so abort. + r.destroyLock.Lock() + destroyed = r.destroy + r.destroyLock.Unlock() + if destroyed { + r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name) + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + return + } - // Recurse on ourselves and force the start since we are restarting the task. - r.run(true) - // TODO: Alex + // Set force start because we are restarting the task. + forceStart = true + } return } From 95222f686020434fcb21c0f72554d9856e4ca0a4 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 16 Nov 2015 14:46:18 -0800 Subject: [PATCH 7/7] Change SetExitMessage from taking a string to an error --- client/task_runner.go | 9 ++++----- nomad/structs/structs.go | 6 ++++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/client/task_runner.go b/client/task_runner.go index 8db5ffb0947..984af27670f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -303,11 +303,10 @@ func (r *TaskRunner) run() { // Helper function for converting a WaitResult into a TaskTerminated event. func (r *TaskRunner) waitErrorToEvent(res *cstructs.WaitResult) *structs.TaskEvent { - e := structs.NewTaskEvent(structs.TaskTerminated).SetExitCode(res.ExitCode).SetSignal(res.Signal) - if res.Err != nil { - e.SetExitMessage(res.Err.Error()) - } - return e + return structs.NewTaskEvent(structs.TaskTerminated). 
+ SetExitCode(res.ExitCode). + SetSignal(res.Signal). + SetExitMessage(res.Err) } // Update is used to update the task of the context diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7c040e9d8de..cdfdfa53e42 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1128,8 +1128,10 @@ func (e *TaskEvent) SetSignal(s int) *TaskEvent { return e } -func (e *TaskEvent) SetExitMessage(m string) *TaskEvent { - e.Message = m +func (e *TaskEvent) SetExitMessage(err error) *TaskEvent { + if err != nil { + e.Message = err.Error() + } return e }