Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement preemption for system jobs. #4710

Merged
merged 10 commits into from
Oct 15, 2018
50 changes: 50 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "strconv"

// Operator can be used to perform low-level operator tasks for Nomad.
type Operator struct {
c *Client
Expand Down Expand Up @@ -106,3 +108,51 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
resp.Body.Close()
return nil
}

// SchedulerConfiguration is the scheduler configuration exchanged with the
// /v1/operator/scheduler/config endpoint by the Operator scheduler methods.
type SchedulerConfiguration struct {
	// PreemptionConfig specifies whether to enable eviction of lower
	// priority jobs to place higher priority jobs.
	PreemptionConfig PreemptionConfig

	// CreateIndex and ModifyIndex store the create/modify indexes of this
	// configuration. ModifyIndex is the value sent as the "cas" parameter by
	// SchedulerCASConfiguration.
	CreateIndex uint64
	ModifyIndex uint64
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
	// SystemSchedulerEnabled toggles preemption for the system scheduler.
	SystemSchedulerEnabled bool
}

// SchedulerGetConfiguration fetches the current Scheduler configuration from
// /v1/operator/scheduler/config. On failure both the configuration and the
// query metadata are nil and only the error is returned.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfiguration, *QueryMeta, error) {
	conf := new(SchedulerConfiguration)
	meta, err := op.c.query("/v1/operator/scheduler/config", conf, q)
	if err != nil {
		return nil, nil, err
	}
	return conf, meta, nil
}

// SchedulerSetConfiguration writes conf to /v1/operator/scheduler/config as
// the current Scheduler configuration. It returns the write metadata, or a
// nil metadata and the error if the write fails.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
	// The response body is decoded into a bool, but the value is not
	// reported to the caller; only the error matters here.
	var applied bool
	meta, err := op.c.write("/v1/operator/scheduler/config", conf, &applied, q)
	if err != nil {
		return nil, err
	}
	return meta, nil
}

// SchedulerCASConfiguration performs a Check-And-Set update of the Scheduler
// configuration. conf.ModifyIndex is sent as the "cas" query parameter, and
// the boolean decoded from the response is returned: true on success, false
// on failure. If the write itself errors, false and a nil metadata are
// returned together with the error.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
	endpoint := "/v1/operator/scheduler/config?cas=" + strconv.FormatUint(conf.ModifyIndex, 10)

	var applied bool
	meta, err := op.c.write(endpoint, conf, &applied, q)
	if err != nil {
		return false, nil, err
	}
	return applied, meta, nil
}
66 changes: 66 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package api
import (
"strings"
"testing"

"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

// TestAPI_OperatorSchedulerGetSetConfiguration reads the scheduler
// configuration, disables system scheduler preemption through
// SchedulerSetConfiguration, and verifies that a subsequent read observes the
// new value.
func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	operator := c.Operator()

	// The initial read is retried until it stops returning an error
	// (presumably the test server may not be ready right away).
	var config *SchedulerConfiguration
	retry.Run(t, func(r *retry.R) {
		var err error
		config, _, err = operator.SchedulerGetConfiguration(nil)
		r.Check(err)
	})
	require.True(config.PreemptionConfig.SystemSchedulerEnabled)

	// Change a config setting
	newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
	_, err := operator.SchedulerSetConfiguration(newConf, nil)
	require.Nil(err)

	// The read-back must reflect the value that was just written; asserting
	// the old value here would mean the write was never verified.
	config, _, err = operator.SchedulerGetConfiguration(nil)
	require.Nil(err)
	require.False(config.PreemptionConfig.SystemSchedulerEnabled)
}

// TestAPI_OperatorSchedulerCASConfiguration exercises the Check-And-Set
// update: a request carrying a ModifyIndex one below the current value must
// be rejected, and a request carrying the current ModifyIndex must be applied.
func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	operator := c.Operator()

	var config *SchedulerConfiguration
	retry.Run(t, func(r *retry.R) {
		var err error
		config, _, err = operator.SchedulerGetConfiguration(nil)
		r.Check(err)
	})
	require.True(config.PreemptionConfig.SystemSchedulerEnabled)

	// Order matters: the rejected attempt runs first so that the accepted
	// one still sees the ModifyIndex captured above.
	attempts := []struct {
		desc        string
		modifyIndex uint64
		wantApplied bool
	}{
		{desc: "invalid ModifyIndex", modifyIndex: config.ModifyIndex - 1, wantApplied: false},
		{desc: "valid ModifyIndex", modifyIndex: config.ModifyIndex, wantApplied: true},
	}

	for _, attempt := range attempts {
		newConf := &SchedulerConfiguration{
			PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
			ModifyIndex:      attempt.modifyIndex,
		}
		applied, _, err := operator.SchedulerCASConfiguration(newConf, nil)
		require.Nil(err)
		require.Equal(attempt.wantApplied, applied, attempt.desc)
	}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
64 changes: 64 additions & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,67 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re

return out, nil
}

// OperatorSchedulerConfiguration serves the /v1/operator/scheduler/config
// endpoint, which inspects or updates the current Scheduler configuration.
//
// GET parses the region and query options from the request, calls the
// Operator.SchedulerGetConfiguration RPC and returns the result converted to
// an api.SchedulerConfiguration.
//
// PUT decodes an api.SchedulerConfiguration from the request body and calls
// the Operator.SchedulerSetConfiguration RPC. When a "cas" query parameter is
// present it is parsed as an unsigned integer and the update is performed as
// a Check-And-Set against that index. The returned value is a bool: always
// true for a plain update, and the RPC's reply for a CAS update.
//
// A malformed body or cas value yields a 400 coded error. Any other HTTP
// method yields a 405 (Method Not Allowed) coded error.
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	// Switch on the method
	switch req.Method {
	case http.MethodGet:
		var args structs.GenericRequest
		if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
			return nil, nil
		}

		var reply structs.SchedulerConfiguration
		if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
			return nil, err
		}

		// Translate the internal struct into its public API counterpart.
		out := api.SchedulerConfiguration{
			PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.PreemptionConfig.SystemSchedulerEnabled},
			CreateIndex:      reply.CreateIndex,
			ModifyIndex:      reply.ModifyIndex,
		}

		return out, nil

	case http.MethodPut:
		var args structs.SchedulerSetConfigRequest
		s.parseWriteRequest(req, &args.WriteRequest)

		var conf api.SchedulerConfiguration
		if err := decodeBody(req, &conf); err != nil {
			return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
		}

		args.Config = structs.SchedulerConfiguration{
			PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
		}

		// Check for cas value. The index used for the comparison comes from
		// the query string, not from the request body.
		params := req.URL.Query()
		if _, ok := params["cas"]; ok {
			casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
			if err != nil {
				return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
			}
			args.Config.ModifyIndex = casVal
			args.CAS = true
		}

		var reply bool
		if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
			return nil, err
		}

		// Only use the out value if this was a CAS
		if !args.CAS {
			return true, nil
		}
		return reply, nil

	default:
		// The resource exists but the verb is not supported, so report
		// 405 Method Not Allowed rather than 404 Not Found.
		return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
	}
}
100 changes: 100 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHTTP_OperatorRaftConfiguration(t *testing.T) {
Expand Down Expand Up @@ -257,3 +258,102 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) {
})
})
}

// TestOperator_SchedulerGetConfiguration issues a GET against the scheduler
// configuration handler and checks that it answers 200 with an
// api.SchedulerConfiguration whose system scheduler preemption is enabled.
func TestOperator_SchedulerGetConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", new(bytes.Buffer))
		recorder := httptest.NewRecorder()

		obj, err := s.Server.OperatorSchedulerConfiguration(recorder, req)
		require.Nil(err)
		require.Equal(200, recorder.Code)

		conf, isConf := obj.(api.SchedulerConfiguration)
		require.True(isConf)
		require.True(conf.PreemptionConfig.SystemSchedulerEnabled)
	})
}

// TestOperator_SchedulerSetConfiguration PUTs a scheduler configuration
// through the HTTP handler and then reads the stored configuration back over
// RPC to check the preemption setting.
func TestOperator_SchedulerSetConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		payload := []byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`)
		req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", bytes.NewBuffer(payload))
		recorder := httptest.NewRecorder()

		_, err := s.Server.OperatorSchedulerConfiguration(recorder, req)
		require.Nil(err)
		require.Equal(200, recorder.Code)

		// Read the configuration back directly over RPC.
		var query structs.GenericRequest
		query.QueryOptions.Region = s.Config.Region

		var stored structs.SchedulerConfiguration
		err = s.RPC("Operator.SchedulerGetConfiguration", &query, &stored)
		require.Nil(err)
		require.True(stored.PreemptionConfig.SystemSchedulerEnabled)
	})
}

// TestOperator_SchedulerCASConfiguration seeds a scheduler configuration via
// a plain PUT, then issues two Check-And-Set PUTs: one with a ModifyIndex one
// below the stored value, which must be rejected, and one with the stored
// ModifyIndex, which must be applied. The final read verifies the update.
func TestOperator_SchedulerCASConfiguration(t *testing.T) {
	t.Parallel()
	httpTest(t, nil, func(s *TestAgent) {
		require := require.New(t)

		seed := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`))
		seedReq, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", seed)
		seedResp := httptest.NewRecorder()
		_, err := s.Server.OperatorSchedulerConfiguration(seedResp, seedReq)
		require.Nil(err)
		require.Equal(200, seedResp.Code)

		var query structs.GenericRequest
		query.QueryOptions.Region = s.Config.Region

		var stored structs.SchedulerConfiguration
		if err := s.RPC("Operator.SchedulerGetConfiguration", &query, &stored); err != nil {
			t.Fatalf("err: %v", err)
		}
		require.True(stored.PreemptionConfig.SystemSchedulerEnabled)

		// casDisable sends a CAS PUT that disables system scheduler
		// preemption at the given index and returns the handler's verdict.
		casDisable := func(index uint64) bool {
			buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`))
			req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", index), buf)
			resp := httptest.NewRecorder()
			obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
			require.Nil(err)
			return obj.(bool)
		}

		// Create a CAS request, bad index
		require.False(casDisable(stored.ModifyIndex - 1))

		// Create a CAS request, good index
		require.True(casDisable(stored.ModifyIndex))

		// Verify the update
		if err := s.RPC("Operator.SchedulerGetConfiguration", &query, &stored); err != nil {
			t.Fatalf("err: %v", err)
		}
		require.False(stored.PreemptionConfig.SystemSchedulerEnabled)
	})
}
21 changes: 20 additions & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyNodeEligibilityUpdate(buf[1:], log.Index)
case structs.BatchNodeUpdateDrainRequestType:
return n.applyBatchDrainUpdate(buf[1:], log.Index)
case structs.SchedulerConfigRequestType:
return n.applySchedulerConfigUpdate(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -820,7 +822,7 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
n.logger.Error("ApplyPlan failed", "error", err)
return err
}

n.handleUpsertedEvals(req.PreemptionEvals)
return nil
}

Expand Down Expand Up @@ -1840,6 +1842,23 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink,
return nil
}

// applySchedulerConfigUpdate decodes a SchedulerSetConfigRequest from buf and
// applies it to the state store at the given Raft index. A plain request
// returns the result of SchedulerSetConfig. A CAS request returns the error
// from SchedulerCASConfig if there is one, otherwise the value it reported.
// A request that cannot be decoded causes a panic.
func (n *nomadFSM) applySchedulerConfigUpdate(buf []byte, index uint64) interface{} {
	var req structs.SchedulerSetConfigRequest
	if err := structs.Decode(buf, &req); err != nil {
		panic(fmt.Errorf("failed to decode request: %v", err))
	}
	// Timing starts after decoding, so only the state store work is measured.
	defer metrics.MeasureSince([]string{"nomad", "fsm", "scheduler-config"}, time.Now())

	// Unconditional update: hand the config straight to the state store.
	if !req.CAS {
		return n.state.SchedulerSetConfig(index, &req.Config)
	}

	// Check-And-Set: compare against the ModifyIndex carried by the request.
	applied, err := n.state.SchedulerCASConfig(index, req.Config.ModifyIndex, &req.Config)
	if err != nil {
		return err
	}
	return applied
}

// Release is a no-op, as we just need to GC the pointer
// to the state store snapshot. There is nothing to explicitly
// cleanup.
Expand Down
Loading