diff --git a/api/allocations.go b/api/allocations.go index cdeb3e8a095..6d860362bbd 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -67,36 +67,38 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { // Allocation is used for serialization of allocations. type Allocation struct { - ID string - Namespace string - EvalID string - Name string - NodeID string - JobID string - Job *Job - TaskGroup string - Resources *Resources - TaskResources map[string]*Resources - AllocatedResources *AllocatedResources - Services map[string]string - Metrics *AllocationMetric - DesiredStatus string - DesiredDescription string - DesiredTransition DesiredTransition - ClientStatus string - ClientDescription string - TaskStates map[string]*TaskState - DeploymentID string - DeploymentStatus *AllocDeploymentStatus - FollowupEvalID string - PreviousAllocation string - NextAllocation string - RescheduleTracker *RescheduleTracker - CreateIndex uint64 - ModifyIndex uint64 - AllocModifyIndex uint64 - CreateTime int64 - ModifyTime int64 + ID string + Namespace string + EvalID string + Name string + NodeID string + JobID string + Job *Job + TaskGroup string + Resources *Resources + TaskResources map[string]*Resources + AllocatedResources *AllocatedResources + Services map[string]string + Metrics *AllocationMetric + DesiredStatus string + DesiredDescription string + DesiredTransition DesiredTransition + ClientStatus string + ClientDescription string + TaskStates map[string]*TaskState + DeploymentID string + DeploymentStatus *AllocDeploymentStatus + FollowupEvalID string + PreviousAllocation string + NextAllocation string + RescheduleTracker *RescheduleTracker + PreemptedAllocations []string + PreemptedByAllocation string + CreateIndex uint64 + ModifyIndex uint64 + AllocModifyIndex uint64 + CreateTime int64 + ModifyTime int64 } // AllocationMetric is used to deserialize allocation metrics. diff --git a/api/jobs.go b/api/jobs.go index b89d2e05053..0c8fc4608c5 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -1013,6 +1013,7 @@ type ObjectDiff struct { type PlanAnnotations struct { DesiredTGUpdates map[string]*DesiredUpdates + PreemptedAllocs []*AllocationListStub } type DesiredUpdates struct { @@ -1023,6 +1024,7 @@ type DesiredUpdates struct { InPlaceUpdate uint64 DestructiveUpdate uint64 Canary uint64 + Preemptions uint64 } type JobDispatchRequest struct { diff --git a/api/operator.go b/api/operator.go index be2ba800569..cdef67cf811 100644 --- a/api/operator.go +++ b/api/operator.go @@ -1,5 +1,7 @@ package api +import "strconv" + // Operator can be used to perform low-level operator tasks for Nomad. type Operator struct { c *Client @@ -106,3 +108,61 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error { resp.Body.Close() return nil } + +type SchedulerConfiguration struct { + // PreemptionConfig specifies whether to enable eviction of lower + // priority jobs to place higher priority jobs. + PreemptionConfig PreemptionConfig + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. + CreateIndex uint64 + ModifyIndex uint64 +} + +// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration +type SchedulerConfigurationResponse struct { + // SchedulerConfig contains scheduler config options + SchedulerConfig SchedulerConfiguration + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. 
+ CreateIndex uint64 + ModifyIndex uint64 +} + +// PreemptionConfig specifies whether preemption is enabled based on scheduler type +type PreemptionConfig struct { + SystemSchedulerEnabled bool +} + +// SchedulerGetConfiguration is used to query the current Scheduler configuration. +func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) { + var resp SchedulerConfigurationResponse + qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, qm, nil +} + +// SchedulerSetConfiguration is used to set the current Scheduler configuration. +func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) { + var out bool + wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q) + if err != nil { + return nil, err + } + return wm, nil +} + +// SchedulerCASConfiguration is used to perform a Check-And-Set update on the +// Scheduler configuration. The ModifyIndex value will be respected. Returns +// true on success or false on failures. +func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) { + var out bool + wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q) + if err != nil { + return false, nil, err + } + + return out, wm, nil +} diff --git a/api/operator_test.go b/api/operator_test.go index 5b13fc66c5b..c21cafd7d66 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -3,6 +3,9 @@ package api import ( "strings" "testing" + + "github.com/hashicorp/consul/testutil/retry" + "github.com/stretchr/testify/require" ) func TestOperator_RaftGetConfiguration(t *testing.T) { @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) { + t.Parallel() + require := require.New(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + operator := c.Operator() + var config *SchedulerConfigurationResponse + retry.Run(t, func(r *retry.R) { + var err error + config, _, err = operator.SchedulerGetConfiguration(nil) + r.Check(err) + }) + require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + + // Change a config setting + newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}} + _, err := operator.SchedulerSetConfiguration(newConf, nil) + require.Nil(err) + + config, _, err = operator.SchedulerGetConfiguration(nil) + require.Nil(err) + require.False(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) +} + +func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) { + t.Parallel() + require := require.New(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + operator := c.Operator() + var config *SchedulerConfigurationResponse + retry.Run(t, func(r *retry.R) { + var err error + config, _, err = operator.SchedulerGetConfiguration(nil) + r.Check(err) + }) + require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + + // Pass an invalid ModifyIndex + { + newConf := &SchedulerConfiguration{ + PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}, + ModifyIndex: config.ModifyIndex - 1, + } + resp, _, err := operator.SchedulerCASConfiguration(newConf, nil) + require.Nil(err) + require.False(resp) + } + + // Pass a valid ModifyIndex + { + newConf := 
&SchedulerConfiguration{ + PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}, + ModifyIndex: config.ModifyIndex, + } + resp, _, err := operator.SchedulerCASConfiguration(newConf, nil) + require.Nil(err) + require.True(resp) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 857f5f8bcde..d1bb0e74a45 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest)) s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries)) + s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 02b25c2187e..ec0ca85910a 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -208,3 +208,73 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re return out, nil } + +// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration. +// This supports the stale query mode in case the cluster doesn't have a leader. +func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Switch on the method + switch req.Method { + case "GET": + var args structs.GenericRequest + if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done { + return nil, nil + } + + var reply structs.SchedulerConfigurationResponse + if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + return nil, err + } + + out := api.SchedulerConfiguration{ + PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled}, + CreateIndex: reply.CreateIndex, + ModifyIndex: reply.ModifyIndex, + } + + resp := api.SchedulerConfigurationResponse{ + SchedulerConfig: out, + CreateIndex: out.CreateIndex, + ModifyIndex: out.ModifyIndex, + } + + return resp, nil + + case "PUT": + var args structs.SchedulerSetConfigRequest + s.parseWriteRequest(req, &args.WriteRequest) + + var conf api.SchedulerConfiguration + if err := decodeBody(req, &conf); err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err)) + } + + args.Config = structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled}, + } + + // Check for cas value + params := req.URL.Query() + if _, ok := params["cas"]; ok { + casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64) + if err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err)) + } + args.Config.ModifyIndex = casVal + args.CAS = true + } + + var reply bool + if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil { + return nil, err + } + + // Only use the out value if this was a CAS + if !args.CAS { + return true, nil + } + return reply, nil + + default: + return nil, CodedError(404, ErrInvalidMethod) + } +} diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 2d848676511..6de2e2ee8da 100644 --- a/command/agent/operator_endpoint_test.go +++ 
b/command/agent/operator_endpoint_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestHTTP_OperatorRaftConfiguration(t *testing.T) { @@ -257,3 +258,102 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) { }) }) } + +func TestOperator_SchedulerGetConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer(nil) + req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + out, ok := obj.(api.SchedulerConfigurationResponse) + require.True(ok) + require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + }) +} + +func TestOperator_SchedulerSetConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer([]byte(`{"PreemptionConfig": { + "SystemSchedulerEnabled": true + }}`)) + req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + _, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + + args := structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ + Region: s.Config.Region, + }, + } + + var reply structs.SchedulerConfigurationResponse + err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply) + require.Nil(err) + require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + }) +} + +func TestOperator_SchedulerCASConfiguration(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + require := require.New(t) + body := bytes.NewBuffer([]byte(`{"PreemptionConfig": { + "SystemSchedulerEnabled": true + }}`)) + req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body) + resp := httptest.NewRecorder() + _, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.Equal(200, resp.Code) + + args := structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ + Region: s.Config.Region, + }, + } + + var reply structs.SchedulerConfigurationResponse + if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + + // Create a CAS request, bad index + { + buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": { + "SystemSchedulerEnabled": false + }}`)) + req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex-1), buf) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.False(obj.(bool)) + } + + // Create a CAS request, good index + { + buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": { + "SystemSchedulerEnabled": false + }}`)) + req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex), buf) + resp := httptest.NewRecorder() + obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) + require.Nil(err) + require.True(obj.(bool)) + } + + // Verify the update + if err := s.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil { + t.Fatalf("err: %v", err) + } + 
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 0111f0c35ca..71b4c07bdab 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -45,6 +45,7 @@ const ( DeploymentSnapshot ACLPolicySnapshot ACLTokenSnapshot + SchedulerConfigSnapshot ) // LogApplier is the definition of a function that can apply a Raft log @@ -247,6 +248,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyNodeEligibilityUpdate(buf[1:], log.Index) case structs.BatchNodeUpdateDrainRequestType: return n.applyBatchDrainUpdate(buf[1:], log.Index) + case structs.SchedulerConfigRequestType: + return n.applySchedulerConfigUpdate(buf[1:], log.Index) } // Check enterprise only message types. @@ -814,6 +817,8 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} { return err } + // Add evals for jobs that were preempted + n.handleUpsertedEvals(req.PreemptionEvals) return nil } @@ -994,6 +999,23 @@ func (n *nomadFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} { return n.state.AutopilotSetConfig(index, &req.Config) } +func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} { + var req structs.SchedulerSetConfigRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_scheduler_config"}, time.Now()) + + if req.CAS { + applied, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config) + if err != nil { + return err + } + return applied + } + return n.state.SchedulerSetConfig(index, &req.Config) +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -1214,6 +1236,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case SchedulerConfigSnapshot: + schedConfig := new(structs.SchedulerConfiguration) + if err := dec.Decode(schedConfig); err != nil { + return err + } + if err := restore.SchedulerConfigRestore(schedConfig); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1501,6 +1532,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistSchedulerConfig(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -1833,6 +1868,21 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + // Get scheduler config + _, schedConfig, err := s.snap.SchedulerConfig() + if err != nil { + return err + } + // Write out scheduler config + sink.Write([]byte{byte(SchedulerConfigSnapshot)}) + if err := encoder.Encode(schedConfig); err != nil { + return err + } + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. 
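Taken together, the api/operator.go client methods, the new /v1/operator/scheduler/config HTTP handler, and the FSM plumbing above let an operator read and toggle preemption at runtime. The following is a minimal sketch, not part of the patch, of how the endpoint might be exercised through the Go API client; it assumes an agent reachable via api.DefaultConfig() and abbreviates error handling.

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to the agent described by the default config (address, region, etc.).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	operator := client.Operator()

	// Read the current scheduler configuration; the leader bootstraps a
	// default with preemption enabled for system jobs.
	resp, _, err := operator.SchedulerGetConfiguration(nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("system job preemption:", resp.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)

	// Disable preemption for system jobs using check-and-set semantics so a
	// concurrent write with a newer ModifyIndex is not silently overwritten.
	newConf := &api.SchedulerConfiguration{
		PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: false},
		ModifyIndex:      resp.ModifyIndex,
	}
	applied, _, err := operator.SchedulerCASConfiguration(newConf, nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("CAS write applied:", applied)
}

The CAS variant mirrors the plain SchedulerSetConfiguration call, but the write is only applied when ModifyIndex still matches the value that was read.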
diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go
index c8802bf9bd3..ebe62189d78 100644
--- a/nomad/fsm_test.go
+++ b/nomad/fsm_test.go
@@ -1664,7 +1664,7 @@ func TestFSM_DeregisterVaultAccessor(t *testing.T) {
 func TestFSM_ApplyPlanResults(t *testing.T) {
 	t.Parallel()
 	fsm := testFSM(t)
-
+	fsm.evalBroker.SetEnabled(true)
 	// Create the request and create a deployment
 	alloc := mock.Alloc()
 	alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
@@ -1683,13 +1683,39 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
 	fsm.State().UpsertEvals(1, []*structs.Evaluation{eval})
 	fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
+
+	// set up preempted jobs and allocs
+	job1 := mock.Job()
+	job2 := mock.Job()
+
+	alloc1 := mock.Alloc()
+	alloc1.Job = job1
+	alloc1.JobID = job1.ID
+	alloc1.PreemptedByAllocation = alloc.ID
+
+	alloc2 := mock.Alloc()
+	alloc2.Job = job2
+	alloc2.JobID = job2.ID
+	alloc2.PreemptedByAllocation = alloc.ID
+
+	fsm.State().UpsertAllocs(1, []*structs.Allocation{alloc1, alloc2})
+
+	// evals for preempted jobs
+	eval1 := mock.Eval()
+	eval1.JobID = job1.ID
+
+	eval2 := mock.Eval()
+	eval2.JobID = job2.ID
+
 	req := structs.ApplyPlanResultsRequest{
 		AllocUpdateRequest: structs.AllocUpdateRequest{
 			Job:   job,
 			Alloc: []*structs.Allocation{alloc},
 		},
-		Deployment: d,
-		EvalID:     eval.ID,
+		Deployment:      d,
+		EvalID:          eval.ID,
+		NodePreemptions: []*structs.Allocation{alloc1, alloc2},
+		PreemptionEvals: []*structs.Evaluation{eval1, eval2},
 	}
 	buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
 	if err != nil {
@@ -1714,6 +1740,23 @@ func TestFSM_ApplyPlanResults(t *testing.T) {
 	alloc.Job = job
 	assert.Equal(alloc, out)
 
+	// Verify that evals for preempted jobs have been created
+	e1, err := fsm.State().EvalByID(ws, eval1.ID)
+	require := require.New(t)
+	require.Nil(err)
+	require.NotNil(e1)
+
+	e2, err := fsm.State().EvalByID(ws, eval2.ID)
+	require.Nil(err)
+	require.NotNil(e2)
+
+	// Verify that eval broker has both evals
+	_, ok := fsm.evalBroker.evals[e1.ID]
+	require.True(ok)
+
+	_, ok = fsm.evalBroker.evals[e2.ID]
+	require.True(ok)
+
 	dout, err := fsm.State().DeploymentByID(ws, d.ID)
 	assert.Nil(err)
 	tg, ok := dout.TaskGroups[alloc.TaskGroup]
@@ -2634,6 +2677,29 @@ func TestFSM_SnapshotRestore_ACLTokens(t *testing.T) {
 	assert.Equal(t, tk2, out2)
 }
 
+func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) {
+	t.Parallel()
+	// Add some state
+	fsm := testFSM(t)
+	state := fsm.State()
+	schedConfig := &structs.SchedulerConfiguration{
+		PreemptionConfig: structs.PreemptionConfig{
+			SystemSchedulerEnabled: true,
+		},
+	}
+	state.SchedulerSetConfig(1000, schedConfig)
+
+	// Verify the contents
+	require := require.New(t)
+	fsm2 := testSnapshotRestore(t, fsm)
+	state2 := fsm2.State()
+	index, out, err := state2.SchedulerConfig()
+	require.Nil(err)
+	require.EqualValues(1000, index)
+	require.Equal(schedConfig, out)
+
+}
+
 func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
 	t.Parallel()
 	// Add some state
@@ -2825,3 +2891,49 @@ func TestFSM_Autopilot(t *testing.T) {
 		t.Fatalf("bad: %v", config.CleanupDeadServers)
 	}
 }
+
+func TestFSM_SchedulerConfig(t *testing.T) {
+	t.Parallel()
+	fsm := testFSM(t)
+
+	require := require.New(t)
+
+	// Set the scheduler config using a request.
+ req := structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{ + SystemSchedulerEnabled: true, + }, + }, + } + buf, err := structs.Encode(structs.SchedulerConfigRequestType, req) + require.Nil(err) + + resp := fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + // Verify key is set directly in the state store. + _, config, err := fsm.state.SchedulerConfig() + require.Nil(err) + + require.Equal(config.PreemptionConfig.SystemSchedulerEnabled, req.Config.PreemptionConfig.SystemSchedulerEnabled) + + // Now use CAS and provide an old index + req.CAS = true + req.Config.PreemptionConfig = structs.PreemptionConfig{SystemSchedulerEnabled: false} + req.Config.ModifyIndex = config.ModifyIndex - 1 + buf, err = structs.Encode(structs.SchedulerConfigRequestType, req) + require.Nil(err) + + resp = fsm.Apply(makeLog(buf)) + if _, ok := resp.(error); ok { + t.Fatalf("bad: %v", resp) + } + + _, config, err = fsm.state.SchedulerConfig() + require.Nil(err) + // Verify that preemption is still enabled + require.True(config.PreemptionConfig.SystemSchedulerEnabled) +} diff --git a/nomad/leader.go b/nomad/leader.go index ef0e622cf9b..3d6759d6971 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -42,6 +42,13 @@ const ( var minAutopilotVersion = version.Must(version.NewVersion("0.8.0")) +// Default configuration for scheduler with preemption enabled for system jobs +var defaultSchedulerConfig = &structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{ + SystemSchedulerEnabled: true, + }, +} + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes @@ -187,6 +194,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.getOrCreateAutopilotConfig() s.autopilot.Start() + // Initialize scheduler configuration + s.getOrCreateSchedulerConfig() + // Enable the plan queue, since we are now the leader s.planQueue.SetEnabled(true) @@ -1230,3 +1240,25 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { return config } + +// getOrCreateSchedulerConfig is used to get the scheduler config. We create a default +// config if it doesn't already exist for bootstrapping an empty cluster +func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration { + state := s.fsm.State() + _, config, err := state.SchedulerConfig() + if err != nil { + s.logger.Named("core").Error("failed to get scheduler config", "error", err) + return nil + } + if config != nil { + return config + } + + req := structs.SchedulerSetConfigRequest{Config: *defaultSchedulerConfig} + if _, _, err = s.raftApply(structs.SchedulerConfigRequestType, req); err != nil { + s.logger.Named("core").Error("failed to initialize config", "error", err) + return nil + } + + return config +} diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index 39615b3fa89..504bcfcaca3 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -284,3 +284,65 @@ func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot. return nil } + +// SchedulerSetConfiguration is used to set the current Scheduler configuration. 
+func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *bool) error { + if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator write access. + rule, err := op.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } else if rule != nil && !rule.AllowOperatorWrite() { + return structs.ErrPermissionDenied + } + + // Apply the update + resp, _, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args) + if err != nil { + op.logger.Error("failed applying Scheduler configuration", "error", err) + return err + } else if respErr, ok := resp.(error); ok { + return respErr + } + + // Check if the return type is a bool. + if respBool, ok := resp.(bool); ok { + *reply = respBool + } + return nil +} + +// SchedulerGetConfiguration is used to retrieve the current Scheduler configuration. +func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, reply *structs.SchedulerConfigurationResponse) error { + if done, err := op.srv.forward("Operator.SchedulerGetConfiguration", args, args, reply); done { + return err + } + + // This action requires operator read access. + rule, err := op.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } else if rule != nil && !rule.AllowOperatorRead() { + return structs.ErrPermissionDenied + } + + state := op.srv.fsm.State() + _, config, err := state.SchedulerConfig() + if err != nil { + return err + } else if config == nil { + return fmt.Errorf("scheduler config not initialized yet") + } + + resp := &structs.SchedulerConfigurationResponse{ + SchedulerConfig: *config, + CreateIndex: config.CreateIndex, + ModifyIndex: config.ModifyIndex, + } + *reply = *resp + + return nil +} diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 12f4dc02a2c..95dd007f0bc 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -10,6 +10,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" @@ -163,6 +164,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, EvalID: plan.EvalID, + NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)), } for _, updateList := range result.NodeUpdate { req.Alloc = append(req.Alloc, updateList...) @@ -171,6 +173,10 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap req.Alloc = append(req.Alloc, allocList...) } + for _, preemptions := range result.NodePreemptions { + req.NodePreemptions = append(req.NodePreemptions, preemptions...) + } + // Set the time the alloc was applied for the first time. This can be used // to approximate the scheduling time. 
now := time.Now().UTC().UnixNano() @@ -181,6 +187,36 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap alloc.ModifyTime = now } + // Set modify time for preempted allocs if any + // Also gather jobids to create follow up evals + preemptedJobIDs := make(map[structs.NamespacedID]struct{}) + for _, alloc := range req.NodePreemptions { + alloc.ModifyTime = now + id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} + _, ok := preemptedJobIDs[id] + if !ok { + preemptedJobIDs[id] = struct{}{} + } + } + + var evals []*structs.Evaluation + for preemptedJobID := range preemptedJobIDs { + job, _ := p.State().JobByID(nil, preemptedJobID.Namespace, preemptedJobID.ID) + if job != nil { + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + TriggeredBy: structs.EvalTriggerPreemption, + JobID: job.ID, + Type: job.Type, + Priority: job.Priority, + Status: structs.EvalStatusPending, + } + evals = append(evals, eval) + } + } + req.PreemptionEvals = evals + // Dispatch the Raft transaction future, err := p.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req) if err != nil { @@ -259,6 +295,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan NodeAllocation: make(map[string][]*structs.Allocation), Deployment: plan.Deployment.Copy(), DeploymentUpdates: plan.DeploymentUpdates, + NodePreemptions: make(map[string][]*structs.Allocation), } // Collect all the nodeIDs @@ -304,6 +341,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan result.NodeAllocation = nil result.DeploymentUpdates = nil result.Deployment = nil + result.NodePreemptions = nil return true } @@ -318,6 +356,26 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan if nodeAlloc := plan.NodeAllocation[nodeID]; len(nodeAlloc) > 0 { result.NodeAllocation[nodeID] = nodeAlloc } + + if nodePreemptions := plan.NodePreemptions[nodeID]; nodePreemptions != nil { + + // Do a pass over preempted allocs in the plan to check + // whether the alloc is already in a terminal state + var filteredNodePreemptions []*structs.Allocation + for _, preemptedAlloc := range nodePreemptions { + alloc, err := snap.AllocByID(nil, preemptedAlloc.ID) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + continue + } + if alloc != nil && !alloc.TerminalStatus() { + filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc) + } + } + + result.NodePreemptions[nodeID] = filteredNodePreemptions + } + return } @@ -461,6 +519,14 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri if update := plan.NodeUpdate[nodeID]; len(update) > 0 { remove = append(remove, update...) 
} + + // Remove any preempted allocs + if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 { + for _, allocs := range preempted { + remove = append(remove, allocs) + } + } + if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 { for _, alloc := range updated { remove = append(remove, alloc) diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 5f2e735caa0..1b5142aa919 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -271,6 +272,120 @@ func TestPlanApply_EvalPlan_Simple(t *testing.T) { } } +func TestPlanApply_EvalPlan_Preemption(t *testing.T) { + t.Parallel() + state := testStateStore(t) + node := mock.Node() + node.NodeResources = &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 2000, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 4192, + }, + Disk: structs.NodeDiskResources{ + DiskMB: 30 * 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + state.UpsertNode(1000, node) + + preemptedAlloc := mock.Alloc() + preemptedAlloc.NodeID = node.ID + preemptedAlloc.AllocatedResources = &structs.AllocatedResources{ + Shared: structs.AllocatedSharedResources{ + DiskMB: 25 * 1024, + }, + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1500, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 4000, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 800, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + } + + // Insert a preempted alloc such that the alloc will fit only after preemption + state.UpsertAllocs(1001, []*structs.Allocation{preemptedAlloc}) + + alloc := mock.Alloc() + alloc.AllocatedResources = &structs.AllocatedResources{ + Shared: structs.AllocatedSharedResources{ + DiskMB: 24 * 1024, + }, + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1500, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 3200, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}}, + MBits: 800, + DynamicPorts: []structs.Port{{Label: "http", Value: 9876}}, + }, + }, + }, + }, + } + plan := &structs.Plan{ + Job: alloc.Job, + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: {alloc}, + }, + NodePreemptions: map[string][]*structs.Allocation{ + node.ID: {preemptedAlloc}, + }, + Deployment: mock.Deployment(), + DeploymentUpdates: []*structs.DeploymentStatusUpdate{ + { + DeploymentID: uuid.Generate(), + Status: "foo", + StatusDescription: "bar", + }, + }, + } + snap, _ := state.Snapshot() + + pool := NewEvaluatePool(workerPoolSize, workerPoolBufferSize) + defer pool.Shutdown() + + result, err := evaluatePlan(pool, snap, plan, testlog.HCLogger(t)) + + require := require.New(t) + require.NoError(err) + require.NotNil(result) + + require.Equal(result.NodeAllocation, plan.NodeAllocation) + require.Equal(result.Deployment, plan.Deployment) + require.Equal(result.DeploymentUpdates, plan.DeploymentUpdates) + require.Equal(result.NodePreemptions, 
plan.NodePreemptions) + +} + func TestPlanApply_EvalPlan_Partial(t *testing.T) { t.Parallel() state := testStateStore(t) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 8b77de60357..53df292dc22 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -44,6 +44,7 @@ func init() { aclPolicyTableSchema, aclTokenTableSchema, autopilotConfigTableSchema, + schedulerConfigTableSchema, }...) } @@ -599,3 +600,22 @@ func aclTokenTableSchema() *memdb.TableSchema { }, } } + +// schedulerConfigTableSchema returns the MemDB schema for the scheduler config table. +// This table is used to store configuration options for the scheduler +func schedulerConfigTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "scheduler_config", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: true, + Unique: true, + // This indexer ensures that this table is a singleton + Indexer: &memdb.ConditionalIndex{ + Conditional: func(obj interface{}) (bool, error) { return true, nil }, + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8885ebab420..c2fcc4162fe 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -218,6 +218,45 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } + // Prepare preempted allocs in the plan results for update + var preemptedAllocs []*structs.Allocation + for _, preemptedAlloc := range results.NodePreemptions { + // Look for existing alloc + existing, err := txn.First("allocs", "id", preemptedAlloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + + // Nothing to do if this does not exist + if existing == nil { + continue + } + exist := existing.(*structs.Allocation) + + // Copy everything from the existing allocation + copyAlloc := exist.Copy() + + // Only update the fields set by the scheduler + copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus + copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation + copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription + copyAlloc.ModifyTime = preemptedAlloc.ModifyTime + preemptedAllocs = append(preemptedAllocs, copyAlloc) + + } + + // Upsert the preempted allocations + if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil { + return err + } + + // Upsert followup evals for allocs that were preempted + for _, eval := range results.PreemptionEvals { + if err := s.nestedUpsertEval(txn, index, eval); err != nil { + return err + } + } + txn.Commit() return nil } @@ -3820,6 +3859,84 @@ func (s *StateStore) BootstrapACLTokens(index, resetIndex uint64, token *structs return nil } +// SchedulerConfig is used to get the current Scheduler configuration. +func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the scheduler config + c, err := tx.First("scheduler_config", "id") + if err != nil { + return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + config, ok := c.(*structs.SchedulerConfiguration) + if !ok { + return 0, nil, nil + } + + return config.ModifyIndex, config, nil +} + +// SchedulerSetConfig is used to set the current Scheduler configuration. 
+func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerConfiguration) error { + tx := s.db.Txn(true) + defer tx.Abort() + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return nil +} + +// SchedulerCASConfig is used to update the scheduler configuration with a +// given Raft index. If the CAS index specified is not equal to the last observed index +// for the config, then the call is a noop. +func (s *StateStore) SchedulerCASConfig(idx, cidx uint64, config *structs.SchedulerConfiguration) (bool, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return false, fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + e, ok := existing.(*structs.SchedulerConfiguration) + if !ok || e.ModifyIndex != cidx { + return false, nil + } + + s.schedulerSetConfigTxn(idx, tx, config) + + tx.Commit() + return true, nil +} + +func (s *StateStore) schedulerSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.SchedulerConfiguration) error { + // Check for an existing config + existing, err := tx.First("scheduler_config", "id") + if err != nil { + return fmt.Errorf("failed scheduler config lookup: %s", err) + } + + // Set the indexes. + if existing != nil { + config.CreateIndex = existing.(*structs.SchedulerConfiguration).CreateIndex + } else { + config.CreateIndex = idx + } + config.ModifyIndex = idx + + if err := tx.Insert("scheduler_config", config); err != nil { + return fmt.Errorf("failed updating scheduler config: %s", err) + } + return nil +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore @@ -3938,6 +4055,13 @@ func (r *StateRestore) ACLTokenRestore(token *structs.ACLToken) error { return nil } +func (r *StateRestore) SchedulerConfigRestore(schedConfig *structs.SchedulerConfiguration) error { + if err := r.txn.Insert("scheduler_config", schedConfig); err != nil { + return fmt.Errorf("inserting scheduler config failed: %s", err) + } + return nil +} + // addEphemeralDiskToTaskGroups adds missing EphemeralDisk objects to TaskGroups func (r *StateRestore) addEphemeralDiskToTaskGroups(job *structs.Job) { for _, tg := range job.TaskGroups { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 79cd8559f98..f0aa37aa1f7 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -244,6 +244,88 @@ func TestStateStore_UpsertPlanResults_Deployment(t *testing.T) { assert.EqualValues(1001, evalOut.ModifyIndex) } +// This test checks that: +// 1) Preempted allocations in plan results are updated +// 2) Evals are inserted for preempted jobs +func TestStateStore_UpsertPlanResults_PreemptedAllocs(t *testing.T) { + require := require.New(t) + + state := testStateStore(t) + alloc := mock.Alloc() + job := alloc.Job + alloc.Job = nil + + // Insert job + err := state.UpsertJob(999, job) + require.NoError(err) + + // Create an eval + eval := mock.Eval() + eval.JobID = job.ID + err = state.UpsertEvals(1, []*structs.Evaluation{eval}) + require.NoError(err) + + // Insert alloc that will be preempted in the plan + preemptedAlloc := mock.Alloc() + err = state.UpsertAllocs(2, []*structs.Allocation{preemptedAlloc}) + require.NoError(err) + + minimalPreemptedAlloc := &structs.Allocation{ + ID: preemptedAlloc.ID, + Namespace: 
preemptedAlloc.Namespace, + DesiredStatus: structs.AllocDesiredStatusEvict, + ModifyTime: time.Now().Unix(), + DesiredDescription: fmt.Sprintf("Preempted by allocation %v", alloc.ID), + } + + // Create eval for preempted job + eval2 := mock.Eval() + eval2.JobID = preemptedAlloc.JobID + + // Create a plan result + res := structs.ApplyPlanResultsRequest{ + AllocUpdateRequest: structs.AllocUpdateRequest{ + Alloc: []*structs.Allocation{alloc}, + Job: job, + }, + EvalID: eval.ID, + NodePreemptions: []*structs.Allocation{minimalPreemptedAlloc}, + PreemptionEvals: []*structs.Evaluation{eval2}, + } + + err = state.UpsertPlanResults(1000, &res) + require.NoError(err) + + ws := memdb.NewWatchSet() + + // Verify alloc and eval created by plan + out, err := state.AllocByID(ws, alloc.ID) + require.NoError(err) + require.Equal(alloc, out) + + index, err := state.Index("allocs") + require.NoError(err) + require.EqualValues(1000, index) + + evalOut, err := state.EvalByID(ws, eval.ID) + require.NoError(err) + require.NotNil(evalOut) + require.EqualValues(1000, evalOut.ModifyIndex) + + // Verify preempted alloc + preempted, err := state.AllocByID(ws, preemptedAlloc.ID) + require.NoError(err) + require.Equal(preempted.DesiredStatus, structs.AllocDesiredStatusEvict) + require.Equal(preempted.DesiredDescription, fmt.Sprintf("Preempted by allocation %v", alloc.ID)) + + // Verify eval for preempted job + preemptedJobEval, err := state.EvalByID(ws, eval2.ID) + require.NoError(err) + require.NotNil(preemptedJobEval) + require.EqualValues(1000, preemptedJobEval.ModifyIndex) + +} + // This test checks that deployment updates are applied correctly func TestStateStore_UpsertPlanResults_DeploymentUpdates(t *testing.T) { state := testStateStore(t) @@ -6687,6 +6769,33 @@ func TestStateStore_RestoreACLToken(t *testing.T) { assert.Equal(t, token, out) } +func TestStateStore_SchedulerConfig(t *testing.T) { + state := testStateStore(t) + schedConfig := &structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{ + SystemSchedulerEnabled: false, + }, + CreateIndex: 100, + ModifyIndex: 200, + } + + require := require.New(t) + restore, err := state.Restore() + + require.Nil(err) + + err = restore.SchedulerConfigRestore(schedConfig) + require.Nil(err) + + restore.Commit() + + modIndex, out, err := state.SchedulerConfig() + require.Nil(err) + require.Equal(schedConfig.ModifyIndex, modIndex) + + require.Equal(schedConfig, out) +} + func TestStateStore_Abandon(t *testing.T) { s := testStateStore(t) abandonCh := s.AbandonCh() diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index ecd7f97d411..059399d9184 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -119,3 +119,43 @@ type AutopilotConfig struct { CreateIndex uint64 ModifyIndex uint64 } + +// SchedulerConfiguration is the config for controlling scheduler behavior +type SchedulerConfiguration struct { + // PreemptionConfig specifies whether to enable eviction of lower + // priority jobs to place higher priority jobs. + PreemptionConfig PreemptionConfig + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. + CreateIndex uint64 + ModifyIndex uint64 +} + +// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration +type SchedulerConfigurationResponse struct { + // SchedulerConfig contains scheduler config options + SchedulerConfig SchedulerConfiguration + + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. 
+ CreateIndex uint64 + ModifyIndex uint64 +} + +// PreemptionConfig specifies whether preemption is enabled based on scheduler type +type PreemptionConfig struct { + // SystemSchedulerEnabled specifies if preemption is enabled for system jobs + SystemSchedulerEnabled bool +} + +// SchedulerSetConfigRequest is used by the Operator endpoint to update the +// current Scheduler configuration of the cluster. +type SchedulerSetConfigRequest struct { + // Config is the new Scheduler configuration to use. + Config SchedulerConfiguration + + // CAS controls whether to use check-and-set semantics for this request. + CAS bool + + // WriteRequest holds the ACL token to go along with this request. + WriteRequest +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f26df1d5c35..014ee78ada0 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -81,6 +81,7 @@ const ( AllocUpdateDesiredTransitionRequestType NodeUpdateEligibilityRequestType BatchNodeUpdateDrainRequestType + SchedulerConfigRequestType ) const ( @@ -643,6 +644,14 @@ type ApplyPlanResultsRequest struct { // processed many times, potentially making state updates, without the state of // the evaluation itself being updated. EvalID string + + // NodePreemptions is a slice of allocations from other lower priority jobs + // that are preempted. Preempted allocations are marked as evicted. + NodePreemptions []*Allocation + + // PreemptionEvals is a slice of follow up evals for jobs whose allocations + // have been preempted to place allocs in this plan + PreemptionEvals []*Evaluation } // AllocUpdateRequest is used to submit changes to allocations, either @@ -7040,6 +7049,14 @@ type Allocation struct { // that can be rescheduled in the future FollowupEvalID string + // PreemptedAllocations captures IDs of any allocations that were preempted + // in order to place this allocation + PreemptedAllocations []string + + // PreemptedByAllocation tracks the alloc ID of the allocation that caused this allocation + // to stop running because it got preempted + PreemptedByAllocation string + // Raft Indexes CreateIndex uint64 ModifyIndex uint64 @@ -7114,6 +7131,7 @@ func (a *Allocation) copyImpl(job bool) *Allocation { } na.RescheduleTracker = a.RescheduleTracker.Copy() + na.PreemptedAllocations = helper.CopySliceString(a.PreemptedAllocations) return na } @@ -7385,6 +7403,7 @@ func (a *Allocation) ComparableResources() *ComparableResources { Memory: AllocatedMemoryResources{ MemoryMB: int64(resources.MemoryMB), }, + Networks: resources.Networks, }, Shared: AllocatedSharedResources{ DiskMB: int64(resources.DiskMB), @@ -7758,6 +7777,7 @@ const ( EvalTriggerMaxPlans = "max-plan-attempts" EvalTriggerRetryFailedAlloc = "alloc-failure" EvalTriggerQueuedAllocs = "queued-allocs" + EvalTriggerPreemption = "preemption" ) const ( @@ -7983,11 +8003,12 @@ func (e *Evaluation) ShouldBlock() bool { // for a given Job func (e *Evaluation) MakePlan(j *Job) *Plan { p := &Plan{ - EvalID: e.ID, - Priority: e.Priority, - Job: j, - NodeUpdate: make(map[string][]*Allocation), - NodeAllocation: make(map[string][]*Allocation), + EvalID: e.ID, + Priority: e.Priority, + Job: j, + NodeUpdate: make(map[string][]*Allocation), + NodeAllocation: make(map[string][]*Allocation), + NodePreemptions: make(map[string][]*Allocation), } if j != nil { p.AllAtOnce = j.AllAtOnce @@ -8100,6 +8121,11 @@ type Plan struct { // deployments. This allows the scheduler to cancel any unneeded deployment // because the job is stopped or the update block is removed. 
DeploymentUpdates []*DeploymentStatusUpdate + + // NodePreemptions is a map from node id to a set of allocations from other + // lower priority jobs that are preempted. Preempted allocations are marked + // as evicted. + NodePreemptions map[string][]*Allocation } // AppendUpdate marks the allocation for eviction. The clientStatus of the @@ -8132,6 +8158,35 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien p.NodeUpdate[node] = append(existing, newAlloc) } +// AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan. +// To minimize the size of the plan, this only sets a minimal set of fields in the allocation +func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) { + newAlloc := &Allocation{} + newAlloc.ID = alloc.ID + newAlloc.JobID = alloc.JobID + newAlloc.Namespace = alloc.Namespace + newAlloc.DesiredStatus = desiredStatus + newAlloc.PreemptedByAllocation = preemptingAllocID + + desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID) + newAlloc.DesiredDescription = desiredDesc + + // TaskResources are needed by the plan applier to check if allocations fit + // after removing preempted allocations + if alloc.AllocatedResources != nil { + newAlloc.AllocatedResources = alloc.AllocatedResources + } else { + // COMPAT Remove in version 0.11 + newAlloc.TaskResources = alloc.TaskResources + newAlloc.SharedResources = alloc.SharedResources + } + + // Append this alloc to slice for this node + node := alloc.NodeID + existing := p.NodePreemptions[node] + p.NodePreemptions[node] = append(existing, newAlloc) +} + func (p *Plan) PopUpdate(alloc *Allocation) { existing := p.NodeUpdate[alloc.NodeID] n := len(existing) @@ -8177,6 +8232,11 @@ type PlanResult struct { // DeploymentUpdates is the set of deployment updates that were committed. DeploymentUpdates []*DeploymentStatusUpdate + // NodePreemptions is a map from node id to a set of allocations from other + // lower priority jobs that are preempted. Preempted allocations are marked + // as stopped. + NodePreemptions map[string][]*Allocation + // RefreshIndex is the index the worker should refresh state up to. // This allows all evictions and allocations to be materialized. // If any allocations were rejected due to stale data (node state, @@ -8213,6 +8273,9 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) { type PlanAnnotations struct { // DesiredTGUpdates is the set of desired updates per task group. DesiredTGUpdates map[string]*DesiredUpdates + + // PreemptedAllocs is the set of allocations to be preempted to make the placement successful. 
+ PreemptedAllocs []*AllocListStub } // DesiredUpdates is the set of changes the scheduler would like to make given @@ -8225,6 +8288,7 @@ type DesiredUpdates struct { InPlaceUpdate uint64 DestructiveUpdate uint64 Canary uint64 + Preemptions uint64 } func (d *DesiredUpdates) GoString() string { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 88bebcbe880..a2338300ee2 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1869,6 +1869,75 @@ func TestResource_Add_Network(t *testing.T) { } } +func TestComparableResources_Subtract(t *testing.T) { + r1 := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 2000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 2048, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 100, + ReservedPorts: []Port{{"ssh", 22}}, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 10000, + }, + } + + r2 := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 20, + ReservedPorts: []Port{{"ssh", 22}}, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + } + r1.Subtract(r2) + + expect := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 1000, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*NetworkResource{ + { + CIDR: "10.0.0.0/8", + MBits: 80, + ReservedPorts: []Port{{"ssh", 22}}, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 5000, + }, + } + + require := require.New(t) + require.Equal(expect, r1) +} + func TestEncodeDecode(t *testing.T) { type FooRequest struct { Foo string diff --git a/scheduler/context.go b/scheduler/context.go index 031a3b45a51..1da0776913e 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -123,6 +123,12 @@ func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, erro proposed = structs.RemoveAllocs(existingAlloc, update) } + // Remove any allocs that are being preempted + nodePreemptedAllocs := e.plan.NodePreemptions[nodeID] + if len(nodePreemptedAllocs) > 0 { + proposed = structs.RemoveAllocs(existingAlloc, nodePreemptedAllocs) + } + // We create an index of the existing allocations so that if an inplace // update occurs, we do not double count and we override the old allocation. 
proposedIDs := make(map[string]*structs.Allocation, len(proposed)) diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 28416b04773..8eb7792c89b 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -14,8 +14,9 @@ import ( func testContext(t testing.TB) (*state.StateStore, *EvalContext) { state := state.TestStateStore(t) plan := &structs.Plan{ - NodeUpdate: make(map[string][]*structs.Allocation), - NodeAllocation: make(map[string][]*structs.Allocation), + NodeUpdate: make(map[string][]*structs.Allocation), + NodeAllocation: make(map[string][]*structs.Allocation), + NodePreemptions: make(map[string][]*structs.Allocation), } logger := testlog.HCLogger(t) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index cf6ce977af8..93f982ede7d 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -130,7 +130,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, - structs.EvalTriggerFailedFollowUp: + structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) diff --git a/scheduler/preemption.go b/scheduler/preemption.go new file mode 100644 index 00000000000..fd082cdada3 --- /dev/null +++ b/scheduler/preemption.go @@ -0,0 +1,608 @@ +package scheduler + +import ( + "math" + "sort" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// maxParallelPenalty is a score penalty applied to allocations to mitigate against +// too many allocations of the same job being preempted. This penalty is applied after the +// number of allocations being preempted exceeds max_parallel value in the job's migrate stanza +const maxParallelPenalty = 50.0 + +type groupedAllocs struct { + priority int + allocs []*structs.Allocation +} + +type allocInfo struct { + maxParallel int + resources *structs.ComparableResources +} + +// PreemptionResource interface is implemented by different +// types of resources. 
+type PreemptionResource interface { + // MeetsRequirements returns true if the available resources match needed resources + MeetsRequirements() bool + + // Distance returns values in the range [0, MaxFloat], lower is better + Distance() float64 +} + +// NetworkPreemptionResource implements PreemptionResource for network assignments +// It only looks at MBits needed +type NetworkPreemptionResource struct { + availableResources *structs.NetworkResource + resourceNeeded *structs.NetworkResource +} + +func (n *NetworkPreemptionResource) MeetsRequirements() bool { + mbitsAvailable := n.availableResources.MBits + mbitsNeeded := n.resourceNeeded.MBits + if mbitsAvailable == 0 || mbitsNeeded == 0 { + return false + } + return mbitsAvailable >= mbitsNeeded +} + +func (n *NetworkPreemptionResource) Distance() float64 { + return networkResourceDistance(n.availableResources, n.resourceNeeded) +} + +// BasePreemptionResource implements PreemptionResource for CPU/Memory/Disk +type BasePreemptionResource struct { + availableResources *structs.ComparableResources + resourceNeeded *structs.ComparableResources +} + +func (b *BasePreemptionResource) MeetsRequirements() bool { + super, _ := b.availableResources.Superset(b.resourceNeeded) + return super +} + +func (b *BasePreemptionResource) Distance() float64 { + return basicResourceDistance(b.resourceNeeded, b.availableResources) +} + +// PreemptionResourceFactory returns a new PreemptionResource +type PreemptionResourceFactory func(availableResources *structs.ComparableResources, resourceAsk *structs.ComparableResources) PreemptionResource + +// GetNetworkPreemptionResourceFactory returns a preemption resource factory for network assignments +func GetNetworkPreemptionResourceFactory() PreemptionResourceFactory { + return func(availableResources *structs.ComparableResources, resourceNeeded *structs.ComparableResources) PreemptionResource { + available := availableResources.Flattened.Networks[0] + return &NetworkPreemptionResource{ + availableResources: available, + resourceNeeded: resourceNeeded.Flattened.Networks[0], + } + } +} + +// GetBasePreemptionResourceFactory returns a preemption resource factory for CPU/Memory/Disk +func GetBasePreemptionResourceFactory() PreemptionResourceFactory { + return func(availableResources *structs.ComparableResources, resourceNeeded *structs.ComparableResources) PreemptionResource { + return &BasePreemptionResource{ + availableResources: availableResources, + resourceNeeded: resourceNeeded, + } + } +} + +// Preemptor is used to track existing allocations +// and find suitable allocations to preempt +type Preemptor struct { + + // currentPreemptions is a map computed when SetPreemptions is called + // it tracks the number of preempted allocations per job/taskgroup + currentPreemptions map[structs.NamespacedID]map[string]int + + // allocDetails is a map computed when SetCandidates is called + // it stores some precomputed details about the allocation needed + // when scoring it for preemption + allocDetails map[string]*allocInfo + + // jobPriority is the priority of the job being preempted + jobPriority int + + // nodeRemainingResources tracks available resources on the node after + // accounting for running allocations + nodeRemainingResources *structs.ComparableResources + + // currentAllocs is the candidate set used to find preemptible allocations + currentAllocs []*structs.Allocation +} + +func NewPreemptor(jobPriority int) *Preemptor { + return &Preemptor{ + currentPreemptions: 
make(map[structs.NamespacedID]map[string]int), + jobPriority: jobPriority, + allocDetails: make(map[string]*allocInfo), + } +} + +// SetNode sets the node +func (p *Preemptor) SetNode(node *structs.Node) { + nodeRemainingResources := node.ComparableResources() + + // Subtract the reserved resources of the node + if c := node.ComparableReservedResources(); c != nil { + nodeRemainingResources.Subtract(c) + } + p.nodeRemainingResources = nodeRemainingResources +} + +// SetCandidates initializes the candidate set from which preemptions are chosen +func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) { + p.currentAllocs = allocs + for _, alloc := range allocs { + maxParallel := 0 + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg != nil && tg.Migrate != nil { + maxParallel = tg.Migrate.MaxParallel + } + p.allocDetails[alloc.ID] = &allocInfo{maxParallel: maxParallel, resources: alloc.ComparableResources()} + } +} + +// SetPreemptions initializes a map tracking existing counts of preempted allocations +// per job/task group. This is used while scoring preemption options +func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) { + + // Clear out existing values since this can be called more than once + p.currentPreemptions = make(map[structs.NamespacedID]map[string]int) + + // Initialize counts + for _, alloc := range allocs { + id := structs.NamespacedID{alloc.JobID, alloc.Namespace} + countMap, ok := p.currentPreemptions[id] + if !ok { + countMap = make(map[string]int) + p.currentPreemptions[id] = countMap + } + countMap[alloc.TaskGroup]++ + } +} + +// getNumPreemptions counts the number of other allocations being preempted that match the job and task group of +// the alloc under consideration. This is used as a scoring factor to minimize too many allocs of the same job being preempted at once +func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int { + c, ok := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}][alloc.TaskGroup] + if !ok { + return 0 + } + return c +} + +// PreemptForTaskGroup computes a list of allocations to preempt to accommodate +// the resources asked for. 
Only allocs with a job priority < 10 of jobPriority are considered +// This method is meant only for finding preemptible allocations based on CPU/Memory/Disk +func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation { + resourcesNeeded := resourceAsk.Comparable() + + // Subtract current allocations + for _, alloc := range p.currentAllocs { + allocResources := p.allocDetails[alloc.ID].resources + p.nodeRemainingResources.Subtract(allocResources) + } + + // Group candidates by priority, filter out ineligible allocs + allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, p.currentAllocs) + + var bestAllocs []*structs.Allocation + allRequirementsMet := false + + // Initialize variable to track resources as they become available from preemption + availableResources := p.nodeRemainingResources.Copy() + + resourcesAsked := resourceAsk.Comparable() + // Iterate over allocations grouped by priority to find preemptible allocations + for _, allocGrp := range allocsByPriority { + for len(allocGrp.allocs) > 0 && !allRequirementsMet { + closestAllocIndex := -1 + bestDistance := math.MaxFloat64 + // Find the alloc with the closest distance + for index, alloc := range allocGrp.allocs { + currentPreemptionCount := p.getNumPreemptions(alloc) + allocDetails := p.allocDetails[alloc.ID] + maxParallel := allocDetails.maxParallel + distance := scoreForTaskGroup(resourcesNeeded, allocDetails.resources, maxParallel, currentPreemptionCount) + if distance < bestDistance { + bestDistance = distance + closestAllocIndex = index + } + } + closestAlloc := allocGrp.allocs[closestAllocIndex] + closestResources := p.allocDetails[closestAlloc.ID].resources + availableResources.Add(closestResources) + + // This step needs the original resources asked for as the second arg, can't use the running total + allRequirementsMet, _ = availableResources.Superset(resourcesAsked) + + bestAllocs = append(bestAllocs, closestAlloc) + + allocGrp.allocs[closestAllocIndex] = allocGrp.allocs[len(allocGrp.allocs)-1] + allocGrp.allocs = allocGrp.allocs[:len(allocGrp.allocs)-1] + + // This is the remaining total of resources needed + resourcesNeeded.Subtract(closestResources) + } + if allRequirementsMet { + break + } + } + + // Early return if all allocs examined and requirements were not met + if !allRequirementsMet { + return nil + } + + // We do another pass to eliminate unnecessary preemptions + // This filters out allocs whose resources are already covered by another alloc + basePreemptionResource := GetBasePreemptionResourceFactory() + resourcesNeeded = resourceAsk.Comparable() + filteredBestAllocs := p.filterSuperset(bestAllocs, p.nodeRemainingResources, resourcesNeeded, basePreemptionResource) + return filteredBestAllocs + +} + +// PreemptForNetwork tries to find allocations to preempt to meet network resources. +// This is called once per task when assigning a network to the task. 
While finding allocations +// to preempt, this only considers allocations that share the same network device +func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation { + + // Early return if there are no current allocs + if len(p.currentAllocs) == 0 { + return nil + } + + deviceToAllocs := make(map[string][]*structs.Allocation) + MbitsNeeded := networkResourceAsk.MBits + reservedPortsNeeded := networkResourceAsk.ReservedPorts + + // Build map of reserved ports needed for fast access + reservedPorts := make(map[int]struct{}) + for _, port := range reservedPortsNeeded { + reservedPorts[port.Value] = struct{}{} + } + + // filteredReservedPorts tracks reserved ports that are + // currently used by higher priority allocations that can't + // be preempted + filteredReservedPorts := make(map[string]map[int]struct{}) + + // Create a map from each device to allocs + // We can only preempt within allocations that + // are using the same device + for _, alloc := range p.currentAllocs { + if alloc.Job == nil { + continue + } + + // Filter out alloc that's ineligible due to priority + if p.jobPriority-alloc.Job.Priority < 10 { + // Populate any reserved ports used by + // this allocation that cannot be preempted + allocResources := p.allocDetails[alloc.ID].resources + networks := allocResources.Flattened.Networks + net := networks[0] + for _, port := range net.ReservedPorts { + portMap, ok := filteredReservedPorts[net.Device] + if !ok { + portMap = make(map[int]struct{}) + filteredReservedPorts[net.Device] = portMap + } + portMap[port.Value] = struct{}{} + } + continue + } + allocResources := p.allocDetails[alloc.ID].resources + networks := allocResources.Flattened.Networks + + // Only include if the alloc has a network device + if len(networks) > 0 { + device := networks[0].Device + allocsForDevice := deviceToAllocs[device] + allocsForDevice = append(allocsForDevice, alloc) + deviceToAllocs[device] = allocsForDevice + } + } + + // If no existing allocations use network resources, return early + if len(deviceToAllocs) == 0 { + return nil + } + + var allocsToPreempt []*structs.Allocation + met := false + freeBandwidth := 0 + +OUTER: + for device, currentAllocs := range deviceToAllocs { + totalBandwidth := netIdx.AvailBandwidth[device] + + // If the device doesn't have enough total available bandwidth, skip + if totalBandwidth < MbitsNeeded { + continue + } + + // Track how much existing free bandwidth we have before preemption + freeBandwidth = totalBandwidth - netIdx.UsedBandwidth[device] + + preemptedBandwidth := 0 + + // Reset allocsToPreempt since we don't want to preempt across devices for the same task + allocsToPreempt = nil + + // usedPortToAlloc tracks used ports by allocs in this device + usedPortToAlloc := make(map[int]*structs.Allocation) + + // First try to satisfy needed reserved ports + if len(reservedPortsNeeded) > 0 { + + // Populate usedPort map + for _, alloc := range currentAllocs { + allocResources := p.allocDetails[alloc.ID].resources + for _, n := range allocResources.Flattened.Networks { + reservedPorts := n.ReservedPorts + for _, p := range reservedPorts { + usedPortToAlloc[p.Value] = alloc + } + } + } + // Look for allocs that are using reserved ports needed + for _, port := range reservedPortsNeeded { + alloc, ok := usedPortToAlloc[port.Value] + if ok { + allocResources := p.allocDetails[alloc.ID].resources + preemptedBandwidth += allocResources.Flattened.Networks[0].MBits + allocsToPreempt = 
append(allocsToPreempt, alloc) + } else { + // Check if a higher priority allocation is using this port + // It cant be preempted so we skip to the next device + _, ok := filteredReservedPorts[device][port.Value] + if ok { + continue OUTER + } + } + } + + // Remove allocs that were preempted to satisfy reserved ports + currentAllocs = structs.RemoveAllocs(currentAllocs, allocsToPreempt) + } + + // If bandwidth requirements have been met, stop + if preemptedBandwidth+freeBandwidth >= MbitsNeeded { + met = true + break OUTER + } + + // Split by priority + allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, currentAllocs) + + for _, allocsGrp := range allocsByPriority { + allocs := allocsGrp.allocs + + // Sort by distance function + sort.Slice(allocs, func(i, j int) bool { + return p.distanceComparatorForNetwork(allocs, networkResourceAsk, i, j) + }) + + // Iterate over allocs until end of if requirements have been met + for _, alloc := range allocs { + allocResources := p.allocDetails[alloc.ID].resources + preemptedBandwidth += allocResources.Flattened.Networks[0].MBits + allocsToPreempt = append(allocsToPreempt, alloc) + if preemptedBandwidth+freeBandwidth >= MbitsNeeded { + met = true + break OUTER + } + } + + } + + } + + // Early return if we could not meet resource needs after examining allocs + if !met { + return nil + } + + // Build a resource object with just the network Mbits filled in + // Its safe to use the first preempted allocation's network resource + // here because all allocations preempted will be from the same device + nodeRemainingResources := &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Networks: []*structs.NetworkResource{ + { + Device: allocsToPreempt[0].Resources.Networks[0].Device, + MBits: freeBandwidth, + }, + }, + }, + } + + // Do a final pass to eliminate any superset allocations + preemptionResourceFactory := GetNetworkPreemptionResourceFactory() + resourcesNeeded := &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Networks: []*structs.NetworkResource{networkResourceAsk}, + }, + } + filteredBestAllocs := p.filterSuperset(allocsToPreempt, nodeRemainingResources, resourcesNeeded, preemptionResourceFactory) + return filteredBestAllocs +} + +// basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk. 
+// Values emitted are in the range [0, maxFloat] +func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 { + memoryCoord, cpuCoord, diskMBCoord := 0.0, 0.0, 0.0 + if resourceAsk.Flattened.Memory.MemoryMB > 0 { + memoryCoord = (float64(resourceAsk.Flattened.Memory.MemoryMB) - float64(resourceUsed.Flattened.Memory.MemoryMB)) / float64(resourceAsk.Flattened.Memory.MemoryMB) + } + if resourceAsk.Flattened.Cpu.CpuShares > 0 { + cpuCoord = (float64(resourceAsk.Flattened.Cpu.CpuShares) - float64(resourceUsed.Flattened.Cpu.CpuShares)) / float64(resourceAsk.Flattened.Cpu.CpuShares) + } + if resourceAsk.Shared.DiskMB > 0 { + diskMBCoord = (float64(resourceAsk.Shared.DiskMB) - float64(resourceUsed.Shared.DiskMB)) / float64(resourceAsk.Shared.DiskMB) + } + originDist := math.Sqrt( + math.Pow(memoryCoord, 2) + + math.Pow(cpuCoord, 2) + + math.Pow(diskMBCoord, 2)) + return originDist +} + +// networkResourceDistance returns a distance based only on network megabits +func networkResourceDistance(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource) float64 { + networkCoord := math.MaxFloat64 + if resourceUsed != nil && resourceNeeded != nil { + networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits) + } + + originDist := math.Abs(networkCoord) + return originDist +} + +// scoreForTaskGroup is used to calculate a score (lower is better) based on the distance between +// the needed resource and requirements. A penalty is added when the choice already has some existing +// allocations in the plan that are being preempted. +func scoreForTaskGroup(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources, maxParallel int, numPreemptedAllocs int) float64 { + maxParallelScorePenalty := 0.0 + if maxParallel > 0 && numPreemptedAllocs >= maxParallel { + maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty + } + return basicResourceDistance(resourceAsk, resourceUsed) + maxParallelScorePenalty +} + +// scoreForNetwork is similar to scoreForTaskGroup +// but only uses network Mbits to calculate a preemption score +func scoreForNetwork(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource, maxParallel int, numPreemptedAllocs int) float64 { + if resourceUsed == nil || resourceNeeded == nil { + return math.MaxFloat64 + } + maxParallelScorePenalty := 0.0 + if maxParallel > 0 && numPreemptedAllocs >= maxParallel { + maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty + } + return networkResourceDistance(resourceUsed, resourceNeeded) + maxParallelScorePenalty +} + +// filterAndGroupPreemptibleAllocs groups allocations by priority after filtering allocs +// that are not preemptible based on the jobPriority arg +func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs { + allocsByPriority := make(map[int][]*structs.Allocation) + for _, alloc := range current { + if alloc.Job == nil { + continue + } + + // Skip allocs whose priority is within a delta of 10 + // This also skips any allocs of the current job + // for which we are attempting preemption + if jobPriority-alloc.Job.Priority < 10 { + continue + } + grpAllocs, ok := allocsByPriority[alloc.Job.Priority] + if !ok { + grpAllocs = make([]*structs.Allocation, 0) + } + grpAllocs = append(grpAllocs, alloc) + allocsByPriority[alloc.Job.Priority] = grpAllocs + } + + var 
groupedSortedAllocs []*groupedAllocs + for priority, allocs := range allocsByPriority { + groupedSortedAllocs = append(groupedSortedAllocs, &groupedAllocs{ + priority: priority, + allocs: allocs}) + } + + // Sort by priority + sort.Slice(groupedSortedAllocs, func(i, j int) bool { + return groupedSortedAllocs[i].priority < groupedSortedAllocs[j].priority + }) + + return groupedSortedAllocs +} + +// filterSuperset is used as a final step to remove +// any allocations that meet a superset of requirements from +// the set of allocations to preempt +func (p *Preemptor) filterSuperset(bestAllocs []*structs.Allocation, + nodeRemainingResources *structs.ComparableResources, + resourceAsk *structs.ComparableResources, + preemptionResourceFactory PreemptionResourceFactory) []*structs.Allocation { + + // Sort bestAllocs by distance descending (without penalty) + sort.Slice(bestAllocs, func(i, j int) bool { + a1Resources := p.allocDetails[bestAllocs[i].ID].resources + a2Resources := p.allocDetails[bestAllocs[j].ID].resources + distance1 := preemptionResourceFactory(a1Resources, resourceAsk).Distance() + distance2 := preemptionResourceFactory(a2Resources, resourceAsk).Distance() + return distance1 > distance2 + }) + + availableResources := nodeRemainingResources.Copy() + var filteredBestAllocs []*structs.Allocation + + // Do another pass to eliminate allocations that are a superset of other allocations + // in the preemption set + for _, alloc := range bestAllocs { + filteredBestAllocs = append(filteredBestAllocs, alloc) + allocResources := p.allocDetails[alloc.ID].resources + availableResources.Add(allocResources) + + premptionResource := preemptionResourceFactory(availableResources, resourceAsk) + requirementsMet := premptionResource.MeetsRequirements() + if requirementsMet { + break + } + } + return filteredBestAllocs +} + +// distanceComparatorForNetwork is used as the sorting function when finding allocations to preempt. 
It uses +// both a coordinate distance function based on Mbits needed, and a penalty if the allocation under consideration +// belongs to a job that already has more preempted allocations +func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, networkResourceAsk *structs.NetworkResource, i int, j int) bool { + firstAlloc := allocs[i] + currentPreemptionCount1 := p.getNumPreemptions(firstAlloc) + + // Look up configured maxParallel value for these allocation's task groups + var maxParallel1, maxParallel2 int + tg1 := allocs[i].Job.LookupTaskGroup(firstAlloc.TaskGroup) + if tg1 != nil && tg1.Migrate != nil { + maxParallel1 = tg1.Migrate.MaxParallel + } + + // Dereference network usage on first alloc if its there + firstAllocResources := p.allocDetails[firstAlloc.ID].resources + firstAllocNetworks := firstAllocResources.Flattened.Networks + var firstAllocNetResourceUsed *structs.NetworkResource + if len(firstAllocNetworks) > 0 { + firstAllocNetResourceUsed = firstAllocNetworks[0] + } + + distance1 := scoreForNetwork(firstAllocNetResourceUsed, networkResourceAsk, maxParallel1, currentPreemptionCount1) + + secondAlloc := allocs[j] + currentPreemptionCount2 := p.getNumPreemptions(secondAlloc) + tg2 := secondAlloc.Job.LookupTaskGroup(secondAlloc.TaskGroup) + if tg2 != nil && tg2.Migrate != nil { + maxParallel2 = tg2.Migrate.MaxParallel + } + + // Dereference network usage on second alloc if its there + secondAllocResources := p.allocDetails[secondAlloc.ID].resources + secondAllocNetworks := secondAllocResources.Flattened.Networks + var secondAllocNetResourceUsed *structs.NetworkResource + if len(secondAllocNetworks) > 0 { + secondAllocNetResourceUsed = secondAllocNetworks[0] + } + + distance2 := scoreForNetwork(secondAllocNetResourceUsed, networkResourceAsk, maxParallel2, currentPreemptionCount2) + return distance1 < distance2 +} diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go new file mode 100644 index 00000000000..cf6f1ef26c8 --- /dev/null +++ b/scheduler/preemption_test.go @@ -0,0 +1,961 @@ +package scheduler + +import ( + "testing" + + "fmt" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestResourceDistance(t *testing.T) { + resourceAsk := &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 2048, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 512, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 4096, + }, + } + + type testCase struct { + allocResource *structs.ComparableResources + expectedDistance string + } + + testCases := []*testCase{ + { + &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 2048, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 512, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 4096, + }, + }, + "0.000", + }, + { + &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 400, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + 
MBits: 1024, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 1024, + }, + }, + "0.928", + }, + { + &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 8192, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 200, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 512, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 1024, + }, + }, + "3.152", + }, + { + &structs.ComparableResources{ + Flattened: structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{ + CpuShares: 2048, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 500, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 4096, + }, + }, + "0.023", + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + require := require.New(t) + actualDistance := fmt.Sprintf("%3.3f", basicResourceDistance(resourceAsk, tc.allocResource)) + require.Equal(tc.expectedDistance, actualDistance) + }) + + } + +} + +func TestPreemption(t *testing.T) { + type testCase struct { + desc string + currentAllocations []*structs.Allocation + nodeReservedCapacity *structs.Resources + nodeCapacity *structs.Resources + resourceAsk *structs.Resources + jobPriority int + currentPreemptions []*structs.Allocation + preemptedAllocIDs map[string]struct{} + } + + highPrioJob := mock.Job() + highPrioJob.Priority = 100 + + lowPrioJob := mock.Job() + lowPrioJob.Priority = 30 + + lowPrioJob2 := mock.Job() + lowPrioJob2.Priority = 30 + + // Create some persistent alloc ids to use in test cases + allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()} + + // TODO(preetha): Switch to using NodeResources and NodeReservedResources + defaultNodeResources := &structs.Resources{ + CPU: 4000, + MemoryMB: 8192, + DiskMB: 100 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + reservedNodeResources := &structs.Resources{ + CPU: 100, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + } + + testCases := []testCase{ + { + desc: "No preemption because existing allocs are not low priority", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 3200, + MemoryMB: 7256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 2000, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + }, + { + desc: "Preempting low priority allocs not enough to meet resource ask", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 3200, + MemoryMB: 7256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + })}, + nodeReservedCapacity: 
reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 4000, + MemoryMB: 8192, + DiskMB: 4 * 1024, + IOPS: 300, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + }, + { + desc: "preemption impossible - static port needed is used by higher priority alloc", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], highPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 600, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 600, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 700, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + }, + { + desc: "preempt only from device that has allocation with unused reserved port", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], highPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth1", + IP: "192.168.0.200", + MBits: 600, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 600, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + // This test sets up a node with two NICs + nodeCapacity: &structs.Resources{ + CPU: 4000, + MemoryMB: 8192, + DiskMB: 100 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + { + Device: "eth1", + CIDR: "192.168.1.100/32", + MBits: 1000, + }, + }, + }, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 600, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + IP: "192.168.0.100", + MBits: 700, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[2]: {}, + }, + }, + { + desc: "Combination of high/low priority allocs, without static ports", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 2800, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + 
IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, + }, + }), + createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 700, + MemoryMB: 256, + DiskMB: 4 * 1024, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1100, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 840, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, + { + desc: "Preemption needed for all resources except network", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 2800, + MemoryMB: 2256, + DiskMB: 40 * 1024, + IOPS: 100, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 50, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 25 * 1024, + }), + createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 700, + MemoryMB: 276, + DiskMB: 20 * 1024, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 3000, + DiskMB: 50 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, + { + desc: "Only one low priority alloc needs to be preempted", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 320, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 300, + MemoryMB: 500, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 320, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[2]: {}, + }, + }, + { + desc: "one alloc meets static port need, another meets remaining mbits needed", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 150, + 
Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 2700, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 800, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + }, + }, + { + desc: "alloc that meets static port need also meets other needs", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 600, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 100, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 600, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 700, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + { + desc: "alloc from job that has existing evictions not chosen for preemption", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob2, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 300, + MemoryMB: 500, + DiskMB: 5 * 1024, + Networks: 
[]*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 320, + }, + }, + }, + currentPreemptions: []*structs.Allocation{ + createAlloc(allocIDs[4], lowPrioJob2, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, + }, + }), + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + // This test case exercises the code path for a final filtering step that tries to + // minimize the number of preemptible allocations + { + desc: "Filter out allocs whose resource usage superset is also in the preemption list", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1800, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 1500, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 100, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 600, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 300, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: defaultNodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + node := mock.Node() + node.Resources = tc.nodeCapacity + node.Reserved = tc.nodeReservedCapacity + node.NodeResources = nil + + state, ctx := testContext(t) + nodes := []*RankedNode{ + { + Node: node, + }, + } + state.UpsertNode(1000, node) + for _, alloc := range tc.currentAllocations { + alloc.NodeID = node.ID + } + require := require.New(t) + err := state.UpsertAllocs(1001, tc.currentAllocations) + + require.Nil(err) + if tc.currentPreemptions != nil { + ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions + } + static := NewStaticRankIterator(ctx, nodes) + binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: tc.resourceAsk, + }, + }, + } + + binPackIter.SetTaskGroup(taskGroup) + option := binPackIter.Next() + if tc.preemptedAllocIDs == nil { + require.Nil(option) + } else { + require.NotNil(option) + preemptedAllocs := option.PreemptedAllocs + require.Equal(len(tc.preemptedAllocIDs), len(preemptedAllocs)) + for _, alloc := range preemptedAllocs { + _, ok := tc.preemptedAllocIDs[alloc.ID] + require.True(ok) + } + } + }) + } +} + +// helper method to create allocations with given jobs and resources +func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation { + alloc := &structs.Allocation{ + ID: id, + Job: job, + JobID: job.ID, + TaskResources: map[string]*structs.Resources{ + "web": resource, + }, + Resources: resource, + Namespace: structs.DefaultNamespace, + EvalID: uuid.Generate(), + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: 
structs.AllocClientStatusRunning, + TaskGroup: "web", + } + return alloc +} diff --git a/scheduler/rank.go b/scheduler/rank.go index d2e70a4e680..402dd9c7081 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -25,6 +25,10 @@ type RankedNode struct { // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. Proposed []*structs.Allocation + + // PreemptedAllocs is used by the BinpackIterator to identify allocs + // that should be preempted in order to make the placement + PreemptedAllocs []*structs.Allocation } func (r *RankedNode) GoString() string { @@ -195,6 +199,21 @@ OUTER: DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB), }, } + + var allocsToPreempt []*structs.Allocation + + // Initialize preemptor with node + preemptor := NewPreemptor(iter.priority) + preemptor.SetNode(option.Node) + + // Count the number of existing preemptions + allPreemptions := iter.ctx.Plan().NodePreemptions + var currentPreemptions []*structs.Allocation + for _, allocs := range allPreemptions { + currentPreemptions = append(currentPreemptions, allocs...) + } + preemptor.SetPreemptions(currentPreemptions) + for _, task := range iter.taskGroup.Tasks { // Allocate the resources taskResources := &structs.AllocatedTaskResources{ @@ -211,10 +230,41 @@ OUTER: ask := task.Resources.Networks[0].Copy() offer, err := netIdx.AssignNetwork(ask) if offer == nil { - iter.ctx.Metrics().ExhaustedNode(option.Node, - fmt.Sprintf("network: %s", err)) + // If eviction is not enabled, mark this node as exhausted and continue + if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, + fmt.Sprintf("network: %s", err)) + netIdx.Release() + continue OUTER + } + + // Look for preemptible allocations to satisfy the network resource for this task + preemptor.SetCandidates(proposed) + + netPreemptions := preemptor.PreemptForNetwork(ask, netIdx) + if netPreemptions == nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, + fmt.Sprintf("unable to meet network resource %v after preemption", ask)) + netIdx.Release() + continue OUTER + } + allocsToPreempt = append(allocsToPreempt, netPreemptions...) 
+ + // First subtract out preempted allocations + proposed = structs.RemoveAllocs(proposed, netPreemptions) + + // Reset the network index and try the offer again netIdx.Release() - continue OUTER + netIdx = structs.NewNetworkIndex() + netIdx.SetNode(option.Node) + netIdx.AddAllocs(proposed) + + offer, err = netIdx.AssignNetwork(ask) + if offer == nil { + iter.ctx.Logger().Error(fmt.Sprintf("unexpected error, unable to create offer after preempting:%v", err)) + netIdx.Release() + continue OUTER + } } // Reserve this to prevent another task from colliding @@ -231,21 +281,41 @@ OUTER: total.Tasks[task.Name] = taskResources } + // Store current set of running allocs before adding resources for the task group + current := proposed + // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) - // Check if these allocations fit, if they do not, simply skip this node + // Check if these allocations fit fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx) netIdx.Release() if !fit { - iter.ctx.Metrics().ExhaustedNode(option.Node, dim) - continue - } + // Skip the node if evictions are not enabled + if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, dim) + continue + } + + // If eviction is enabled and the node doesn't fit the alloc, check if + // any allocs can be preempted + + // Initialize preemptor with candidate set + preemptor.SetCandidates(current) - // XXX: For now we completely ignore evictions. We should use that flag - // to determine if its possible to evict other lower priority allocations - // to make room. This explodes the search space, so it must be done - // carefully. + preemptedAllocs := preemptor.PreemptForTaskGroup(total) + allocsToPreempt = append(allocsToPreempt, preemptedAllocs...) + + // If we were unable to find preempted allocs to meet these requirements + // mark as exhausted and continue + if len(preemptedAllocs) == 0 { + iter.ctx.Metrics().ExhaustedNode(option.Node, dim) + continue + } + } + if len(allocsToPreempt) > 0 { + option.PreemptedAllocs = allocsToPreempt + } // Score the fit normally otherwise fitness := structs.ScoreFit(option.Node, util) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index b9f0f5e3aa7..bd921a788d1 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -88,6 +88,9 @@ type State interface { // LatestDeploymentByJobID returns the latest deployment matching the given // job ID LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) + + // SchedulerConfig returns config options for the scheduler + SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) } // Planner interface is used to submit a task allocation plan. diff --git a/scheduler/stack.go b/scheduler/stack.go index 6be27c5e203..48f89f81fdb 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -109,10 +109,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint) // Apply the bin packing, this depends on the resources needed - // by a particular task group. Only enable eviction for the service - // scheduler as that logic is expensive. - evict := !batch - s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0) + // by a particular task group. + + s.binPack = NewBinPackIterator(ctx, rankSource, false, 0) // Apply the job anti-affinity iterator. This is to avoid placing // multiple allocations on the same node for this job. 
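
For orientation, the bin-packing path above drives the Preemptor added in scheduler/preemption.go in a fixed order: seed it with the node, with any preemptions already recorded in the plan, and with the allocations currently proposed on the node, then ask for a preemption set. The following is a minimal sketch under that API; the helper name and its parameters are hypothetical and not part of this change.

package scheduler

import "github.com/hashicorp/nomad/nomad/structs"

// illustratePreemptForTaskGroup is a hypothetical helper showing the call
// order used when a task group does not fit on a node and eviction is enabled.
func illustratePreemptForTaskGroup(
	node *structs.Node,
	proposed, plannedPreemptions []*structs.Allocation,
	totalAsk *structs.AllocatedResources,
	jobPriority int,
) []*structs.Allocation {
	p := NewPreemptor(jobPriority)
	p.SetNode(node)                      // remaining capacity after node reservations
	p.SetPreemptions(plannedPreemptions) // penalize jobs already being preempted in this plan
	p.SetCandidates(proposed)            // running allocations considered for preemption
	// Returns nil when no combination of allocations at least 10 priority
	// levels below jobPriority frees enough CPU/memory/disk for the ask.
	return p.PreemptForTaskGroup(totalAsk)
}
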
@@ -287,7 +286,12 @@ func NewSystemStack(ctx Context) *SystemStack { // Apply the bin packing, this depends on the resources needed // by a particular task group. Enable eviction as system jobs are high // priority. - s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) + _, schedConfig, _ := s.ctx.State().SchedulerConfig() + enablePreemption := true + if schedConfig != nil { + enablePreemption = schedConfig.PreemptionConfig.SystemSchedulerEnabled + } + s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0) // Apply score normalization s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack) diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 60202eb687c..e3f4015fbd1 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -60,7 +60,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { // Verify the evaluation trigger reason is understood switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, - structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, + structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", @@ -347,6 +347,24 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.PreviousAllocation = missing.Alloc.ID } + // If this placement involves preemption, set DesiredState to evict for those allocations + if option.PreemptedAllocs != nil { + var preemptedAllocIDs []string + for _, stop := range option.PreemptedAllocs { + s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID) + + preemptedAllocIDs = append(preemptedAllocIDs, stop.ID) + if s.eval.AnnotatePlan && s.plan.Annotations != nil { + s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub()) + if s.plan.Annotations.DesiredTGUpdates != nil { + desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name] + desired.Preemptions += 1 + } + } + } + alloc.PreemptedAllocations = preemptedAllocIDs + } + s.plan.AppendAlloc(alloc) } else { // Lazy initialize the failed map diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 3d78b706136..27d99b20547 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -6,11 +6,14 @@ import ( "testing" "time" + "fmt" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) func TestSystemSched_JobRegister(t *testing.T) { @@ -218,7 +221,7 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { JobID: job1.ID, Status: structs.EvalStatusPending, } - noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h1.Process(NewSystemScheduler, eval1); err != nil { @@ -239,6 +242,13 @@ func TestSystemSched_ExhaustResources(t *testing.T) { node := mock.Node() noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + // Enable Preemption + h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ + PreemptionConfig: 
structs.PreemptionConfig{ + SystemSchedulerEnabled: true, + }, + }) + // Create a service job which consumes most of the system resources svcJob := mock.Job() svcJob.TaskGroups[0].Count = 1 @@ -274,15 +284,31 @@ func TestSystemSched_ExhaustResources(t *testing.T) { JobID: job.ID, Status: structs.EvalStatusPending, } - noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval1); err != nil { t.Fatalf("err: %v", err) } - // Ensure that we have one allocation queued from the system job eval + // System scheduler will preempt the service job and would have placed eval1 + require := require.New(t) + + newPlan := h.Plans[1] + require.Len(newPlan.NodeAllocation, 1) + require.Len(newPlan.NodePreemptions, 1) + + for _, allocList := range newPlan.NodeAllocation { + require.Len(allocList, 1) + require.Equal(job.ID, allocList[0].JobID) + } + + for _, allocList := range newPlan.NodePreemptions { + require.Len(allocList, 1) + require.Equal(svcJob.ID, allocList[0].JobID) + } + // Ensure that we have no queued allocations on the second eval queued := h.Evals[1].QueuedAllocations["web"] - if queued != 1 { + if queued != 0 { t.Fatalf("expected: %v, actual: %v", 1, queued) } } @@ -1529,3 +1555,332 @@ func TestSystemSched_QueuedAllocsMultTG(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +func TestSystemSched_Preemption(t *testing.T) { + h := NewHarness(t) + + // Create nodes + var nodes []*structs.Node + for i := 0; i < 2; i++ { + node := mock.Node() + //TODO(preetha): remove in 0.11 + node.Resources = &structs.Resources{ + CPU: 3072, + MemoryMB: 5034, + DiskMB: 20 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + node.NodeResources = &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 3072, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 5034, + }, + Disk: structs.NodeDiskResources{ + DiskMB: 20 * 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + nodes = append(nodes, node) + } + + // Enable Preemption + h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{ + PreemptionConfig: structs.PreemptionConfig{ + SystemSchedulerEnabled: true, + }, + }) + + // Create some low priority batch jobs and allocations for them + // One job uses a reserved port + job1 := mock.BatchJob() + job1.Type = structs.JobTypeBatch + job1.Priority = 20 + job1.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + MBits: 200, + ReservedPorts: []structs.Port{ + { + Label: "web", + Value: 80, + }, + }, + }, + }, + } + + alloc1 := mock.Alloc() + alloc1.Job = job1 + alloc1.JobID = job1.ID + alloc1.NodeID = nodes[0].ID + alloc1.Name = "my-job[0]" + alloc1.TaskGroup = job1.TaskGroups[0].Name + alloc1.AllocatedResources = &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 512, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, + MBits: 200, + }, + }, + }, + }, + 
Shared: structs.AllocatedSharedResources{ + DiskMB: 5 * 1024, + }, + } + + noErr(t, h.State.UpsertJob(h.NextIndex(), job1)) + + job2 := mock.BatchJob() + job2.Type = structs.JobTypeBatch + job2.Priority = 20 + job2.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + MBits: 200, + }, + }, + } + + alloc2 := mock.Alloc() + alloc2.Job = job2 + alloc2.JobID = job2.ID + alloc2.NodeID = nodes[0].ID + alloc2.Name = "my-job[2]" + alloc2.TaskGroup = job2.TaskGroups[0].Name + alloc2.AllocatedResources = &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 512, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, + }, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 5 * 1024, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + job3 := mock.Job() + job3.Type = structs.JobTypeBatch + job3.Priority = 40 + job3.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 2048, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 400, + }, + }, + } + + alloc3 := mock.Alloc() + alloc3.Job = job3 + alloc3.JobID = job3.ID + alloc3.NodeID = nodes[0].ID + alloc3.Name = "my-job[0]" + alloc3.TaskGroup = job3.TaskGroups[0].Name + alloc3.AllocatedResources = &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 25, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, + MBits: 400, + }, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 5 * 1024, + }, + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc1, alloc2, alloc3})) + + // Create a high priority job and allocs for it + // These allocs should not be preempted + + job4 := mock.BatchJob() + job4.Type = structs.JobTypeBatch + job4.Priority = 100 + job4.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 2048, + Networks: []*structs.NetworkResource{ + { + MBits: 100, + }, + }, + } + + alloc4 := mock.Alloc() + alloc4.Job = job4 + alloc4.JobID = job4.ID + alloc4.NodeID = nodes[0].ID + alloc4.Name = "my-job4[0]" + alloc4.TaskGroup = job4.TaskGroups[0].Name + alloc4.AllocatedResources = &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 2048, + }, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "web", Value: 80}}, + MBits: 100, + }, + }, + }, + }, + Shared: structs.AllocatedSharedResources{ + DiskMB: 2 * 1024, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job4)) + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc4})) + + // Create a system job such that it would need to preempt both allocs to succeed + job := mock.SystemJob() + job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1948, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + { + MBits: 800, + DynamicPorts: []structs.Port{{Label: "http"}}, + }, + }, + } + 
noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require := require.New(t) + require.Nil(err) + + // Ensure a single plan + require.Equal(1, len(h.Plans)) + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + require.Nil(plan.Annotations) + + // Ensure the plan allocated on both nodes + var planned []*structs.Allocation + preemptingAllocId := "" + require.Equal(2, len(plan.NodeAllocation)) + + // The alloc that got placed on node 1 is the preemptor + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + for _, alloc := range allocList { + if alloc.NodeID == nodes[0].ID { + preemptingAllocId = alloc.ID + } + } + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + require.Equal(2, len(out)) + + // Verify that one node has preempted allocs + require.NotNil(plan.NodePreemptions[nodes[0].ID]) + preemptedAllocs := plan.NodePreemptions[nodes[0].ID] + + // Verify that three jobs have preempted allocs + require.Equal(3, len(preemptedAllocs)) + + expectedPreemptedJobIDs := []string{job1.ID, job2.ID, job3.ID} + + // We expect job1, job2 and job3 to have preempted allocations + // job4 should not have any allocs preempted + for _, alloc := range preemptedAllocs { + require.Contains(expectedPreemptedJobIDs, alloc.JobID) + } + // Look up the preempted allocs by job ID + ws = memdb.NewWatchSet() + + for _, jobId := range expectedPreemptedJobIDs { + out, err = h.State.AllocsByJob(ws, structs.DefaultNamespace, jobId, false) + noErr(t, err) + for _, alloc := range out { + require.Equal(structs.AllocDesiredStatusEvict, alloc.DesiredStatus) + require.Equal(fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocId), alloc.DesiredDescription) + } + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + +} diff --git a/scheduler/testing.go b/scheduler/testing.go index 0410b1190d7..31fc9ab91af 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -96,6 +96,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er result := new(structs.PlanResult) result.NodeUpdate = plan.NodeUpdate result.NodeAllocation = plan.NodeAllocation + result.NodePreemptions = plan.NodePreemptions result.AllocIndex = index // Flatten evicts and allocs @@ -116,6 +117,15 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er } } + // Set modify time for preempted allocs and flatten them + var preemptedAllocs []*structs.Allocation + for _, preemptions := range result.NodePreemptions { + for _, alloc := range preemptions { + alloc.ModifyTime = now + preemptedAllocs = append(preemptedAllocs, alloc) + } + } + // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -125,6 +135,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er Deployment: plan.Deployment, DeploymentUpdates: plan.DeploymentUpdates, EvalID: plan.EvalID, + NodePreemptions: preemptedAllocs, } // Apply the 
full plan
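
As a sanity check on the scoring used by the Preemptor earlier in this change, the standalone snippet below (a re-derivation, not code from this diff) reproduces the Euclidean distance that basicResourceDistance computes for the second TestResourceDistance case: an ask of 2048 CPU shares, 512 MB memory, and 4096 MB disk compared against a candidate using 1024, 400, and 1024 respectively. It prints 0.928, matching the expected value in the test; network bandwidth is scored separately and is ignored here, just as in basicResourceDistance.

package main

import (
	"fmt"
	"math"
)

// dimension normalizes the gap between the resource asked for and the amount a
// candidate allocation is using, mirroring the per-resource coordinates used
// by basicResourceDistance.
func dimension(ask, used float64) float64 {
	if ask <= 0 {
		return 0
	}
	return (ask - used) / ask
}

func main() {
	// Ask: 2048 CPU shares, 512 MB memory, 4096 MB disk.
	// Candidate usage: 1024 CPU shares, 400 MB memory, 1024 MB disk.
	cpu := dimension(2048, 1024)
	mem := dimension(512, 400)
	disk := dimension(4096, 1024)

	distance := math.Sqrt(cpu*cpu + mem*mem + disk*disk)
	fmt.Printf("%3.3f\n", distance) // prints 0.928, the value expected by TestResourceDistance
}
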