Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preemption for system jobs #4794

Merged
merged 38 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5f27e00
structs and API changes to plan and alloc structs needed for preemption
Sep 10, 2018
8004160
REview feedback
Sep 10, 2018
bf7192c
Add number of evictions to DesiredUpdates struct to use in CLI/API
Sep 11, 2018
fc266f7
structs and API changes to plan and alloc structs needed for preemption
Sep 10, 2018
715d869
Implement preemption for system jobs.
Sep 21, 2018
b5cbd73
Fix comment
Sep 24, 2018
13e314c
Fix logic bug, unit test for plan apply method in state store
Sep 24, 2018
fd6bff2
Fix linting and better comments
Sep 24, 2018
51c5bae
Fix linting
Sep 27, 2018
784b96c
Support for new scheduler config API, first use case is to disable pr…
Sep 28, 2018
2143fa2
Use scheduler config from state store to enable/disable preemption
Sep 28, 2018
6966e3c
Make preemption config a struct to allow for enabling based on schedu…
Oct 1, 2018
24b3934
Modify preemption code to use new style of resource structs
Oct 16, 2018
9f35923
fix end to end scheduler test to use new resource structs correctly
Oct 17, 2018
a960cce
comments
Oct 17, 2018
655689a
Preempted allocations should be removed from proposed allocations
Oct 17, 2018
c4e0e66
Restore/Snapshot plus unit tests for scheduler configuration
Oct 18, 2018
c3b8e4f
Add fsm layer tests
Oct 18, 2018
12af2ae
make default config a variable
Oct 18, 2018
c4a04eb
style fixes
Oct 18, 2018
191b862
More review comments
Oct 18, 2018
21432d6
More style and readablity fixes from review
Oct 18, 2018
4cc21fb
more minor cleanup
Oct 18, 2018
35635ba
refactor preemption code to use method recievers and setters for comm…
Oct 29, 2018
8800585
Introduce a response object for scheduler configuration
Oct 29, 2018
17344a7
Introduce interface with multiple implementations for resource distance
Oct 30, 2018
f2b0277
Fix return type in tests after refactor
Oct 30, 2018
22d156f
review comments
Nov 1, 2018
993b6a2
Cleaner way to exit early, and fixed a couple more places reading fro…
Nov 1, 2018
3ad7b3f
More review comments
Nov 1, 2018
06ad182
Plumb alloc resource cache in a few more places.
Nov 1, 2018
35d31f8
more minor review feedback
Nov 1, 2018
b3738a0
unit test plan apply with preemptions
Nov 2, 2018
0015095
Handle static port preemption when there are multiple devices
Nov 2, 2018
8235919
Fix static port preemption to be device aware
Nov 2, 2018
1380acb
dereference safely
Nov 2, 2018
c49a3e2
Fix test setup
Nov 2, 2018
97cf4e1
Address more minor code review feedback
Nov 2, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,38 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
PreemptedAllocations []string
PreemptedByAllocation string
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
}

// AllocationMetric is used to deserialize allocation metrics.
Expand Down
2 changes: 2 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ type ObjectDiff struct {

type PlanAnnotations struct {
DesiredTGUpdates map[string]*DesiredUpdates
PreemptedAllocs []*AllocationListStub
}

type DesiredUpdates struct {
Expand All @@ -1023,6 +1024,7 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
Preemptions uint64
}

type JobDispatchRequest struct {
Expand Down
60 changes: 60 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "strconv"

// Operator can be used to perform low-level operator tasks for Nomad.
type Operator struct {
c *Client
Expand Down Expand Up @@ -106,3 +108,61 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
resp.Body.Close()
return nil
}

type SchedulerConfiguration struct {
// PreemptionConfig specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}

// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration
type SchedulerConfigurationResponse struct {
// SchedulerConfig contains scheduler config options
SchedulerConfig SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ModifyIndex uint64
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q)
if err != nil {
return nil, err
}
return wm, nil
}

// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
if err != nil {
return false, nil, err
}

return out, wm, nil
}
66 changes: 66 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package api
import (
"strings"
"testing"

"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

operator := c.Operator()
var config *SchedulerConfigurationResponse
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)

// Change a config setting
newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
_, err := operator.SchedulerSetConfiguration(newConf, nil)
require.Nil(err)

config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
require.False(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
}

func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

operator := c.Operator()
var config *SchedulerConfigurationResponse
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)

// Pass an invalid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex - 1,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.False(resp)
}

// Pass a valid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.True(resp)
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
70 changes: 70 additions & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,73 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re

return out, nil
}

// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally split the methods up so that it only handles one type of HTTP verb: https://github.com/hashicorp/nomad/blob/master/command/agent/job_endpoint.go#L15

if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}

var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}

out := api.SchedulerConfiguration{
PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled},
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}

resp := api.SchedulerConfigurationResponse{
SchedulerConfig: out,
CreateIndex: out.CreateIndex,
ModifyIndex: out.ModifyIndex,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

return resp, nil

case "PUT":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We accept POST and PUT

var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)

var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}

args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}

// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
args.Config.ModifyIndex = casVal
args.CAS = true
}

var reply bool
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}

// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
}
return reply, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


default:
return nil, CodedError(404, ErrInvalidMethod)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

404 -> 405

}
}
Loading