Skip to content

Commit

Permalink
Merge pull request #4794 from hashicorp/f-preemption-systemjobs
Browse files Browse the repository at this point in the history
Preemption for system jobs
  • Loading branch information
Preetha authored Nov 2, 2018
2 parents 002f46f + 97cf4e1 commit 6aa0c7f
Show file tree
Hide file tree
Showing 30 changed files with 3,264 additions and 62 deletions.
62 changes: 32 additions & 30 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,38 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
ID string
Namespace string
EvalID string
Name string
NodeID string
JobID string
Job *Job
TaskGroup string
Resources *Resources
TaskResources map[string]*Resources
AllocatedResources *AllocatedResources
Services map[string]string
Metrics *AllocationMetric
DesiredStatus string
DesiredDescription string
DesiredTransition DesiredTransition
ClientStatus string
ClientDescription string
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
PreemptedAllocations []string
PreemptedByAllocation string
CreateIndex uint64
ModifyIndex uint64
AllocModifyIndex uint64
CreateTime int64
ModifyTime int64
}

// AllocationMetric is used to deserialize allocation metrics.
Expand Down
2 changes: 2 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ type ObjectDiff struct {

type PlanAnnotations struct {
DesiredTGUpdates map[string]*DesiredUpdates
PreemptedAllocs []*AllocationListStub
}

type DesiredUpdates struct {
Expand All @@ -1023,6 +1024,7 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
Preemptions uint64
}

type JobDispatchRequest struct {
Expand Down
60 changes: 60 additions & 0 deletions api/operator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package api

import "strconv"

// Operator can be used to perform low-level operator tasks for Nomad.
type Operator struct {
c *Client
Expand Down Expand Up @@ -106,3 +108,61 @@ func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
resp.Body.Close()
return nil
}

type SchedulerConfiguration struct {
// PreemptionConfig specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}

// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration
type SchedulerConfigurationResponse struct {
// SchedulerConfig contains scheduler config options
SchedulerConfig SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q)
if err != nil {
return nil, err
}
return wm, nil
}

// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
if err != nil {
return false, nil, err
}

return out, wm, nil
}
66 changes: 66 additions & 0 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package api
import (
"strings"
"testing"

"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/require"
)

func TestOperator_RaftGetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -51,3 +54,66 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

operator := c.Operator()
var config *SchedulerConfigurationResponse
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)

// Change a config setting
newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
_, err := operator.SchedulerSetConfiguration(newConf, nil)
require.Nil(err)

config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
require.False(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
}

func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()

operator := c.Operator()
var config *SchedulerConfigurationResponse
retry.Run(t, func(r *retry.R) {
var err error
config, _, err = operator.SchedulerGetConfiguration(nil)
r.Check(err)
})
require.True(config.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)

// Pass an invalid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex - 1,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.False(resp)
}

// Pass a valid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.True(resp)
}
}
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
} else {
Expand Down
70 changes: 70 additions & 0 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,73 @@ func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Re

return out, nil
}

// OperatorSchedulerConfiguration is used to inspect the current Scheduler configuration.
// This supports the stale query mode in case the cluster doesn't have a leader.
func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}

var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}

out := api.SchedulerConfiguration{
PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled},
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}

resp := api.SchedulerConfigurationResponse{
SchedulerConfig: out,
CreateIndex: out.CreateIndex,
ModifyIndex: out.ModifyIndex,
}

return resp, nil

case "PUT":
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)

var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}

args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}

// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
args.Config.ModifyIndex = casVal
args.CAS = true
}

var reply bool
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}

// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
}
return reply, nil

default:
return nil, CodedError(404, ErrInvalidMethod)
}
}
Loading

0 comments on commit 6aa0c7f

Please sign in to comment.