Skip to content

Commit

Permalink
Merge pull request #4657 from hashicorp/f-preemption-api-structs
Browse files Browse the repository at this point in the history
Changes to plan and allocation structs for preemption
  • Loading branch information
Preetha authored Sep 11, 2018
2 parents f28fd3a + c539fa5 commit 44c85b5
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 34 deletions.
60 changes: 31 additions & 29 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,35 +67,37 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
PreemptedAllocations []string
PreemptedByAllocation string
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
}

// AllocationMetric is used to deserialize allocation metrics.
Expand Down
2 changes: 2 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ type ObjectDiff struct {

type PlanAnnotations struct {
DesiredTGUpdates map[string]*DesiredUpdates
PreemptedAllocs []*AllocationListStub
}

type DesiredUpdates struct {
Expand All @@ -1023,6 +1024,7 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
Evict uint64
}

type JobDispatchRequest struct {
Expand Down
85 changes: 80 additions & 5 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,14 @@ type ApplyPlanResultsRequest struct {
// processed many times, potentially making state updates, without the state of
// the evaluation itself being updated.
EvalID string

// NodePreemptions is a slice of allocations from other lower priority jobs
// that are preempted. Preempted allocations are marked as evicted.
NodePreemptions []*Allocation

// PreemptionEvals is a slice of follow up evals for jobs whose allocations
// have been preempted to place allocs in this plan
PreemptionEvals []*Evaluation
}

// AllocUpdateRequest is used to submit changes to allocations, either
Expand Down Expand Up @@ -1797,6 +1805,27 @@ func (r *Resources) Add(delta *Resources) error {
return nil
}

// Subtract removes the resources of the delta to this, potentially
// returning an error if not possible.
func (r *Resources) Subtract(delta *Resources) error {
if delta == nil {
return nil
}
r.CPU -= delta.CPU
r.MemoryMB -= delta.MemoryMB
r.DiskMB -= delta.DiskMB
r.IOPS -= delta.IOPS

for _, n := range delta.Networks {
// Find the matching interface by IP or CIDR
idx := r.NetIndex(n)
if idx != -1 {
r.Networks[idx].MBits -= delta.Networks[idx].MBits
}
}
return nil
}

func (r *Resources) GoString() string {
return fmt.Sprintf("*%#v", *r)
}
Expand Down Expand Up @@ -6087,6 +6116,14 @@ type Allocation struct {
// that can be rescheduled in the future
FollowupEvalID string

// PreemptedAllocations captures IDs of any allocations that were preempted
// in order to place this allocation
PreemptedAllocations []string

// PreemptedByAllocation tracks the alloc ID of the allocation that caused this allocation
// to stop running because it got preempted
PreemptedByAllocation string

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
Expand Down Expand Up @@ -6160,6 +6197,7 @@ func (a *Allocation) copyImpl(job bool) *Allocation {
}

na.RescheduleTracker = a.RescheduleTracker.Copy()
na.PreemptedAllocations = helper.CopySliceString(a.PreemptedAllocations)
return na
}

Expand Down Expand Up @@ -6747,6 +6785,7 @@ const (
EvalTriggerFailedFollowUp = "failed-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerPreemption = "preempted"
)

const (
Expand Down Expand Up @@ -6969,11 +7008,12 @@ func (e *Evaluation) ShouldBlock() bool {
// for a given Job
func (e *Evaluation) MakePlan(j *Job) *Plan {
p := &Plan{
EvalID: e.ID,
Priority: e.Priority,
Job: j,
NodeUpdate: make(map[string][]*Allocation),
NodeAllocation: make(map[string][]*Allocation),
EvalID: e.ID,
Priority: e.Priority,
Job: j,
NodeUpdate: make(map[string][]*Allocation),
NodeAllocation: make(map[string][]*Allocation),
NodePreemptions: make(map[string][]*Allocation),
}
if j != nil {
p.AllAtOnce = j.AllAtOnce
Expand Down Expand Up @@ -7086,6 +7126,11 @@ type Plan struct {
// deployments. This allows the scheduler to cancel any unneeded deployment
// because the job is stopped or the update block is removed.
DeploymentUpdates []*DeploymentStatusUpdate

// NodePreemptions is a map from node id to a set of allocations from other
// lower priority jobs that are preempted. Preempted allocations are marked
// as evicted.
NodePreemptions map[string][]*Allocation
}

// AppendUpdate marks the allocation for eviction. The clientStatus of the
Expand Down Expand Up @@ -7118,6 +7163,27 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
p.NodeUpdate[node] = append(existing, newAlloc)
}

func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
newAlloc := new(Allocation)
*newAlloc = *alloc
// Normalize the job
newAlloc.Job = nil

// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil

newAlloc.DesiredStatus = desiredStatus
newAlloc.PreemptedByAllocation = preemptingAllocID

desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
newAlloc.DesiredDescription = desiredDesc

node := alloc.NodeID
// Append this alloc to slice for this node
existing := p.NodePreemptions[node]
p.NodePreemptions[node] = append(existing, newAlloc)
}

func (p *Plan) PopUpdate(alloc *Allocation) {
existing := p.NodeUpdate[alloc.NodeID]
n := len(existing)
Expand Down Expand Up @@ -7159,6 +7225,11 @@ type PlanResult struct {
// DeploymentUpdates is the set of deployment updates that were committed.
DeploymentUpdates []*DeploymentStatusUpdate

// NodePreemptions is a map from node id to a set of allocations from other
// lower priority jobs that are preempted. Preempted allocations are marked
// as stopped.
NodePreemptions map[string][]*Allocation

// RefreshIndex is the index the worker should refresh state up to.
// This allows all evictions and allocations to be materialized.
// If any allocations were rejected due to stale data (node state,
Expand Down Expand Up @@ -7195,6 +7266,9 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
type PlanAnnotations struct {
// DesiredTGUpdates is the set of desired updates per task group.
DesiredTGUpdates map[string]*DesiredUpdates

// PreemptedAllocs is the set of allocations to be preempted to make the placement successful.
PreemptedAllocs []*AllocListStub
}

// DesiredUpdates is the set of changes the scheduler would like to make given
Expand All @@ -7207,6 +7281,7 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
Evict uint64
}

func (d *DesiredUpdates) GoString() string {
Expand Down
52 changes: 52 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1867,6 +1867,58 @@ func TestResource_Add_Network(t *testing.T) {
}
}

func TestResource_Subtract(t *testing.T) {
r1 := &Resources{
CPU: 2000,
MemoryMB: 2048,
DiskMB: 10000,
IOPS: 100,
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 100,
ReservedPorts: []Port{{"ssh", 22}},
},
},
}
r2 := &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
IP: "10.0.0.1",
MBits: 20,
ReservedPorts: []Port{{"web", 80}},
},
},
}

err := r1.Subtract(r2)
if err != nil {
t.Fatalf("Err: %v", err)
}

expect := &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
CIDR: "10.0.0.0/8",
MBits: 80,
ReservedPorts: []Port{{"ssh", 22}},
},
},
}

if !reflect.DeepEqual(expect.Networks, r1.Networks) {
t.Fatalf("bad: %#v %#v", expect, r1)
}
}

func TestEncodeDecode(t *testing.T) {
type FooRequest struct {
Foo string
Expand Down

0 comments on commit 44c85b5

Please sign in to comment.