Client only pulls updated allocations from server #731

Merged 5 commits on Feb 2, 2016
12 changes: 0 additions & 12 deletions api/nodes.go
@@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que
return resp, qm, nil
}

// ClientAllocations is used to return a lightweight list of allocations associated with a node.
// It is primarily used by the client in order to determine which allocations actually need
// an update.
func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) {
var resp map[string]uint64
qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q)
if err != nil {
return nil, nil, err
}
return resp, qm, nil
}

// ForceEvaluate is used to force-evaluate an existing node.
func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp nodeEvalResponse
18 changes: 0 additions & 18 deletions api/nodes_test.go
@@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) {
}
}

func TestNodes_ClientAllocations(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

// Looking up by a non-existent node returns nothing. We
// don't check the index here because it's possible the node
// has already registered, in which case we will get a non-
// zero result anyways.
allocs, _, err := nodes.ClientAllocations("nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(allocs); n != 0 {
t.Fatalf("expected 0 allocs, got: %d", n)
}
}

func TestNodes_ForceEvaluate(t *testing.T) {
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
c.DevMode = true
176 changes: 118 additions & 58 deletions client/alloc_runner.go
@@ -1,7 +1,6 @@
package client

import (
"encoding/json"
"fmt"
"log"
"os"
@@ -22,12 +21,6 @@ const (
allocSyncRetryIntv = 15 * time.Second
)

// taskStatus is used to track the status of a task
type taskStatus struct {
Status string
Description string
}

// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error

@@ -38,12 +31,18 @@ type AllocRunner struct {
logger *log.Logger
consulService *ConsulService

alloc *structs.Allocation
alloc *structs.Allocation
allocLock sync.Mutex

// Explicit status of allocation. Set when there are failures
allocClientStatus string
allocClientDescription string

dirtyCh chan struct{}

ctx *driver.ExecContext
tasks map[string]*TaskRunner
taskStates map[string]*structs.TaskState
restored map[string]struct{}
RestartPolicy *structs.RestartPolicy
taskLock sync.RWMutex
@@ -60,10 +59,12 @@

// allocRunnerState is used to snapshot the state of the alloc runner
type allocRunnerState struct {
Alloc *structs.Allocation
RestartPolicy *structs.RestartPolicy
TaskStatus map[string]taskStatus
Context *driver.ExecContext
Alloc *structs.Allocation
AllocClientStatus string
AllocClientDescription string
RestartPolicy *structs.RestartPolicy
TaskStates map[string]*structs.TaskState
Context *driver.ExecContext
}

// NewAllocRunner is used to create a new allocation context
@@ -77,6 +78,7 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStat
consulService: consulService,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: alloc.TaskStates,
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 8),
destroyCh: make(chan struct{}),
@@ -102,18 +104,20 @@ func (r *AllocRunner) RestoreState() error {
r.alloc = snap.Alloc
r.RestartPolicy = snap.RestartPolicy
r.ctx = snap.Context
r.allocClientStatus = snap.AllocClientStatus
r.allocClientDescription = snap.AllocClientDescription
r.taskStates = snap.TaskStates

// Restore the task runners
var mErr multierror.Error
for name, state := range r.alloc.TaskStates {
for name, state := range r.taskStates {
// Mark the task as restored.
r.restored[name] = struct{}{}

task := &structs.Task{Name: name}
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[name] = tr

// Skip tasks in terminal states.
@@ -155,10 +159,15 @@
func (r *AllocRunner) saveAllocRunnerState() error {
r.taskStatusLock.RLock()
defer r.taskStatusLock.RUnlock()
r.allocLock.Lock()
defer r.allocLock.Unlock()
snap := allocRunnerState{
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
Alloc: r.alloc,
RestartPolicy: r.RestartPolicy,
Context: r.ctx,
AllocClientStatus: r.allocClientStatus,
AllocClientDescription: r.allocClientDescription,
TaskStates: r.taskStates,
}
return persistState(r.stateFilePath(), &snap)
}
@@ -184,7 +193,47 @@ func (r *AllocRunner) DestroyContext() error {

// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
return r.alloc
r.allocLock.Lock()
Review comment (Contributor): Why do we need to acquire a lock here? I don't see us mutating anything. Does the caller expect the alloc not to change once it has it? We should probably make a copy and return that if that's the assumption; otherwise the alloc might change under the hood once the caller has it.

alloc := r.alloc.Copy()
r.allocLock.Unlock()

// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
alloc.TaskStates = r.taskStates
for _, state := range r.taskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
r.taskStatusLock.RUnlock()

// The status has explicitly been set.
if r.allocClientStatus != "" || r.allocClientDescription != "" {
alloc.ClientStatus = r.allocClientStatus
alloc.ClientDescription = r.allocClientDescription
return alloc
}

// Determine the alloc status
if failed {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
alloc.ClientStatus = structs.AllocClientStatusDead
}
return alloc
}
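
For reference, the derivation above follows a fixed precedence: an explicitly set client status (recorded by setStatus on failures) always wins; otherwise one failed task marks the whole alloc failed, any running task marks it running, and the alloc is dead only once no task is still pending. A minimal sketch of that precedence in isolation; deriveStatus and the plain-string statuses stand in for the structs.AllocClientStatus* constants and are illustrative only:

// deriveStatus is a simplified, hypothetical isolation of the
// precedence implemented by Alloc() above.
func deriveStatus(explicit string, pending, running, dead, failed bool) string {
	if explicit != "" {
		// An explicitly set client status always wins.
		return explicit
	}
	switch {
	case failed:
		return "failed"
	case running:
		return "running"
	case dead && !pending:
		return "dead"
	}
	// No change: at least one task is still pending.
	return ""
}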

// dirtySyncState is used to watch for state being marked dirty to sync
@@ -218,43 +267,13 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {

// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Scan the task states to determine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
for _, state := range r.alloc.TaskStates {
switch state.State {
case structs.TaskStateRunning:
running = true
case structs.TaskStatePending:
pending = true
case structs.TaskStateDead:
last := len(state.Events) - 1
if state.Events[last].Type == structs.TaskDriverFailure {
failed = true
} else {
dead = true
}
}
}
if len(r.alloc.TaskStates) > 0 {
taskDesc, _ := json.Marshal(r.alloc.TaskStates)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()

// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Get a copy of our alloc.
alloc := r.Alloc()

// Attempt to update the status
if err := r.updater(r.alloc); err != nil {
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
@@ -271,13 +290,42 @@ func (r *AllocRunner) setStatus(status, desc string) {
}

// setTaskState is used to set the status of a task
func (r *AllocRunner) setTaskState(taskName string) {
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
if !ok {
r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName)
return
}

// Set the tasks state.
taskState.State = state
r.appendTaskEvent(taskState, event)

select {
case r.dirtyCh <- struct{}{}:
default:
}
}
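
The non-blocking send above is a coalescing signal: dirtyCh is created with capacity 1, so a burst of task-state changes collapses into a single pending sync instead of blocking task runners or queuing duplicates. A minimal sketch of the pattern on its own, assuming nothing beyond the standard library:

// newDirtySignal returns a mark function and the channel a syncer
// would receive on. The buffer of one coalesces repeated marks.
func newDirtySignal() (mark func(), wait <-chan struct{}) {
	ch := make(chan struct{}, 1)
	mark = func() {
		select {
		case ch <- struct{}{}:
			// First mark since the last drain; a sync will fire.
		default:
			// A notification is already pending; coalesce this one.
		}
	}
	return mark, ch
}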

// appendTaskEvent updates the task status by appending the new event.
func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
capacity := 10
if state.Events == nil {
state.Events = make([]*structs.TaskEvent, 0, capacity)
}

// If we hit capacity, then shift it.
if len(state.Events) == capacity {
old := state.Events
state.Events = make([]*structs.TaskEvent, 0, capacity)
state.Events = append(state.Events, old[1:]...)
}

state.Events = append(state.Events, event)
}
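
appendTaskEvent therefore caps each task's event history at the 10 most recent entries, so a flapping task cannot grow client state without bound. The same capped-append behavior in isolation, sketched over plain strings (appendCapped is an illustrative helper, not part of the package):

// appendCapped keeps at most capacity entries, discarding the oldest
// on overflow, mirroring appendTaskEvent above.
func appendCapped(events []string, event string, capacity int) []string {
	if len(events) == capacity {
		// Shift out the oldest entry so the history stays bounded.
		trimmed := make([]string, 0, capacity)
		trimmed = append(trimmed, events[1:]...)
		events = trimmed
	}
	return append(events, event)
}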

// Run is a long running goroutine used to manage an allocation
func (r *AllocRunner) Run() {
defer close(r.waitCh)
@@ -324,8 +372,7 @@
task.Resources = alloc.TaskResources[task.Name]
restartTracker := newRestartTracker(r.RestartPolicy)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
r.consulService)
r.alloc, task, restartTracker, r.consulService)
r.tasks[task.Name] = tr
go tr.Run()
}
@@ -336,6 +383,11 @@
OUTER:
for {
select {
case update := <-r.updateCh:
// Store the updated allocation.
r.allocLock.Lock()
r.alloc = update
r.allocLock.Unlock()

// Check if we're in a terminal status
if update.TerminalStatus() {
break OUTER
@@ -371,8 +423,8 @@
}

// Destroy each sub-task
r.taskLock.RLock()
defer r.taskLock.RUnlock()
r.taskLock.Lock()
defer r.taskLock.Unlock()
for _, tr := range r.tasks {
tr.Destroy()
}
@@ -408,6 +460,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) {
}
}

// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and
// checks if the current running allocation is behind and should be updated.
func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool {
r.allocLock.Lock()
defer r.allocLock.Unlock()
return r.alloc.AllocModifyIndex < serverIndex
}
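
This check is what gives the PR its title: the server can advertise just allocation IDs with their AllocModifyIndex, and the client pulls full allocation bodies only for entries that are new or stale. A sketch of that client-side filtering under those assumptions; staleAllocIDs and the map payload shape are illustrative, not this PR's exact wire format:

// staleAllocIDs returns the IDs whose server-side AllocModifyIndex is
// ahead of (or unknown to) this client. runners maps alloc ID to its
// local AllocRunner.
func staleAllocIDs(runners map[string]*AllocRunner, serverIndexes map[string]uint64) []string {
	var stale []string
	for id, index := range serverIndexes {
		runner, ok := runners[id]
		if !ok || runner.shouldUpdate(index) {
			// New to this client, or behind the server's copy.
			stale = append(stale, id)
		}
	}
	return stale
}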

// Destroy is used to indicate that the allocation context should be destroyed
func (r *AllocRunner) Destroy() {
r.destroyLock.Lock()
19 changes: 6 additions & 13 deletions client/alloc_runner_test.go
@@ -91,35 +91,28 @@ func TestAllocRunner_Destroy(t *testing.T) {

func TestAllocRunner_Update(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
_, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()
start := time.Now()

// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
newAlloc.Name = "FOO"
newAlloc.AllocModifyIndex++
ar.Update(newAlloc)

// Check that the alloc runner stores the updated allocation.
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
return ar.Alloc().Name == "FOO", nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v %#v", err, ar.Alloc())
})

if time.Since(start) > 15*time.Second {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState(t *testing.T) {