Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Tasks State #416

Merged
merged 7 commits into from
Nov 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Allocation struct {
DesiredDescription string
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
}
Expand Down Expand Up @@ -83,6 +84,7 @@ type AllocationListStub struct {
DesiredDescription string
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
CreateIndex uint64
ModifyIndex uint64
}
Expand Down
26 changes: 26 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,29 @@ func (t *Task) Constrain(c *Constraint) *Task {
t.Constraints = append(t.Constraints, c)
return t
}

// TaskState tracks the current state of a task and events that caused state
// transitions.
type TaskState struct {
State string
Events []*TaskEvent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we bounding the size of Events somewhere? A service style job might potentially have a lot of events over the course of time if the node on which it is placed remains healthy for a really long time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will in setState as suggested below

}

// Task event type names. These look intended as values for TaskEvent.Type.
// NOTE(review): presumably they mirror the same-named constants in the
// server-side structs package — confirm the two stay in sync.
const (
// TaskDriverFailure presumably marks an event where the driver failed to
// run the task (see TaskEvent.DriverError) — TODO confirm.
TaskDriverFailure = "Driver Failure"
// TaskStarted presumably marks the task having been started — TODO confirm.
TaskStarted = "Started"
// TaskTerminated presumably marks the task having exited (see
// TaskEvent.ExitCode, Signal and Message) — TODO confirm.
TaskTerminated = "Terminated"
// TaskKilled presumably marks the task having been killed (see
// TaskEvent.KillError) — TODO confirm.
TaskKilled = "Killed"
)

// TaskEvent is an event that affects the state of a task and contains metadata
// appropriate to the event's type. Which of the fields below are populated
// presumably depends on Type; unused fields keep their zero values.
type TaskEvent struct {
// Type names the kind of event; presumably one of the Task* event type
// constants declared in this file — TODO confirm.
Type string
// Time is presumably the timestamp of the event.
// NOTE(review): the unit and epoch are not visible here — confirm.
Time int64
// DriverError presumably carries the driver's error text for
// TaskDriverFailure events — TODO confirm.
DriverError string
// ExitCode, Signal and Message presumably describe how the task ended for
// TaskTerminated events — TODO confirm.
ExitCode int
Signal int
Message string
// KillError presumably carries the error encountered while killing the
// task for TaskKilled events — TODO confirm.
KillError string
}
88 changes: 38 additions & 50 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type AllocRunner struct {

ctx *driver.ExecContext
tasks map[string]*TaskRunner
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex

taskStatus map[string]taskStatus
taskStatusLock sync.RWMutex

updateCh chan *structs.Allocation
Expand All @@ -68,16 +68,16 @@ type allocRunnerState struct {
// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater, alloc *structs.Allocation) *AllocRunner {
ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStatus: make(map[string]taskStatus),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
}
return ar
}
Expand All @@ -98,20 +98,22 @@ func (r *AllocRunner) RestoreState() error {
// Restore fields
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.taskStatus = snap.TaskStatus
r.ctx = snap.Context

// Restore the task runners
var mErr multierror.Error
for name, status := range r.taskStatus {
for name, state := range r.alloc.TaskStates {
// Mark the task as restored.
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
r.tasks[name] = tr

// Skip tasks in terminal states.
if status.Status == structs.AllocClientStatusDead ||
status.Status == structs.AllocClientStatusFailed {
if state.State == structs.TaskStateDead {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully alloc dirs of Tasks in dead states would be cleaned up without their task runners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep it is the alloc_runner that does the destroy.

continue
}

Expand Down Expand Up @@ -152,7 +154,6 @@ func (r *AllocRunner) saveAllocRunnerState() error {
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
TaskStatus: r.taskStatus,
Context: r.ctx,
}
return persistState(r.stateFilePath(), &snap)
Expand Down Expand Up @@ -182,16 +183,6 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
}

// setAlloc is used to update the allocation of the runner
// we preserve the existing client status and description
func (r *AllocRunner) setAlloc(alloc *structs.Allocation) {
if r.alloc != nil {
alloc.ClientStatus = r.alloc.ClientStatus
alloc.ClientDescription = r.alloc.ClientDescription
}
r.alloc = alloc
}

// dirtySyncState is used to watch for state being marked dirty to sync
func (r *AllocRunner) dirtySyncState() {
for {
Expand Down Expand Up @@ -223,22 +214,26 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Scan the task status to termine the status of the alloc
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
for _, state := range r.alloc.TaskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
Expand Down Expand Up @@ -271,14 +266,8 @@ func (r *AllocRunner) setStatus(status, desc string) {
}
}

// setTaskStatus is used to set the status of a task
func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
r.taskStatusLock.Lock()
r.taskStatus[taskName] = taskStatus{
Status: status,
Description: desc,
}
r.taskStatusLock.Unlock()
// setTaskState is used to set the status of a task
func (r *AllocRunner) setTaskState(taskName string) {
select {
case r.dirtyCh <- struct{}{}:
default:
Expand Down Expand Up @@ -323,15 +312,15 @@ func (r *AllocRunner) Run() {
// Start the task runners
r.taskLock.Lock()
for _, task := range tg.Tasks {
// Skip tasks that were restored
if _, ok := r.taskStatus[task.Name]; ok {
if _, ok := r.restored[task.Name]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in another PR, but it would be nice not to have logic like this in the Task Runner, where it decides whether or not to run itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah totally agree. There was just a lot already happening in this PR.

continue
}

// Merge in the task resources
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskStatus, r.ctx, r.alloc.ID, task, restartTracker)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc.ID, task, r.alloc.TaskStates[task.Name], restartTracker)
r.tasks[task.Name] = tr
go tr.Run()
}
Expand All @@ -344,7 +333,6 @@ OUTER:
case update := <-r.updateCh:
// Check if we're in a terminal status
if update.TerminalStatus() {
r.setAlloc(update)
break OUTER
}

Expand Down
25 changes: 15 additions & 10 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,25 @@ func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
return m.Err
}

func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) {
func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {
logger := testLogger()
conf := DefaultConfig()
conf.StateDir = os.TempDir()
conf.AllocDir = os.TempDir()
upd := &MockAllocStateUpdater{}
alloc := mock.Alloc()
if !restarts {
alloc.Job.Type = structs.JobTypeBatch
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
}

ar := NewAllocRunner(logger, conf, upd.Update, alloc)
return upd, ar
}

func TestAllocRunner_SimpleRun(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)
go ar.Run()
defer ar.Destroy()

Expand All @@ -54,7 +59,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) {

func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand All @@ -76,17 +81,17 @@ func TestAllocRunner_Destroy(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Second {
if time.Since(start) > 8*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand All @@ -109,17 +114,17 @@ func TestAllocRunner_Update(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > time.Second {
if time.Since(start) > 8*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner()
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
Expand Down Expand Up @@ -156,7 +161,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

if time.Since(start) > 15*time.Second {
Expand Down
17 changes: 8 additions & 9 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"strconv"
"strings"

docker "github.com/fsouza/go-dockerclient"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"

docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)

type DockerDriver struct {
Expand All @@ -35,7 +36,7 @@ type dockerHandle struct {
cleanupImage bool
imageID string
containerID string
waitCh chan error
waitCh chan *cstructs.WaitResult
doneCh chan struct{}
}

Expand Down Expand Up @@ -414,7 +415,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
imageID: dockerImage.ID,
containerID: container.ID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
Expand Down Expand Up @@ -474,7 +475,7 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er
imageID: pid.ImageID,
containerID: pid.ContainerID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
waitCh: make(chan *cstructs.WaitResult, 1),
}
go h.run()
return h, nil
Expand All @@ -493,7 +494,7 @@ func (h *dockerHandle) ID() string {
return fmt.Sprintf("DOCKER:%s", string(data))
}

func (h *dockerHandle) WaitCh() chan error {
func (h *dockerHandle) WaitCh() chan *cstructs.WaitResult {
return h.waitCh
}

Expand Down Expand Up @@ -565,8 +566,6 @@ func (h *dockerHandle) run() {
}

close(h.doneCh)
if err != nil {
h.waitCh <- err
}
h.waitCh <- cstructs.NewWaitResult(exitCode, 0, err)
close(h.waitCh)
}
Loading