Skip to content

Commit

Permalink
Merge pull request #867 from hashicorp/f-reduce-alloc-update-size
Browse files Browse the repository at this point in the history
Decrease size of Allocation when serializing
  • Loading branch information
dadgar committed Mar 1, 2016
2 parents 4e2e6b8 + ff48998 commit 94e56b3
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 64 deletions.
4 changes: 2 additions & 2 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
if !ok {
r.logger.Printf("[ERR] client: setting task state for unknown task %q", taskName)
return
taskState = &structs.TaskState{}
r.taskStates[taskName] = taskState
}

// Set the tasks state.
Expand Down
16 changes: 15 additions & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

// Attach the plan to all the allocations. It is pulled out in the
// Attach the job to all the allocations. It is pulled out in the
// payload to avoid the redundancy of encoding, but should be denormalized
// prior to being inserted into MemDB.
if j := req.Job; j != nil {
Expand All @@ -367,6 +367,20 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
}
}

// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
continue
}

alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
}

if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertAllocs failed: %v", err)
return err
Expand Down
37 changes: 37 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,43 @@ func TestFSM_UpsertAllocs_SharedJob(t *testing.T) {
}
}

// TestFSM_UpsertAllocs_StrippedResources checks that when an allocation
// arrives in the Raft log with its combined Resources stripped (to shrink
// the serialized payload), the FSM rebuilds them from the per-task
// resources before the allocation is persisted.
func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
	fsm := testFSM(t)

	alloc := mock.Alloc()
	origResources := alloc.Resources
	alloc.Resources = nil

	req := structs.AllocUpdateRequest{
		Job:   alloc.Job,
		Alloc: []*structs.Allocation{alloc},
	}
	buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if resp := fsm.Apply(makeLog(buf)); resp != nil {
		t.Fatalf("resp: %v", resp)
	}

	// Verify we are registered
	out, err := fsm.State().AllocByID(alloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Adopt the server-assigned indexes so the deep-equal below only
	// compares the fields this test cares about.
	alloc.CreateIndex = out.CreateIndex
	alloc.ModifyIndex = out.ModifyIndex
	alloc.AllocModifyIndex = out.AllocModifyIndex

	// Resources should be recomputed
	alloc.Resources = origResources
	if !reflect.DeepEqual(alloc, out) {
		t.Fatalf("bad: %#v %#v", alloc, out)
	}
}

func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
Expand Down
11 changes: 4 additions & 7 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,13 @@ func Alloc() *structs.Allocation {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "main", Value: 12345}},
MBits: 100,
ReservedPorts: []structs.Port{{Label: "main", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
Expand All @@ -238,6 +239,7 @@ func Alloc() *structs.Allocation {
"web": &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
Expand All @@ -249,11 +251,6 @@ func Alloc() *structs.Allocation {
},
},
},
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
Job: Job(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
Expand Down
15 changes: 5 additions & 10 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,23 +1404,16 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
}

// Create the delta updates
ts := map[string]*structs.TaskState{"web": &structs.TaskState{State: structs.TaskStatePending}}
update := &structs.Allocation{
ID: alloc.ID,
ClientStatus: structs.AllocClientStatusFailed,
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
TaskStates: ts,
}
update2 := &structs.Allocation{
ID: alloc2.ID,
ClientStatus: structs.AllocClientStatusRunning,
TaskStates: map[string]*structs.TaskState{
"web": &structs.TaskState{
State: structs.TaskStatePending,
},
},
TaskStates: ts,
}

err = state.UpdateAllocsFromClient(1001, []*structs.Allocation{update, update2})
Expand All @@ -1435,6 +1428,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {

alloc.CreateIndex = 1000
alloc.ModifyIndex = 1001
alloc.TaskStates = ts
alloc.ClientStatus = structs.AllocClientStatusFailed
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
Expand All @@ -1448,6 +1442,7 @@ func TestStateStore_UpdateAllocsFromClient(t *testing.T) {
alloc2.ModifyIndex = 1000
alloc2.ModifyIndex = 1001
alloc2.ClientStatus = structs.AllocClientStatusRunning
alloc2.TaskStates = ts
if !reflect.DeepEqual(alloc2, out) {
t.Fatalf("bad: %#v %#v", alloc2, out)
}
Expand Down
16 changes: 14 additions & 2 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,20 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st

// For each alloc, add the resources
for _, alloc := range allocs {
if err := used.Add(alloc.Resources); err != nil {
return false, "", nil, err
if alloc.Resources != nil {
if err := used.Add(alloc.Resources); err != nil {
return false, "", nil, err
}
} else if alloc.TaskResources != nil {
// Allocations within the plan have the combined resources stripped
// to save space, so sum up the individual task resources.
for _, taskResource := range alloc.TaskResources {
if err := used.Add(taskResource); err != nil {
return false, "", nil, err
}
}
} else {
return false, "", nil, fmt.Errorf("allocation %q has no resources set", alloc.ID)
}
}

Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2459,6 +2459,10 @@ func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {

// Normalize the job
newAlloc.Job = nil

// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil

newAlloc.DesiredStatus = status
newAlloc.DesiredDescription = desc
node := alloc.NodeID
Expand Down
5 changes: 1 addition & 4 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
}

// Attempt to match the task group
option, size := s.stack.Select(missing.TaskGroup)
option, _ := s.stack.Select(missing.TaskGroup)

// Create an allocation for this
alloc := &structs.Allocation{
Expand All @@ -344,7 +344,6 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
}

Expand All @@ -360,13 +359,11 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending)
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead)
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
}
Expand Down
5 changes: 1 addition & 4 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.stack.SetNodes(nodes)

// Attempt to match the task group
option, size := s.stack.Select(missing.TaskGroup)
option, _ := s.stack.Select(missing.TaskGroup)

if option == nil {
// Check if this task group has already failed
Expand All @@ -245,7 +245,6 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),
}

Expand All @@ -261,13 +260,11 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.TaskResources = option.TaskResources
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusPending
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStatePending)
s.plan.AppendAlloc(alloc)
} else {
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
alloc.DesiredDescription = "failed to find a node for placement"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = initTaskState(missing.TaskGroup, structs.TaskStateDead)
s.plan.AppendFailed(alloc)
failedTG[missing.TaskGroup] = alloc
}
Expand Down
14 changes: 3 additions & 11 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
allocInPlace)

// Attempt to match the task group
option, size := stack.Select(update.TaskGroup)
option, _ := stack.Select(update.TaskGroup)

// Pop the allocation
ctx.Plan().PopUpdate(update.Alloc)
Expand All @@ -391,8 +391,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,

// Update the allocation
newAlloc.EvalID = eval.ID
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = size
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
Expand Down Expand Up @@ -460,11 +460,3 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {

return c
}

// initTaskState builds the initial TaskStates map for a task group,
// creating one entry per task seeded with the supplied state string.
func initTaskState(tg *structs.TaskGroup, state string) map[string]*structs.TaskState {
	out := make(map[string]*structs.TaskState, len(tg.Tasks))
	for _, t := range tg.Tasks {
		out[t.Name] = &structs.TaskState{State: state}
	}
	return out
}
23 changes: 0 additions & 23 deletions scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,29 +714,6 @@ func TestTaskGroupConstraints(t *testing.T) {

}

// TestInitTaskState verifies that initTaskState produces one TaskState
// per task in the group, each carrying the requested state string.
func TestInitTaskState(t *testing.T) {
	group := &structs.TaskGroup{
		Tasks: []*structs.Task{
			{Name: "foo"},
			{Name: "bar"},
		},
	}

	// Exercise both state strings the schedulers use.
	for _, state := range []string{structs.TaskStatePending, structs.TaskStateDead} {
		want := map[string]*structs.TaskState{
			"foo": {State: state},
			"bar": {State: state},
		}
		if got := initTaskState(group, state); !reflect.DeepEqual(want, got) {
			t.Fatal("Expected and actual not equal")
		}
	}
}

func TestProgressMade(t *testing.T) {
noopPlan := &structs.PlanResult{}
if progressMade(nil) || progressMade(noopPlan) {
Expand Down

0 comments on commit 94e56b3

Please sign in to comment.