From aeba2c5ac72438e926b5064097f67180a04d5f0d Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Mon, 10 Sep 2018 12:38:36 -0500
Subject: [PATCH 1/3] structs and API changes to plan and alloc structs needed for preemption

Add the preemption bookkeeping fields to the API and server structs:
PreemptedAllocations and PreemptedByAllocation on Allocation,
NodePreemptions on Plan, PlanResult and ApplyPlanResultsRequest,
PreemptionEvals on ApplyPlanResultsRequest, and PreemptedAllocs on
PlanAnnotations. Also add Resources.Subtract, Plan.AppendPreemptedAlloc
and the EvalTriggerPreemption trigger.

---
Reviewer notes (text in this section is ignored by git-am):
 * Resources.Subtract now subtracts n.MBits, the bandwidth of the delta
   network currently being iterated. The earlier expression indexed
   delta.Networks with idx, which is a position in r.Networks, so it could
   read the wrong entry or index out of range. The matching context line in
   patch 2/3 is updated accordingly.
 * TestResource_Subtract compares the whole Resources value instead of only
   the Networks slice.
 * The "index" blob IDs below were not recomputed for these edits.

 api/allocations.go            | 60 +++++++++++-----------
 api/jobs.go                   |  1 +
 nomad/structs/structs.go      | 94 +++++++++++++++++++++++++++++++++--
 nomad/structs/structs_test.go | 52 +++++++++++++++++++
 4 files changed, 173 insertions(+), 34 deletions(-)

diff --git a/api/allocations.go b/api/allocations.go
index a3a830481b1..c59833a6d91 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -67,35 +67,37 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
 
 // Allocation is used for serialization of allocations.
 type Allocation struct {
-	ID                 string
-	Namespace          string
-	EvalID             string
-	Name               string
-	NodeID             string
-	JobID              string
-	Job                *Job
-	TaskGroup          string
-	Resources          *Resources
-	TaskResources      map[string]*Resources
-	Services           map[string]string
-	Metrics            *AllocationMetric
-	DesiredStatus      string
-	DesiredDescription string
-	DesiredTransition  DesiredTransition
-	ClientStatus       string
-	ClientDescription  string
-	TaskStates         map[string]*TaskState
-	DeploymentID       string
-	DeploymentStatus   *AllocDeploymentStatus
-	FollowupEvalID     string
-	PreviousAllocation string
-	NextAllocation     string
-	RescheduleTracker  *RescheduleTracker
-	CreateIndex        uint64
-	ModifyIndex        uint64
-	AllocModifyIndex   uint64
-	CreateTime         int64
-	ModifyTime         int64
+	ID                    string
+	Namespace             string
+	EvalID                string
+	Name                  string
+	NodeID                string
+	JobID                 string
+	Job                   *Job
+	TaskGroup             string
+	Resources             *Resources
+	TaskResources         map[string]*Resources
+	Services              map[string]string
+	Metrics               *AllocationMetric
+	DesiredStatus         string
+	DesiredDescription    string
+	DesiredTransition     DesiredTransition
+	ClientStatus          string
+	ClientDescription     string
+	TaskStates            map[string]*TaskState
+	DeploymentID          string
+	DeploymentStatus      *AllocDeploymentStatus
+	FollowupEvalID        string
+	PreviousAllocation    string
+	NextAllocation        string
+	RescheduleTracker     *RescheduleTracker
+	PreemptedAllocations  []string
+	PreemptedByAllocation string
+	CreateIndex           uint64
+	ModifyIndex           uint64
+	AllocModifyIndex      uint64
+	CreateTime            int64
+	ModifyTime            int64
 }
 
 // AllocationMetric is used to deserialize allocation metrics.
diff --git a/api/jobs.go b/api/jobs.go
index b89d2e05053..148916c492b 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -1013,6 +1013,7 @@ type ObjectDiff struct {
 
 type PlanAnnotations struct {
 	DesiredTGUpdates map[string]*DesiredUpdates
+	PreemptedAllocs  []*AllocationListStub
 }
 
 type DesiredUpdates struct {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index b5246a1eb72..31ef4f7fdfd 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -645,6 +645,15 @@ type ApplyPlanResultsRequest struct {
 	// processed many times, potentially making state updates, without the state of
 	// the evaluation itself being updated.
 	EvalID string
+
+	// NodePreemptions is a map from node id to a set of allocations from other
+	// lower priority jobs that are preempted. Preempted allocations are marked
+	// as stopped.
+	NodePreemptions []*Allocation
+
+	// PreemptionEvals is a slice of follow up evals for jobs whose allocations
+	// have been preempted to place allocs in this plan
+	PreemptionEvals []*Evaluation
 }
 
 // AllocUpdateRequest is used to submit changes to allocations, either
@@ -1797,6 +1806,29 @@ func (r *Resources) Add(delta *Resources) error {
 	return nil
 }
 
+// Subtract removes the resources of the delta from this. It always returns
+// nil; the error result matches the signature of Add.
+func (r *Resources) Subtract(delta *Resources) error {
+	if delta == nil {
+		return nil
+	}
+	r.CPU -= delta.CPU
+	r.MemoryMB -= delta.MemoryMB
+	r.DiskMB -= delta.DiskMB
+	r.IOPS -= delta.IOPS
+
+	for _, n := range delta.Networks {
+		// Find the matching interface by IP or CIDR
+		idx := r.NetIndex(n)
+		if idx == -1 {
+			r.Networks = append(r.Networks, n.Copy())
+		} else {
+			r.Networks[idx].MBits -= n.MBits
+		}
+	}
+	return nil
+}
+
 func (r *Resources) GoString() string {
 	return fmt.Sprintf("*%#v", *r)
 }
@@ -6087,6 +6119,14 @@ type Allocation struct {
 	// that can be rescheduled in the future
 	FollowupEvalID string
 
+	// PreemptedAllocations captures IDs of any allocations that were preempted
+	// in order to place this allocation
+	PreemptedAllocations []string
+
+	// PreemptedByAllocation tracks the alloc ID of the allocation that caused this allocation
+	// to stop running because it got preempted
+	PreemptedByAllocation string
+
 	// Raft Indexes
 	CreateIndex uint64
 	ModifyIndex uint64
@@ -6160,6 +6200,7 @@ func (a *Allocation) copyImpl(job bool) *Allocation {
 	}
 
 	na.RescheduleTracker = a.RescheduleTracker.Copy()
+	na.PreemptedAllocations = helper.CopySliceString(a.PreemptedAllocations)
 	return na
 }
 
@@ -6747,6 +6788,7 @@ const (
 	EvalTriggerFailedFollowUp    = "failed-follow-up"
 	EvalTriggerMaxPlans          = "max-plan-attempts"
 	EvalTriggerRetryFailedAlloc  = "alloc-failure"
+	EvalTriggerPreemption        = "preempted"
 )
 
 const (
@@ -6969,11 +7011,12 @@ func (e *Evaluation) ShouldBlock() bool {
 // for a given Job
 func (e *Evaluation) MakePlan(j *Job) *Plan {
 	p := &Plan{
-		EvalID:         e.ID,
-		Priority:       e.Priority,
-		Job:            j,
-		NodeUpdate:     make(map[string][]*Allocation),
-		NodeAllocation: make(map[string][]*Allocation),
+		EvalID:          e.ID,
+		Priority:        e.Priority,
+		Job:             j,
+		NodeUpdate:      make(map[string][]*Allocation),
+		NodeAllocation:  make(map[string][]*Allocation),
+		NodePreemptions: make(map[string][]*Allocation),
 	}
 	if j != nil {
 		p.AllAtOnce = j.AllAtOnce
@@ -7086,6 +7129,11 @@ type Plan struct {
 	// deployments. This allows the scheduler to cancel any unneeded deployment
 	// because the job is stopped or the update block is removed.
 	DeploymentUpdates []*DeploymentStatusUpdate
+
+	// NodePreemptions is a map from node id to a set of allocations from other
+	// lower priority jobs that are preempted. Preempted allocations are marked
+	// as evicted.
+	NodePreemptions map[string][]*Allocation
 }
 
 // AppendUpdate marks the allocation for eviction. The clientStatus of the
@@ -7118,6 +7166,27 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
 	p.NodeUpdate[node] = append(existing, newAlloc)
 }
 
+func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
+	newAlloc := new(Allocation)
+	*newAlloc = *alloc
+	// Normalize the job
+	newAlloc.Job = nil
+
+	// Strip the resources as it can be rebuilt.
+	newAlloc.Resources = nil
+
+	newAlloc.DesiredStatus = desiredStatus
+	newAlloc.PreemptedByAllocation = preemptingAllocID
+
+	desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
+	newAlloc.DesiredDescription = desiredDesc
+
+	node := alloc.NodeID
+	// Append this alloc to slice for this node
+	existing := p.NodePreemptions[node]
+	p.NodePreemptions[node] = append(existing, newAlloc)
+}
+
 func (p *Plan) PopUpdate(alloc *Allocation) {
 	existing := p.NodeUpdate[alloc.NodeID]
 	n := len(existing)
@@ -7145,6 +7214,14 @@ func (p *Plan) IsNoOp() bool {
 		len(p.DeploymentUpdates) == 0
 }
 
+// PreemptedAllocs is used to store information about a set of allocations
+// for the same job that get preempted as part of placing allocations for the
+// job in the plan.
+
+// Preempted allocs represents a map from jobid to allocations
+// to be preempted
+type PreemptedAllocs map[*NamespacedID][]*Allocation
+
 // PlanResult is the result of a plan submitted to the leader.
 type PlanResult struct {
 	// NodeUpdate contains all the updates that were committed.
@@ -7159,6 +7236,11 @@ type PlanResult struct {
 	// DeploymentUpdates is the set of deployment updates that were committed.
 	DeploymentUpdates []*DeploymentStatusUpdate
 
+	// NodePreemptions is a map from node id to a set of allocations from other
+	// lower priority jobs that are preempted. Preempted allocations are marked
+	// as stopped.
+	NodePreemptions map[string][]*Allocation
+
 	// RefreshIndex is the index the worker should refresh state up to.
 	// This allows all evictions and allocations to be materialized.
 	// If any allocations were rejected due to stale data (node state,
@@ -7195,6 +7277,8 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
 type PlanAnnotations struct {
 	// DesiredTGUpdates is the set of desired updates per task group.
 	DesiredTGUpdates map[string]*DesiredUpdates
+	// PreemptedAllocs is the set of allocations to be preempted to make the placement successful.
+	PreemptedAllocs []*AllocListStub
 }
 
 // DesiredUpdates is the set of changes the scheduler would like to make given
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index d61f6eee17e..c3c0462bf07 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -1867,6 +1867,58 @@ func TestResource_Add_Network(t *testing.T) {
 	}
 }
 
+func TestResource_Subtract(t *testing.T) {
+	r1 := &Resources{
+		CPU:      2000,
+		MemoryMB: 2048,
+		DiskMB:   10000,
+		IOPS:     100,
+		Networks: []*NetworkResource{
+			{
+				CIDR:          "10.0.0.0/8",
+				MBits:         100,
+				ReservedPorts: []Port{{"ssh", 22}},
+			},
+		},
+	}
+	r2 := &Resources{
+		CPU:      1000,
+		MemoryMB: 1024,
+		DiskMB:   5000,
+		IOPS:     50,
+		Networks: []*NetworkResource{
+			{
+				IP:            "10.0.0.1",
+				MBits:         20,
+				ReservedPorts: []Port{{"web", 80}},
+			},
+		},
+	}
+
+	err := r1.Subtract(r2)
+	if err != nil {
+		t.Fatalf("Err: %v", err)
+	}
+
+	expect := &Resources{
+		CPU:      1000,
+		MemoryMB: 1024,
+		DiskMB:   5000,
+		IOPS:     50,
+		Networks: []*NetworkResource{
+			{
+				CIDR:          "10.0.0.0/8",
+				MBits:         80,
+				ReservedPorts: []Port{{"ssh", 22}},
+			},
+		},
+	}
+
+	if !reflect.DeepEqual(expect, r1) {
+		t.Fatalf("bad: %#v %#v", expect, r1)
+	}
+}
+
 func TestEncodeDecode(t *testing.T) {
 	type FooRequest struct {
 		Foo string

From ced3bb547790d693fc103b99ce03768128f0f7ee Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Mon, 10 Sep 2018 13:13:10 -0500
Subject: [PATCH 2/3] Review feedback

Describe ApplyPlanResultsRequest.NodePreemptions as a slice, stop
Resources.Subtract from appending delta networks it cannot match, remove
the PreemptedAllocs map type, and separate the PlanAnnotations fields with
a blank line.

---
Reviewer notes (text in this section is ignored by git-am):
 * The Subtract context line reads "-= n.MBits" to match the corrected
   line introduced in patch 1/3.

 nomad/structs/structs.go | 18 ++++--------------
 1 file changed, 4 insertions(+), 14 deletions(-)

diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 31ef4f7fdfd..a4aec042b62 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -646,9 +646,8 @@ type ApplyPlanResultsRequest struct {
 	// the evaluation itself being updated.
 	EvalID string
 
-	// NodePreemptions is a map from node id to a set of allocations from other
-	// lower priority jobs that are preempted. Preempted allocations are marked
-	// as stopped.
+	// NodePreemptions is a slice of allocations from other lower priority jobs
+	// that are preempted. Preempted allocations are marked as evicted.
 	NodePreemptions []*Allocation
 
 	// PreemptionEvals is a slice of follow up evals for jobs whose allocations
@@ -1820,9 +1819,7 @@ func (r *Resources) Subtract(delta *Resources) error {
 	for _, n := range delta.Networks {
 		// Find the matching interface by IP or CIDR
 		idx := r.NetIndex(n)
-		if idx == -1 {
-			r.Networks = append(r.Networks, n.Copy())
-		} else {
+		if idx != -1 {
 			r.Networks[idx].MBits -= n.MBits
 		}
 	}
@@ -7214,14 +7211,6 @@ func (p *Plan) IsNoOp() bool {
 		len(p.DeploymentUpdates) == 0
 }
 
-// PreemptedAllocs is used to store information about a set of allocations
-// for the same job that get preempted as part of placing allocations for the
-// job in the plan.
-
-// Preempted allocs represents a map from jobid to allocations
-// to be preempted
-type PreemptedAllocs map[*NamespacedID][]*Allocation
-
 // PlanResult is the result of a plan submitted to the leader.
 type PlanResult struct {
 	// NodeUpdate contains all the updates that were committed.
@@ -7277,6 +7266,7 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
 type PlanAnnotations struct {
 	// DesiredTGUpdates is the set of desired updates per task group.
 	DesiredTGUpdates map[string]*DesiredUpdates
+
 	// PreemptedAllocs is the set of allocations to be preempted to make the placement successful.
 	PreemptedAllocs []*AllocListStub
 }

From c539fa53708ead3f4fd42f0aca9e550ec9ded188 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Tue, 11 Sep 2018 11:36:15 -0500
Subject: [PATCH 3/3] Add number of evictions to DesiredUpdates struct to use in CLI/API

---
 api/jobs.go              | 1 +
 nomad/structs/structs.go | 1 +
 2 files changed, 2 insertions(+)

diff --git a/api/jobs.go b/api/jobs.go
index 148916c492b..a064b318424 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -1024,6 +1024,7 @@ type DesiredUpdates struct {
 	InPlaceUpdate     uint64
 	DestructiveUpdate uint64
 	Canary            uint64
+	Evict             uint64
 }
 
 type JobDispatchRequest struct {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index a4aec042b62..f570c9220ab 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -7281,6 +7281,7 @@ type DesiredUpdates struct {
 	InPlaceUpdate     uint64
 	DestructiveUpdate uint64
 	Canary            uint64
+	Evict             uint64
 }
 
 func (d *DesiredUpdates) GoString() string {