diff --git a/.changelog/11610.txt b/.changelog/11610.txt new file mode 100644 index 00000000000..1cf42c7212a --- /dev/null +++ b/.changelog/11610.txt @@ -0,0 +1,3 @@ +```release-note:improvement +scheduler: Added a `RejectJobRegistration` field to the scheduler configuration API that enables a setting to reject job register, dispatch, and scale requests without a management ACL token +``` diff --git a/api/operator.go b/api/operator.go index 7adf79f1cbf..439fabe052e 100644 --- a/api/operator.go +++ b/api/operator.go @@ -129,6 +129,10 @@ type SchedulerConfiguration struct { // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index 78da1fd2c22..a0f7575f4c6 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -261,6 +261,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R args.Config = structs.SchedulerConfiguration{ SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, + RejectJobRegistration: conf.RejectJobRegistration, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 59b5dd106f8..9613f136682 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -114,7 +114,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis reply.Warnings = structs.MergeMultierrorWarnings(warnings...) 
// Check job submission permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil { if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) { @@ -175,11 +176,17 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job registration for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the job snap, err := j.srv.State().Snapshot() if err != nil { return err } + ws := memdb.NewWatchSet() existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID) if err != nil { @@ -1022,6 +1029,11 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes } } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job scaling for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Validate args err = args.Validate() if err != nil { @@ -1304,6 +1316,22 @@ func allowedNSes(aclObj *acl.ACL, state *state.StateStore, allow func(ns string) return r, nil } +// allowedRegistration checks that the scheduler is not in +// RejectJobRegistration mode for load-shedding. 
+func allowedRegistration(aclObj *acl.ACL, state *state.StateStore) (bool, error) { + _, cfg, err := state.SchedulerConfig() + if err != nil { + return false, err + } + if cfg != nil && !cfg.RejectJobRegistration { + return true, nil + } + if aclObj != nil && aclObj.IsManagement() { + return true, nil + } + return false, nil +} + // List is used to list the jobs registered in the system func (j *Job) List(args *structs.JobListRequest, reply *structs.JobListResponse) error { if done, err := j.srv.forward("Job.List", args, args, reply); done { @@ -1852,12 +1880,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now()) // Check for submit-job permissions - if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil { + var aclObj *acl.ACL + var err error + if aclObj, err = j.srv.ResolveToken(args.AuthToken); err != nil { return err } else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) { return structs.ErrPermissionDenied } + if ok, err := allowedRegistration(aclObj, j.srv.State()); !ok || err != nil { + j.logger.Warn("job dispatch for non-management ACL rejected") + return structs.ErrPermissionDenied + } + // Lookup the parameterized job if args.JobID == "" { return fmt.Errorf("missing parameterized job ID") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f31..943a22a04d8 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -2248,6 +2248,102 @@ func TestJobEndpoint_Register_ACL_Namespace(t *testing.T) { assert.NotNil(out, "expected job") } +func TestJobRegister_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + submitJobToken := 
mock.CreatePolicyAndToken(t, s1.State(), 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected bool + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + errExpected: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + errExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + req.AuthToken = tc.token + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + + if tc.errExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + require.NoError(t, err) + require.Nil(t, out) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + state := s1.fsm.State() + ws := memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, 
job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, job.TaskGroups, out.TaskGroups) + } + }) + } +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -6646,6 +6742,97 @@ func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { require.Equal(t, structs.JobStatusDead, dispatchedStatus()) } +func TestJobEndpoint_Dispatch_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.BatchJob() + job.ParameterizedJob = &structs.ParameterizedJobConfig{} + + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + dispatch := &structs.JobDispatchRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). 
+ SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected bool + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + errExpected: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + errExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + dispatch.AuthToken = tc.token + var resp structs.JobDispatchResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatch, &resp) + + if tc.errExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t) @@ -6934,6 +7121,99 @@ func TestJobEndpoint_Scale_ACL(t *testing.T) { } +func TestJobEndpoint_Scale_ACL_RejectedBySchedulerConfig(t *testing.T) { + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + job := mock.Job() + err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + require.NoError(t, err) + + scale := 
&structs.JobScaleRequest{ + JobID: job.ID, + Target: map[string]string{ + structs.ScalingTargetGroup: job.TaskGroups[0].Name, + }, + Message: "because of the load", + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + submitJobToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid-write", + mock.NamespacePolicy(structs.DefaultNamespace, "write", nil)). + SecretID + + cases := []struct { + name string + token string + rejectEnabled bool + errExpected bool + }{ + { + name: "reject disabled, with a submit token", + token: submitJobToken, + rejectEnabled: false, + errExpected: false, + }, + { + name: "reject enabled, with a submit token", + token: submitJobToken, + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, without a token", + token: "", + rejectEnabled: true, + errExpected: true, + }, + { + name: "reject enabled, with a management token", + token: root.SecretID, + rejectEnabled: true, + errExpected: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + + cfgReq := &structs.SchedulerSetConfigRequest{ + Config: structs.SchedulerConfiguration{ + RejectJobRegistration: tc.rejectEnabled, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + cfgReq.AuthToken = root.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", + cfgReq, &structs.SchedulerSetConfigurationResponse{}, + ) + require.NoError(t, err) + + var resp structs.JobRegisterResponse + scale.AuthToken = tc.token + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp) + + if tc.errExpected { + require.Error(t, err, "expected error") + require.EqualError(t, err, structs.ErrPermissionDenied.Error()) + } else { + require.NoError(t, err, "unexpected error") + require.NotEqual(t, 0, resp.Index) + } + }) + } + +} + func TestJobEndpoint_Scale_Invalid(t *testing.T) { t.Parallel() require := require.New(t) diff --git 
a/nomad/structs/operator.go b/nomad/structs/operator.go index d39b1a31ec3..633afa6c33e 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -149,8 +149,13 @@ type SchedulerConfiguration struct { // priority jobs to place higher priority jobs. PreemptionConfig PreemptionConfig `hcl:"preemption_config"` + // MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled MemoryOversubscriptionEnabled bool `hcl:"memory_oversubscription_enabled"` + // RejectJobRegistration disables new job registrations except with a + // management ACL token + RejectJobRegistration bool `hcl:"reject_job_registration"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index cb44b2d0f3d..921fe712557 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -45,6 +45,7 @@ $ curl \ "ModifyIndex": 5, "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": true, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -114,6 +115,7 @@ server state is authoritative. { "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": false, + "RejectJobRegistration": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "BatchSchedulerEnabled": false, @@ -128,6 +130,8 @@ server state is authoritative. - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. 
+- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. This allows operators to shed load from automated processes during incident response. + - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 8e6eeaa43c5..dc7163b7c26 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -312,6 +312,8 @@ server { memory_oversubscription_enabled = true + reject_job_registration = false + preemption_config { batch_scheduler_enabled = true system_scheduler_enabled = true