From 6e61606eba8c031dd8a9465fd829a9a8329bf688 Mon Sep 17 00:00:00 2001 From: Charlie Voiselle <464492+angrycub@users.noreply.github.com> Date: Thu, 6 Jan 2022 10:56:13 -0600 Subject: [PATCH] Make number of scheduler workers reloadable (#11593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Development Environment Changes * Added stringer to build deps ## New HTTP APIs * Added scheduler worker config API * Added scheduler worker info API ## New Internals * (Scheduler)Worker API refactor—Start(), Stop(), Pause(), Resume() * Update shutdown to use context * Add mutex for contended server data - `workerLock` for the `workers` slice - `workerConfigLock` for the `Server.Config.NumSchedulers` and `Server.Config.EnabledSchedulers` values ## Other * Adding docs for scheduler worker api * Add changelog message Co-authored-by: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> --- .changelog/11593.txt | 3 + .tours/scheduler-worker---hot-reload.tour | 57 ++ .tours/scheduler-worker---pause.tour | 66 ++ .tours/scheduler-worker---unpause.tour | 51 ++ .tours/scheduler-worker.tour | 36 ++ GNUmakefile | 1 + api/agent.go | 75 +++ api/agent_test.go | 48 ++ command/agent/agent_endpoint.go | 131 +++- command/agent/agent_endpoint_test.go | 601 ++++++++++++++++++- command/agent/http.go | 6 + nomad/leader.go | 18 +- nomad/leader_test.go | 28 +- nomad/server.go | 215 ++++++- nomad/server_test.go | 71 ++- nomad/testing.go | 2 +- nomad/worker.go | 392 ++++++++++-- nomad/worker_string_schedulerworkerstatus.go | 31 + nomad/worker_string_workerstatus.go | 30 + nomad/worker_test.go | 249 +++++++- website/content/api-docs/agent.mdx | 201 +++++++ 21 files changed, 2211 insertions(+), 101 deletions(-) create mode 100644 .changelog/11593.txt create mode 100644 .tours/scheduler-worker---hot-reload.tour create mode 100644 .tours/scheduler-worker---pause.tour create mode 100644 .tours/scheduler-worker---unpause.tour create mode 100644 .tours/scheduler-worker.tour create mode 100644 nomad/worker_string_schedulerworkerstatus.go create mode 100644 nomad/worker_string_workerstatus.go diff --git a/.changelog/11593.txt b/.changelog/11593.txt new file mode 100644 index 00000000000..855ba6d3d48 --- /dev/null +++ b/.changelog/11593.txt @@ -0,0 +1,3 @@ +```release-note:improvement +server: Make num_schedulers and enabled_schedulers hot reloadable; add agent API endpoint to enable dynamic modifications of these values. +``` diff --git a/.tours/scheduler-worker---hot-reload.tour b/.tours/scheduler-worker---hot-reload.tour new file mode 100644 index 00000000000..f733733306b --- /dev/null +++ b/.tours/scheduler-worker---hot-reload.tour @@ -0,0 +1,57 @@ +{ + "$schema": "https://aka.ms/codetour-schema", + "title": "Scheduler Worker - Hot Reload", + "steps": [ + { + "file": "nomad/server.go", + "description": "## Server.Reload()\n\nServer configuration reloads start here.", + "line": 782, + "selection": { + "start": { + "line": 780, + "character": 4 + }, + "end": { + "line": 780, + "character": 10 + } + } + }, + { + "file": "nomad/server.go", + "description": "## Did NumSchedulers change?\nIf the number of schedulers has changed between the running configuration and the new one we need to adopt that change in realtime.", + "line": 812 + }, + { + "file": "nomad/server.go", + "description": "## Server.setupNewWorkers()\n\nsetupNewWorkers performs three tasks:\n\n- makes a copy of the existing worker pointers\n\n- creates a fresh array and loads a new set of workers into them\n\n- iterates through the \"old\" workers and shuts them down in individual\n goroutines for maximum parallelism", + "line": 1482, + "selection": { + "start": { + "line": 1480, + "character": 4 + }, + "end": { + "line": 1480, + "character": 12 + } + } + }, + { + "file": "nomad/server.go", + "description": "Once all of the work in setupNewWorkers is complete, we stop the old ones.", + "line": 1485 + }, + { + "file": "nomad/server.go", + "description": "The `stopOldWorkers` function iterates through the array of workers and calls their `Shutdown` method\nas a goroutine to prevent blocking.", + "line": 1505 + }, + { + "file": "nomad/worker.go", + "description": "The `Shutdown` method sets `w.stop` to true signaling that we intend for the `Worker` to stop the next time we consult it. We also manually unpause the `Worker` by setting w.paused to false and sending a `Broadcast()` via the cond.", + "line": 110 + } + ], + "ref": "f-reload-num-schedulers" +} \ No newline at end of file diff --git a/.tours/scheduler-worker---pause.tour b/.tours/scheduler-worker---pause.tour new file mode 100644 index 00000000000..5b9d21fc22e --- /dev/null +++ b/.tours/scheduler-worker---pause.tour @@ -0,0 +1,66 @@ +{ + "$schema": "https://aka.ms/codetour-schema", + "title": "Scheduler Worker - Pause", + "steps": [ + { + "file": "nomad/leader.go", + "description": "## Server.establishLeadership()\n\nUpon becoming a leader, the server pauses a subset of the workers to allow for the additional burden of the leader's goroutines. The `handlePausableWorkers` function takes a boolean that states whether or not the current node is a leader or not. Because we are in `establishLeadership` we use `true` rather than calling `s.IsLeader()`", + "line": 233, + "selection": { + "start": { + "line": 233, + "character": 4 + }, + "end": { + "line": 233, + "character": 12 + } + } + }, + { + "file": "nomad/leader.go", + "description": "## Server.handlePausableWorkers()\n\nhandlePausableWorkers ranges over a slice of Workers and manipulates their paused state by calling their `SetPause` method.", + "line": 443, + "selection": { + "start": { + "line": 443, + "character": 18 + }, + "end": { + "line": 443, + "character": 26 + } + } + }, + { + "file": "nomad/leader.go", + "description": "## Server.pausableWorkers()\n\nThe pausableWorkers function provides a consistent slice of workers that the server can pause and unpause. Since the Worker array is never mutated, the same slice is returned by pausableWorkers on every invocation.\nThis comment is interesting/potentially confusing\n\n```golang\n // Disabling 3/4 of the workers frees CPU for raft and the\n\t// plan applier which uses 1/2 the cores.\n``` \n\nHowever, the key point is that it will return a slice containg 3/4th of the workers.", + "line": 1100, + "selection": { + "start": { + "line": 1104, + "character": 1 + }, + "end": { + "line": 1105, + "character": 43 + } + } + }, + { + "file": "nomad/worker.go", + "description": "## Worker.SetPause()\n\nThe `SetPause` function is used to signal an intention to pause the worker. Because the worker's work is happening in the `run()` goroutine, pauses happen asynchronously.", + "line": 91 + }, + { + "file": "nomad/worker.go", + "description": "## Worker.dequeueEvaluation()\n\nCalls checkPaused, which will be the function we wait in if the scheduler is set to be paused. \n\n> **NOTE:** This is called here rather than in run() because this function loops in case of an error fetching a evaluation.", + "line": 206 + }, + { + "file": "nomad/worker.go", + "description": "## Worker.checkPaused()\n\nWhen `w.paused` is `true`, we call the `Wait()` function on the condition. Execution of this goroutine will stop here until it receives a `Broadcast()` or a `Signal()`. At this point, the `Worker` is paused.", + "line": 104 + } + ] +} \ No newline at end of file diff --git a/.tours/scheduler-worker---unpause.tour b/.tours/scheduler-worker---unpause.tour new file mode 100644 index 00000000000..9c1c3a796ac --- /dev/null +++ b/.tours/scheduler-worker---unpause.tour @@ -0,0 +1,51 @@ +{ + "$schema": "https://aka.ms/codetour-schema", + "title": "Scheduler Worker - Unpause", + "steps": [ + { + "file": "nomad/leader.go", + "description": "## revokeLeadership()\n\nAs a server transistions from leader to non-leader, the pausableWorkers are resumed since the other leader goroutines are stopped providing extra capacity.", + "line": 1040, + "selection": { + "start": { + "line": 1038, + "character": 10 + }, + "end": { + "line": 1038, + "character": 20 + } + } + }, + { + "file": "nomad/leader.go", + "description": "## handlePausableWorkers()\n\nThe handlePausableWorkers method is called with `false`. We fetch the pausableWorkers and call their SetPause method with `false`.\n", + "line": 443, + "selection": { + "start": { + "line": 443, + "character": 18 + }, + "end": { + "line": 443, + "character": 27 + } + } + }, + { + "file": "nomad/worker.go", + "description": "## Worker.SetPause()\n\nDuring unpause, p is false. We update w.paused in the mutex, and then call Broadcast on the cond. This wakes the goroutine sitting in the Wait() inside of `checkPaused()`", + "line": 91 + }, + { + "file": "nomad/worker.go", + "description": "## Worker.checkPaused()\n\nOnce the goroutine receives the `Broadcast()` message from `SetPause()`, execution continues here. Now that `w.paused == false`, we exit the loop and return to the caller (the `dequeueEvaluation()` function).", + "line": 104 + }, + { + "file": "nomad/worker.go", + "description": "## Worker.dequeueEvaluation\n\nWe return back into dequeueEvaluation after the call to checkPaused. At this point the worker will either stop (if that signal boolean has been set) or continue looping after returning to run().", + "line": 207 + } + ] +} \ No newline at end of file diff --git a/.tours/scheduler-worker.tour b/.tours/scheduler-worker.tour new file mode 100644 index 00000000000..bbd6fca599b --- /dev/null +++ b/.tours/scheduler-worker.tour @@ -0,0 +1,36 @@ +{ + "$schema": "https://aka.ms/codetour-schema", + "title": "Scheduler Worker - Start", + "steps": [ + { + "file": "nomad/server.go", + "description": "## Server.NewServer()\n\nScheduler workers are started as the agent starts the `server` go routines.", + "line": 402 + }, + { + "file": "nomad/server.go", + "description": "## Server.setupWorkers()\n\nThe `setupWorkers()` function validates that there are enabled Schedulers by type and count. It then creates s.config.NumSchedulers by calling `NewWorker()`\n\nThe `_core` scheduler _**must**_ be enabled. **TODO: why?**\n", + "line": 1443, + "selection": { + "start": { + "line": 1442, + "character": 4 + }, + "end": { + "line": 1442, + "character": 12 + } + } + }, + { + "file": "nomad/worker.go", + "description": "## Worker.NewWorker\n\nNewWorker creates the Worker and starts `run()` in a goroutine.", + "line": 78 + }, + { + "file": "nomad/worker.go", + "description": "## Worker.run()\n\nThe `run()` function runs in a loop until it's paused, it's stopped, or the server indicates that it is shutting down. All of the work the `Worker` performs should be\nimplemented in or called from here.\n", + "line": 152 + } + ] +} \ No newline at end of file diff --git a/GNUmakefile b/GNUmakefile index a68437e4511..56adc89496c 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -124,6 +124,7 @@ deps: ## Install build and development dependencies go install github.com/hashicorp/go-msgpack/codec/codecgen@v1.1.5 go install github.com/bufbuild/buf/cmd/buf@v0.36.0 go install github.com/hashicorp/go-changelog/cmd/changelog-build@latest + go install golang.org/x/tools/cmd/stringer@v0.1.8 .PHONY: lint-deps lint-deps: ## Install linter dependencies diff --git a/api/agent.go b/api/agent.go index 424e9ad95d2..2d19b953693 100644 --- a/api/agent.go +++ b/api/agent.go @@ -494,3 +494,78 @@ type HostDataResponse struct { AgentID string HostData *HostData `json:",omitempty"` } + +// GetSchedulerWorkerConfig returns the targeted agent's worker pool configuration +func (a *Agent) GetSchedulerWorkerConfig(q *QueryOptions) (*SchedulerWorkerPoolArgs, error) { + var resp AgentSchedulerWorkerConfigResponse + _, err := a.client.query("/v1/agent/schedulers/config", &resp, q) + if err != nil { + return nil, err + } + + return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil +} + +// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration +func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs, q *WriteOptions) (*SchedulerWorkerPoolArgs, error) { + req := AgentSchedulerWorkerConfigRequest(args) + var resp AgentSchedulerWorkerConfigResponse + + _, err := a.client.write("/v1/agent/schedulers/config", &req, &resp, q) + if err != nil { + return nil, err + } + + return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil +} + +type SchedulerWorkerPoolArgs struct { + NumSchedulers int + EnabledSchedulers []string +} + +// AgentSchedulerWorkerConfigRequest is used to provide new scheduler worker configuration +// to a specific Nomad server. EnabledSchedulers must contain at least the `_core` scheduler +// to be valid. +type AgentSchedulerWorkerConfigRequest struct { + NumSchedulers int `json:"num_schedulers"` + EnabledSchedulers []string `json:"enabled_schedulers"` +} + +// AgentSchedulerWorkerConfigResponse contains the Nomad server's current running configuration +// as well as the server's id as a convenience. This can be used to provide starting values for +// creating an AgentSchedulerWorkerConfigRequest to make changes to the running configuration. +type AgentSchedulerWorkerConfigResponse struct { + ServerID string `json:"server_id"` + NumSchedulers int `json:"num_schedulers"` + EnabledSchedulers []string `json:"enabled_schedulers"` +} + +// GetSchedulerWorkersInfo returns the current status of all of the scheduler workers on +// a Nomad server. +func (a *Agent) GetSchedulerWorkersInfo(q *QueryOptions) (*AgentSchedulerWorkersInfo, error) { + var out *AgentSchedulerWorkersInfo + + _, err := a.client.query("/v1/agent/schedulers", &out, q) + if err != nil { + return nil, err + } + + return out, nil +} + +// AgentSchedulerWorkersInfo is the response from the scheduler information endpoint containing +// a detailed status of each scheduler worker running on the server. +type AgentSchedulerWorkersInfo struct { + ServerID string `json:"server_id"` + Schedulers []AgentSchedulerWorkerInfo `json:"schedulers"` +} + +// AgentSchedulerWorkerInfo holds the detailed status information for a single scheduler worker. +type AgentSchedulerWorkerInfo struct { + ID string `json:"id"` + EnabledSchedulers []string `json:"enabled_schedulers"` + Started string `json:"started"` + Status string `json:"status"` + WorkloadStatus string `json:"workload_status"` +} diff --git a/api/agent_test.go b/api/agent_test.go index 8fff6cfb0c5..cd13bab1892 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "net/http" "reflect" "sort" "strings" @@ -456,3 +457,50 @@ func TestAgentProfile(t *testing.T) { require.Nil(t, resp) } } + +func TestAgent_SchedulerWorkerConfig(t *testing.T) { + t.Parallel() + + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + config, err := a.GetSchedulerWorkerConfig(nil) + require.NoError(t, err) + require.NotNil(t, config) + newConfig := SchedulerWorkerPoolArgs{NumSchedulers: 0, EnabledSchedulers: []string{"_core", "system"}} + resp, err := a.SetSchedulerWorkerConfig(newConfig, nil) + require.NoError(t, err) + assert.NotEqual(t, config, resp) +} + +func TestAgent_SchedulerWorkerConfig_BadRequest(t *testing.T) { + t.Parallel() + + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + config, err := a.GetSchedulerWorkerConfig(nil) + require.NoError(t, err) + require.NotNil(t, config) + newConfig := SchedulerWorkerPoolArgs{NumSchedulers: -1, EnabledSchedulers: []string{"_core", "system"}} + _, err = a.SetSchedulerWorkerConfig(newConfig, nil) + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("%v (%s)", http.StatusBadRequest, "Invalid request")) +} + +func TestAgent_SchedulerWorkersInfo(t *testing.T) { + t.Parallel() + c, s := makeClient(t, nil, nil) + defer s.Stop() + a := c.Agent() + + info, err := a.GetSchedulerWorkersInfo(nil) + require.NoError(t, err) + require.NotNil(t, info) + defaultSchedulers := []string{"batch", "system", "sysbatch", "service", "_core"} + for _, worker := range info.Schedulers { + require.ElementsMatch(t, defaultSchedulers, worker.EnabledSchedulers) + } +} diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 798d65487f8..448b158f762 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -11,14 +11,17 @@ import ( "sort" "strconv" "strings" + "time" "github.com/docker/docker/pkg/ioutils" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/host" "github.com/hashicorp/nomad/command/agent/pprof" + "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/serf/serf" "github.com/mitchellh/copystructure" @@ -364,7 +367,7 @@ func (s *HTTPServer) agentPprof(reqType pprof.ReqType, resp http.ResponseWriter, // Parse query param int values // Errors are dropped here and default to their zero values. - // This is to mimick the functionality that net/pprof implements. + // This is to mimic the functionality that net/pprof implements. seconds, _ := strconv.Atoi(req.URL.Query().Get("seconds")) debug, _ := strconv.Atoi(req.URL.Query().Get("debug")) gc, _ := strconv.Atoi(req.URL.Query().Get("gc")) @@ -744,3 +747,129 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques return reply, rpcErr } + +// AgentSchedulerWorkerInfoRequest is used to query the running state of the +// agent's scheduler workers. +func (s *HTTPServer) AgentSchedulerWorkerInfoRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + srv := s.agent.Server() + if srv == nil { + return nil, CodedError(http.StatusBadRequest, ErrServerOnly) + } + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + + var secret string + s.parseToken(req, &secret) + + // Check agent read permissions + if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil { + return nil, CodedError(http.StatusInternalServerError, err.Error()) + } else if aclObj != nil && !aclObj.AllowAgentRead() { + return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error()) + } + + schedulersInfo := srv.GetSchedulerWorkersInfo() + response := &api.AgentSchedulerWorkersInfo{ + ServerID: srv.LocalMember().Name, + Schedulers: make([]api.AgentSchedulerWorkerInfo, len(schedulersInfo)), + } + + for i, workerInfo := range schedulersInfo { + response.Schedulers[i] = api.AgentSchedulerWorkerInfo{ + ID: workerInfo.ID, + EnabledSchedulers: make([]string, len(workerInfo.EnabledSchedulers)), + Started: workerInfo.Started.UTC().Format(time.RFC3339Nano), + Status: workerInfo.Status, + WorkloadStatus: workerInfo.WorkloadStatus, + } + copy(response.Schedulers[i].EnabledSchedulers, workerInfo.EnabledSchedulers) + } + + return response, nil +} + +// AgentSchedulerWorkerConfigRequest is used to query the count (and state eventually) +// of the scheduler workers running in a Nomad server agent. +// This endpoint can also be used to update the count of running workers for a +// given agent. +func (s *HTTPServer) AgentSchedulerWorkerConfigRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if s.agent.Server() == nil { + return nil, CodedError(http.StatusBadRequest, ErrServerOnly) + } + switch req.Method { + case http.MethodPut, http.MethodPost: + return s.updateScheduleWorkersConfig(resp, req) + case http.MethodGet: + return s.getScheduleWorkersConfig(resp, req) + default: + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } +} + +func (s *HTTPServer) getScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + srv := s.agent.Server() + if srv == nil { + return nil, CodedError(http.StatusBadRequest, ErrServerOnly) + } + + var secret string + s.parseToken(req, &secret) + + // Check agent read permissions + if aclObj, err := s.agent.Server().ResolveToken(secret); err != nil { + return nil, CodedError(http.StatusInternalServerError, err.Error()) + } else if aclObj != nil && !aclObj.AllowAgentRead() { + return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error()) + } + + config := srv.GetSchedulerWorkerConfig() + response := &api.AgentSchedulerWorkerConfigResponse{ + ServerID: srv.LocalMember().Name, + NumSchedulers: config.NumSchedulers, + EnabledSchedulers: config.EnabledSchedulers, + } + + return response, nil +} + +func (s *HTTPServer) updateScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + srv := s.agent.Server() + if srv == nil { + return nil, CodedError(http.StatusBadRequest, ErrServerOnly) + } + + var secret string + s.parseToken(req, &secret) + + // Check agent write permissions + if aclObj, err := srv.ResolveToken(secret); err != nil { + return nil, CodedError(http.StatusInternalServerError, err.Error()) + } else if aclObj != nil && !aclObj.AllowAgentWrite() { + return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error()) + } + + var args api.AgentSchedulerWorkerConfigRequest + + if err := decodeBody(req, &args); err != nil { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Invalid request: %s", err.Error())) + } + // the server_id provided in the payload is ignored to allow the + // response to be roundtripped right into a PUT. + newArgs := nomad.SchedulerWorkerPoolArgs{ + NumSchedulers: args.NumSchedulers, + EnabledSchedulers: args.EnabledSchedulers, + } + if newArgs.IsInvalid() { + return nil, CodedError(http.StatusBadRequest, "Invalid request") + } + reply := srv.SetSchedulerWorkerConfig(newArgs) + + response := &api.AgentSchedulerWorkerConfigResponse{ + ServerID: srv.LocalMember().Name, + NumSchedulers: reply.NumSchedulers, + EnabledSchedulers: reply.EnabledSchedulers, + } + + return response, nil +} diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 795dffd385a..efad48880f7 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "net/url" "os" + "reflect" "strings" "sync" "syscall" @@ -19,6 +20,7 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/nomad/mock" @@ -263,7 +265,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("invalid log_json parameter", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_json=no", nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() // Make the request @@ -276,7 +278,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("unknown log_level", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=unknown", nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() // Make the request @@ -289,7 +291,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("check for specific log level", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() defer resp.Close() @@ -323,7 +325,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("plain output", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=debug&plain=true", nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() defer resp.Close() @@ -357,7 +359,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("logs for a specific node", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn&node_id="+s.client.NodeID(), nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() defer resp.Close() @@ -397,7 +399,7 @@ func TestHTTP_AgentMonitor(t *testing.T) { t.Run("logs for a local client with no server running on agent", func(t *testing.T) { httpTest(t, nil, func(s *TestAgent) { req, err := http.NewRequest("GET", "/v1/agent/monitor?log_level=warn", nil) - require.Nil(t, err) + require.NoError(t, err) resp := newClosableRecorder() defer resp.Close() @@ -595,7 +597,7 @@ func TestAgent_PprofRequest(t *testing.T) { } req, err := http.NewRequest("GET", url, nil) - require.Nil(t, err) + require.NoError(t, err) respW := httptest.NewRecorder() resp, err := s.Server.AgentPprofRequest(respW, req) @@ -913,7 +915,7 @@ func TestHTTP_AgentListKeys(t *testing.T) { respW := httptest.NewRecorder() out, err := s.Server.KeyringOperationRequest(respW, req) - require.Nil(t, err) + require.NoError(t, err) kresp := out.(structs.KeyringResponse) require.Len(t, kresp.Keys, 1) }) @@ -1463,3 +1465,586 @@ func TestHTTP_XSS_Monitor(t *testing.T) { }) } } + +// ---------------------------- +// SchedulerWorkerInfoAPI tests +// ---------------------------- +type schedulerWorkerAPITest_testCase struct { + name string // test case name + request schedulerWorkerAPITest_testRequest + whenACLNotEnabled schedulerWorkerAPITest_testExpect + whenACLEnabled schedulerWorkerAPITest_testExpect +} + +type schedulerWorkerAPITest_testRequest struct { + verb string + aclToken string + requestBody string +} + +type schedulerWorkerAPITest_testExpect struct { + statusCode int + response interface{} + err error + isError bool +} + +func (te schedulerWorkerAPITest_testExpect) Code() int { + return te.statusCode +} + +func schedulerWorkerInfoTest_testCases() []schedulerWorkerAPITest_testCase { + forbidden := schedulerWorkerAPITest_testExpect{ + statusCode: http.StatusForbidden, + response: structs.ErrPermissionDenied.Error(), + isError: true, + } + invalidMethod := schedulerWorkerAPITest_testExpect{ + statusCode: http.StatusMethodNotAllowed, + response: ErrInvalidMethod, + isError: true, + } + success := schedulerWorkerAPITest_testExpect{ + statusCode: http.StatusOK, + response: &api.AgentSchedulerWorkersInfo{ + Schedulers: []api.AgentSchedulerWorkerInfo{ + { + ID: "9b3713e0-6f74-0e1b-3b3e-d94f0c22dbf9", + EnabledSchedulers: []string{"_core", "batch"}, + Started: "2021-12-10 22:13:12.595366 -0500 EST m=+0.039016232", + Status: "Pausing", + WorkloadStatus: "WaitingToDequeue", + }, + { + ID: "ebda23e2-7f68-0c82-f0b2-f91d4581094d", + EnabledSchedulers: []string{"_core", "batch"}, + Started: "2021-12-10 22:13:12.595478 -0500 EST m=+0.039127886", + Status: "Pausing", + WorkloadStatus: "WaitingToDequeue", + }, + { + ID: "b3869c9b-64ff-686c-a003-e7d059d3a573", + EnabledSchedulers: []string{"_core", "batch"}, + Started: "2021-12-10 22:13:12.595501 -0500 EST m=+0.039151276", + Status: "Pausing", + WorkloadStatus: "WaitingToDequeue", + }, + { + ID: "cc5907c0-552e-bf36-0ca1-f150af7273c2", + EnabledSchedulers: []string{"_core", "batch"}, + Started: "2021-12-10 22:13:12.595691 -0500 EST m=+0.039341541", + Status: "Starting", + WorkloadStatus: "WaitingToDequeue", + }, + }, + }, + } + return []schedulerWorkerAPITest_testCase{ + { + name: "bad verb", + request: schedulerWorkerAPITest_testRequest{ + verb: "FOO", + aclToken: "", + requestBody: "", + }, + whenACLNotEnabled: invalidMethod, + whenACLEnabled: invalidMethod, + }, + { + name: "get without token", + request: schedulerWorkerAPITest_testRequest{ + verb: "GET", + aclToken: "", + requestBody: "", + }, + whenACLNotEnabled: success, + whenACLEnabled: forbidden, + }, + { + name: "get with management token", + request: schedulerWorkerAPITest_testRequest{ + verb: "GET", + aclToken: "management", + requestBody: "", + }, + whenACLNotEnabled: success, + whenACLEnabled: success, + }, + { + name: "get with read token", + request: schedulerWorkerAPITest_testRequest{ + verb: "GET", + aclToken: "agent_read", + requestBody: "", + }, + whenACLNotEnabled: success, + whenACLEnabled: success, + }, + { + name: "get with invalid token", + request: schedulerWorkerAPITest_testRequest{ + verb: "GET", + aclToken: "node_write", + requestBody: "", + }, + whenACLNotEnabled: success, + whenACLEnabled: forbidden, + }, + } +} + +func TestHTTP_AgentSchedulerWorkerInfoRequest(t *testing.T) { + configFn := func(c *Config) { + var numSchedulers = 4 + c.Server.NumSchedulers = &numSchedulers + c.Server.EnabledSchedulers = []string{"_core", "batch"} + c.Client.Enabled = false + } + + for _, runACL := range []string{"no_acl", "acl"} { + t.Run(runACL, func(t *testing.T) { + tests := func(s *TestAgent) { + testingACLS := s.Config.ACL.Enabled + var tokens map[string]*structs.ACLToken + if s.Config.ACL.Enabled { + state := s.Agent.server.State() + tokens = make(map[string]*structs.ACLToken) + + tokens["management"] = s.RootToken + tokens["agent_read"] = mock.CreatePolicyAndToken(t, state, 1005, "agent_read", mock.AgentPolicy(acl.PolicyRead)) + tokens["agent_write"] = mock.CreatePolicyAndToken(t, state, 1007, "agent_write", mock.AgentPolicy(acl.PolicyWrite)) + tokens["node_write"] = mock.CreatePolicyAndToken(t, state, 1009, "node_write", mock.NodePolicy(acl.PolicyWrite)) + } + + for _, tc := range schedulerWorkerInfoTest_testCases() { + t.Run(tc.name, func(t *testing.T) { + req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers", bytes.NewReader([]byte(tc.request.requestBody))) + if testingACLS && tc.request.aclToken != "" { + setToken(req, tokens[tc.request.aclToken]) + } + require.NoError(t, err) + respW := httptest.NewRecorder() + workerInfoResp, err := s.Server.AgentSchedulerWorkerInfoRequest(respW, req) + + expected := tc.whenACLNotEnabled + if testingACLS { + expected = tc.whenACLEnabled + } + + if expected.isError { + require.Error(t, err) + codedErr, ok := err.(HTTPCodedError) + require.True(t, ok, "expected a HTTPCodedError") + require.Equal(t, expected.Code(), codedErr.Code()) + require.Equal(t, expected.response, codedErr.Error()) + return + } + + require.NoError(t, err) + workerInfo, ok := workerInfoResp.(*api.AgentSchedulerWorkersInfo) + require.True(t, ok, "expected an *AgentSchedulersWorkersInfo. received:%s", reflect.TypeOf(workerInfoResp)) + + expectWorkerInfo, ok := expected.response.(*api.AgentSchedulerWorkersInfo) + require.True(t, ok, "error casting test case to *AgentSchedulersWorkersInfo. received:%s", reflect.TypeOf(workerInfoResp)) + + var schedCount int = *s.Config.Server.NumSchedulers + require.Equal(t, schedCount, len(workerInfo.Schedulers), "must match num_schedulers") + require.Equal(t, len(expectWorkerInfo.Schedulers), len(workerInfo.Schedulers), "lengths must match") + + for i, info := range expectWorkerInfo.Schedulers { + require.ElementsMatch(t, info.EnabledSchedulers, workerInfo.Schedulers[i].EnabledSchedulers) + } + }) + } + } + + if runACL == "acl" { + httpACLTest(t, configFn, tests) + } else { + httpTest(t, configFn, tests) + } + }) + } +} + +// ---------------------------- +// SchedulerWorkerConfigAPI tests +// ---------------------------- +type scheduleWorkerConfigTest_workerRequestTest struct { + name string // test case name + request schedulerWorkerConfigTest_testRequest + whenACLNotEnabled schedulerWorkerConfigTest_testExpect + whenACLEnabled schedulerWorkerConfigTest_testExpect +} +type schedulerWorkerConfigTest_testRequest struct { + verb string + aclToken string + requestBody string +} +type schedulerWorkerConfigTest_testExpect struct { + expectedResponseCode int + expectedResponse interface{} +} + +// These test cases are run for both the ACL and Non-ACL enabled servers. When +// ACLS are not enabled, the request.aclTokens are ignored. +func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequestTest { + forbidden := schedulerWorkerConfigTest_testExpect{ + expectedResponseCode: http.StatusForbidden, + expectedResponse: structs.ErrPermissionDenied.Error(), + } + invalidMethod := schedulerWorkerConfigTest_testExpect{ + expectedResponseCode: http.StatusMethodNotAllowed, + expectedResponse: ErrInvalidMethod, + } + invalidRequest := schedulerWorkerConfigTest_testExpect{ + expectedResponseCode: http.StatusBadRequest, + expectedResponse: "Invalid request", + } + success1 := schedulerWorkerConfigTest_testExpect{ + expectedResponseCode: http.StatusOK, + expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 8}, + } + + success2 := schedulerWorkerConfigTest_testExpect{ + expectedResponseCode: http.StatusOK, + expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 9}, + } + + return []scheduleWorkerConfigTest_workerRequestTest{ + { + name: "bad verb", + request: schedulerWorkerConfigTest_testRequest{ + verb: "FOO", + aclToken: "", + requestBody: "", + }, + whenACLNotEnabled: invalidMethod, + whenACLEnabled: invalidMethod, + }, + { + name: "get without token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "GET", + aclToken: "", + requestBody: "", + }, + whenACLNotEnabled: success1, + whenACLEnabled: forbidden, + }, + { + name: "get with management token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "GET", + aclToken: "management", + requestBody: "", + }, + whenACLNotEnabled: success1, + whenACLEnabled: success1, + }, + { + name: "get with read token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "GET", + aclToken: "agent_read", + requestBody: "", + }, + whenACLNotEnabled: success1, + whenACLEnabled: success1, + }, + { + name: "get with write token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "GET", + aclToken: "agent_write", + requestBody: "", + }, + whenACLNotEnabled: success1, + whenACLEnabled: success1, + }, + { + name: "post with no token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "", + requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success2, + whenACLEnabled: forbidden, + }, + { + name: "put with no token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "", + requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success1, + whenACLEnabled: forbidden, + }, + { + name: "post with invalid token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "node_write", + requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success2, + whenACLEnabled: forbidden, + }, + { + name: "put with invalid token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "node_write", + requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success1, + whenACLEnabled: forbidden, + }, + { + name: "post with valid token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "agent_write", + requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success2, + whenACLEnabled: success2, + }, + { + name: "put with valid token", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "agent_write", + requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: success1, + whenACLEnabled: success1, + }, + { + name: "post with good token and bad value", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "agent_write", + requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + { + name: "post with bad token and bad value", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "node_write", + requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: forbidden, + }, + { + name: "put with good token and bad value", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "agent_write", + requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + { + name: "put with bad token and bad value", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "node_write", + requestBody: `{"num_schedulers":-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: forbidden, + }, + { + name: "post with bad json", + request: schedulerWorkerConfigTest_testRequest{ + verb: "POST", + aclToken: "agent_write", + requestBody: `{num_schedulers:-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + { + name: "put with bad json", + request: schedulerWorkerConfigTest_testRequest{ + verb: "PUT", + aclToken: "agent_write", + requestBody: `{num_schedulers:-1,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + } +} + +func TestHTTP_AgentSchedulerWorkerConfigRequest_NoACL(t *testing.T) { + configFn := func(c *Config) { + var numSchedulers = 8 + c.Server.NumSchedulers = &numSchedulers + c.Server.EnabledSchedulers = []string{"_core", "batch"} + c.Client.Enabled = false + } + testFn := func(s *TestAgent) { + for _, tc := range schedulerWorkerConfigTest_testCases() { + t.Run(tc.name, func(t *testing.T) { + + req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers/config", bytes.NewReader([]byte(tc.request.requestBody))) + require.NoError(t, err) + respW := httptest.NewRecorder() + workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req) + + switch tc.whenACLNotEnabled.expectedResponseCode { + case http.StatusBadRequest, http.StatusForbidden, http.StatusMethodNotAllowed: + schedulerWorkerTest_parseError(t, false, tc, workersI, err) + case http.StatusOK: + schedulerWorkerTest_parseSuccess(t, false, tc, workersI, err) + default: + require.Failf(t, "unexpected status code", "code: %v", tc.whenACLNotEnabled.expectedResponseCode) + } + }) + } + } + + httpTest(t, configFn, testFn) +} + +func TestHTTP_AgentSchedulerWorkerConfigRequest_ACL(t *testing.T) { + configFn := func(c *Config) { + var numSchedulers = 8 + c.Server.NumSchedulers = &numSchedulers + c.Server.EnabledSchedulers = []string{"_core", "batch"} + c.Client.Enabled = false + } + + tests := func(s *TestAgent) { + state := s.Agent.server.State() + var tokens map[string]*structs.ACLToken = make(map[string]*structs.ACLToken) + + tokens["management"] = s.RootToken + tokens["agent_read"] = mock.CreatePolicyAndToken(t, state, 1005, "agent_read", mock.AgentPolicy(acl.PolicyRead)) + tokens["agent_write"] = mock.CreatePolicyAndToken(t, state, 1007, "agent_write", mock.AgentPolicy(acl.PolicyWrite)) + tokens["node_write"] = mock.CreatePolicyAndToken(t, state, 1009, "node_write", mock.NodePolicy(acl.PolicyWrite)) + + for _, tc := range schedulerWorkerConfigTest_testCases() { + t.Run(tc.name, func(t *testing.T) { + + req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers", bytes.NewReader([]byte(tc.request.requestBody))) + if tc.request.aclToken != "" { + setToken(req, tokens[tc.request.aclToken]) + } + require.NoError(t, err) + respW := httptest.NewRecorder() + workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req) + + switch tc.whenACLEnabled.expectedResponseCode { + case http.StatusOK: + schedulerWorkerTest_parseSuccess(t, true, tc, workersI, err) + case http.StatusBadRequest, http.StatusForbidden, http.StatusMethodNotAllowed: + schedulerWorkerTest_parseError(t, true, tc, workersI, err) + default: + require.Failf(t, "unexpected status code", "code: %v", tc.whenACLEnabled.expectedResponseCode) + } + }) + } + } + + httpACLTest(t, configFn, tests) +} + +func schedulerWorkerTest_parseSuccess(t *testing.T, isACLEnabled bool, tc scheduleWorkerConfigTest_workerRequestTest, workersI interface{}, err error) { + require.NoError(t, err) + require.NotNil(t, workersI) + + testExpect := tc.whenACLNotEnabled + if isACLEnabled { + testExpect = tc.whenACLNotEnabled + } + + // test into the response when we expect an okay + tcConfig, ok := testExpect.expectedResponse.(*api.AgentSchedulerWorkerConfigResponse) + require.True(t, ok, "expected response malformed - this is an issue with a test case.") + + workersConfig, ok := workersI.(*api.AgentSchedulerWorkerConfigResponse) + require.True(t, ok, "response can not cast to an agentSchedulerWorkerConfig") + require.NotNil(t, workersConfig) + + require.Equal(t, tcConfig.NumSchedulers, workersConfig.NumSchedulers) + require.ElementsMatch(t, tcConfig.EnabledSchedulers, workersConfig.EnabledSchedulers) +} + +// schedulerWorkerTest_parseError parses the error response given +// from the API call to make sure that it's a coded error and is the +// expected value from the test case +func schedulerWorkerTest_parseError(t *testing.T, isACLEnabled bool, tc scheduleWorkerConfigTest_workerRequestTest, workersI interface{}, err error) { + require.Error(t, err) + require.Nil(t, workersI) + + codedError, ok := err.(HTTPCodedError) + require.True(t, ok, "expected an HTTPCodedError") + testExpect := tc.whenACLNotEnabled + + if isACLEnabled { + testExpect = tc.whenACLEnabled + } + + require.Equal(t, testExpect.expectedResponseCode, codedError.Code()) + // this is a relaxed test to allow us to not have to create a case + // for concatenated error strings. + require.Contains(t, codedError.Error(), testExpect.expectedResponse) +} + +func TestHTTP_AgentSchedulerWorkerInfoRequest_Client(t *testing.T) { + verbs := []string{"GET", "POST", "PUT"} + path := "schedulers" + + for _, verb := range verbs { + t.Run(verb, func(t *testing.T) { + httpTest(t, nil, func(s *TestAgent) { + s.Agent.server = nil + req, err := http.NewRequest(verb, fmt.Sprintf("/v1/agent/%v", path), nil) + require.NoError(t, err) + respW := httptest.NewRecorder() + + _, err = s.Server.AgentSchedulerWorkerInfoRequest(respW, req) + + require.Error(t, err) + codedErr, ok := err.(HTTPCodedError) + require.True(t, ok, "expected a HTTPCodedError") + require.Equal(t, http.StatusBadRequest, codedErr.Code()) + require.Equal(t, ErrServerOnly, codedErr.Error()) + }) + }) + } +} + +func TestHTTP_AgentSchedulerWorkerConfigRequest_Client(t *testing.T) { + verbs := []string{"GET", "POST", "PUT"} + path := "schedulers/config" + + for _, verb := range verbs { + t.Run(verb, func(t *testing.T) { + httpTest(t, nil, func(s *TestAgent) { + s.Agent.server = nil + req, err := http.NewRequest(verb, fmt.Sprintf("/v1/agent/%v", path), nil) + require.NoError(t, err) + respW := httptest.NewRecorder() + + _, err = s.Server.AgentSchedulerWorkerInfoRequest(respW, req) + + require.Error(t, err) + codedErr, ok := err.(HTTPCodedError) + require.True(t, ok, "expected a HTTPCodedError") + require.Equal(t, http.StatusBadRequest, codedErr.Code()) + require.Equal(t, ErrServerOnly, codedErr.Error()) + }) + }) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 142f3a6ef90..a3c082661e9 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -36,6 +36,10 @@ const ( // endpoint ErrEntOnly = "Nomad Enterprise only endpoint" + // ErrServerOnly is the error text returned if accessing a server only + // endpoint + ErrServerOnly = "Server only endpoint" + // ContextKeyReqID is a unique ID for a given request ContextKeyReqID = "requestID" @@ -311,6 +315,8 @@ func (s HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/agent/members", s.wrap(s.AgentMembersRequest)) s.mux.HandleFunc("/v1/agent/force-leave", s.wrap(s.AgentForceLeaveRequest)) s.mux.HandleFunc("/v1/agent/servers", s.wrap(s.AgentServersRequest)) + s.mux.HandleFunc("/v1/agent/schedulers", s.wrap(s.AgentSchedulerWorkerInfoRequest)) + s.mux.HandleFunc("/v1/agent/schedulers/config", s.wrap(s.AgentSchedulerWorkerConfigRequest)) s.mux.HandleFunc("/v1/agent/keyring/", s.wrap(s.KeyringOperationRequest)) s.mux.HandleFunc("/v1/agent/health", s.wrap(s.HealthRequest)) s.mux.HandleFunc("/v1/agent/host", s.wrap(s.AgentHostRequest)) diff --git a/nomad/leader.go b/nomad/leader.go index 64f06b84867..c6dca9dc677 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -230,9 +230,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Disable workers to free half the cores for use in the plan queue and // evaluation broker - for _, w := range s.pausableWorkers() { - w.SetPause(true) - } + s.handlePausableWorkers(true) // Initialize and start the autopilot routine s.getOrCreateAutopilotConfig() @@ -442,6 +440,16 @@ ERR_WAIT: } } +func (s *Server) handlePausableWorkers(isLeader bool) { + for _, w := range s.pausableWorkers() { + if isLeader { + w.Pause() + } else { + w.Resume() + } + } +} + // diffNamespaces is used to perform a two-way diff between the local namespaces // and the remote namespaces to determine which namespaces need to be deleted or // updated. @@ -1081,9 +1089,7 @@ func (s *Server) revokeLeadership() error { } // Unpause our worker if we paused previously - for _, w := range s.pausableWorkers() { - w.SetPause(false) - } + s.handlePausableWorkers(false) return nil } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index ea6c4670e15..07f854d09e3 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1328,25 +1328,31 @@ func TestLeader_PausingWorkers(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) require.Len(t, s1.workers, 12) - pausedWorkers := func() int { - c := 0 - for _, w := range s1.workers { - w.pauseLock.Lock() - if w.paused { - c++ + // this satisfies the require.Eventually test interface + checkPaused := func(count int) func() bool { + return func() bool { + pausedWorkers := func() int { + c := 0 + for _, w := range s1.workers { + if w.IsPaused() { + c++ + } + } + return c } - w.pauseLock.Unlock() + + return pausedWorkers() == count } - return c } - // pause 3/4 of the workers - require.Equal(t, 9, pausedWorkers()) + // acquiring leadership should have paused 3/4 of the workers + require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change") err := s1.revokeLeadership() require.NoError(t, err) - require.Zero(t, pausedWorkers()) + // unpausing is a relatively quick activity + require.Eventually(t, checkPaused(0), 50*time.Millisecond, 10*time.Millisecond, "scheduler workers should have unpaused after losing leadership") } // Test doing an inplace upgrade on a server from raft protocol 2 to 3 diff --git a/nomad/server.go b/nomad/server.go index 557ed175962..533b2ca01f9 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -226,7 +226,9 @@ type Server struct { vault VaultClient // Worker used for processing - workers []*Worker + workers []*Worker + workerLock sync.RWMutex + workerConfigLock sync.RWMutex // aclCache is used to maintain the parsed ACL objects aclCache *lru.TwoQueueCache @@ -399,7 +401,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr } // Initialize the scheduling workers - if err := s.setupWorkers(); err != nil { + if err := s.setupWorkers(s.shutdownCtx); err != nil { s.Shutdown() s.logger.Error("failed to start workers", "error", err) return nil, fmt.Errorf("Failed to start workers: %v", err) @@ -558,7 +560,7 @@ func (s *Server) reloadTLSConnections(newTLSConfig *config.TLSConfig) error { // Check if we can reload the RPC listener if s.rpcListener == nil || s.rpcCancel == nil { - s.logger.Warn("unable to reload configuration due to uninitialized rpc listner") + s.logger.Warn("unable to reload configuration due to uninitialized rpc listener") return fmt.Errorf("can't reload uninitialized RPC listener") } @@ -809,6 +811,15 @@ func (s *Server) Reload(newConfig *Config) error { s.EnterpriseState.ReloadLicense(newConfig) } + // Because this is a new configuration, we extract the worker pool arguments without acquiring a lock + workerPoolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(newConfig) + if reload, newVals := shouldReloadSchedulers(s, workerPoolArgs); reload { + if newVals.IsValid() { + reloadSchedulers(s, newVals) + } + reloadSchedulers(s, newVals) + } + return mErr.ErrorOrNil() } @@ -1430,17 +1441,165 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( return serf.Create(conf) } +// shouldReloadSchedulers checks the new config to determine if the scheduler worker pool +// needs to be updated. If so, returns true and a pointer to a populated SchedulerWorkerPoolArgs +func shouldReloadSchedulers(s *Server, newPoolArgs *SchedulerWorkerPoolArgs) (bool, *SchedulerWorkerPoolArgs) { + s.workerConfigLock.RLock() + defer s.workerConfigLock.RUnlock() + + newSchedulers := make([]string, len(newPoolArgs.EnabledSchedulers)) + copy(newSchedulers, newPoolArgs.EnabledSchedulers) + sort.Strings(newSchedulers) + + if s.config.NumSchedulers != newPoolArgs.NumSchedulers { + return true, newPoolArgs + } + + oldSchedulers := make([]string, len(s.config.EnabledSchedulers)) + copy(oldSchedulers, s.config.EnabledSchedulers) + sort.Strings(oldSchedulers) + + for i, v := range newSchedulers { + if oldSchedulers[i] != v { + return true, newPoolArgs + } + } + + return false, nil +} + +// SchedulerWorkerPoolArgs are the two key configuration options for a Nomad server's +// scheduler worker pool. Before using, you should always verify that they are rational +// using IsValid() or IsInvalid() +type SchedulerWorkerPoolArgs struct { + NumSchedulers int + EnabledSchedulers []string +} + +// IsInvalid returns true when the SchedulerWorkerPoolArgs.IsValid is false +func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool { + return !swpa.IsValid() +} + +// IsValid verifies that the pool arguments are valid. That is, they have a non-negative +// numSchedulers value and the enabledSchedulers list has _core and only refers to known +// schedulers. +func (swpa SchedulerWorkerPoolArgs) IsValid() bool { + if swpa.NumSchedulers < 0 { + // the pool has to be non-negative + return false + } + + // validate the scheduler list against the builtin types and _core + foundCore := false + for _, sched := range swpa.EnabledSchedulers { + if sched == structs.JobTypeCore { + foundCore = true + continue // core is not in the BuiltinSchedulers map, so we need to skip that check + } + + if _, ok := scheduler.BuiltinSchedulers[sched]; !ok { + return false // found an unknown scheduler in the list; bailing out + } + } + + return foundCore +} + +// Copy returns a clone of a SchedulerWorkerPoolArgs struct. Concurrent access +// concerns should be managed by the caller. +func (swpa SchedulerWorkerPoolArgs) Copy() SchedulerWorkerPoolArgs { + out := SchedulerWorkerPoolArgs{ + NumSchedulers: swpa.NumSchedulers, + EnabledSchedulers: make([]string, len(swpa.EnabledSchedulers)), + } + copy(out.EnabledSchedulers, swpa.EnabledSchedulers) + + return out +} + +func getSchedulerWorkerPoolArgsFromConfigLocked(c *Config) *SchedulerWorkerPoolArgs { + return &SchedulerWorkerPoolArgs{ + NumSchedulers: c.NumSchedulers, + EnabledSchedulers: c.EnabledSchedulers, + } +} + +// GetSchedulerWorkerInfo returns a slice of WorkerInfos from all of +// the running scheduler workers. +func (s *Server) GetSchedulerWorkersInfo() []WorkerInfo { + s.workerLock.RLock() + defer s.workerLock.RUnlock() + out := make([]WorkerInfo, len(s.workers)) + for i := 0; i < len(s.workers); i = i + 1 { + workerInfo := s.workers[i].Info() + out[i] = workerInfo.Copy() + } + return out +} + +// GetSchedulerWorkerConfig returns a clean copy of the server's current scheduler +// worker config. +func (s *Server) GetSchedulerWorkerConfig() SchedulerWorkerPoolArgs { + s.workerConfigLock.RLock() + defer s.workerConfigLock.RUnlock() + return getSchedulerWorkerPoolArgsFromConfigLocked(s.config).Copy() +} + +func (s *Server) SetSchedulerWorkerConfig(newArgs SchedulerWorkerPoolArgs) SchedulerWorkerPoolArgs { + if reload, newVals := shouldReloadSchedulers(s, &newArgs); reload { + if newVals.IsValid() { + reloadSchedulers(s, newVals) + } + } + return s.GetSchedulerWorkerConfig() +} + +// reloadSchedulers validates the passed scheduler worker pool arguments, locks the +// workerLock, applies the new values to the s.config, and restarts the pool +func reloadSchedulers(s *Server, newArgs *SchedulerWorkerPoolArgs) { + if newArgs == nil || newArgs.IsInvalid() { + s.logger.Info("received invalid arguments for scheduler pool reload; ignoring") + return + } + + // reload will modify the server.config so it needs a write lock + s.workerConfigLock.Lock() + defer s.workerConfigLock.Unlock() + + // reload modifies the worker slice so it needs a write lock + s.workerLock.Lock() + defer s.workerLock.Unlock() + + // TODO: If EnabledSchedulers didn't change, we can scale rather than drain and rebuild + s.config.NumSchedulers = newArgs.NumSchedulers + s.config.EnabledSchedulers = newArgs.EnabledSchedulers + s.setupNewWorkersLocked() +} + // setupWorkers is used to start the scheduling workers -func (s *Server) setupWorkers() error { +func (s *Server) setupWorkers(ctx context.Context) error { + poolArgs := s.GetSchedulerWorkerConfig() + + // we will be writing to the worker slice + s.workerLock.Lock() + defer s.workerLock.Unlock() + + return s.setupWorkersLocked(ctx, poolArgs) +} + +// setupWorkersLocked directly manipulates the server.config, so it is not safe to +// call concurrently. Use setupWorkers() or call this with server.workerLock set. +func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorkerPoolArgs) error { // Check if all the schedulers are disabled - if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 { + if len(poolArgs.EnabledSchedulers) == 0 || poolArgs.NumSchedulers == 0 { s.logger.Warn("no enabled schedulers") return nil } // Check if the core scheduler is not enabled foundCore := false - for _, sched := range s.config.EnabledSchedulers { + for _, sched := range poolArgs.EnabledSchedulers { if sched == structs.JobTypeCore { foundCore = true continue @@ -1454,18 +1613,58 @@ func (s *Server) setupWorkers() error { return fmt.Errorf("invalid configuration: %q scheduler not enabled", structs.JobTypeCore) } + s.logger.Info("starting scheduling worker(s)", "num_workers", poolArgs.NumSchedulers, "schedulers", poolArgs.EnabledSchedulers) // Start the workers + for i := 0; i < s.config.NumSchedulers; i++ { - if w, err := NewWorker(s); err != nil { + if w, err := NewWorker(ctx, s, poolArgs); err != nil { return err } else { + s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers) + s.workers = append(s.workers, w) } } - s.logger.Info("starting scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers) + s.logger.Info("started scheduling worker(s)", "num_workers", s.config.NumSchedulers, "schedulers", s.config.EnabledSchedulers) return nil } +// setupNewWorkersLocked directly manipulates the server.config, so it is not safe to +// call concurrently. Use reloadWorkers() or call this with server.workerLock set. +func (s *Server) setupNewWorkersLocked() error { + // make a copy of the s.workers array so we can safely stop those goroutines asynchronously + oldWorkers := make([]*Worker, len(s.workers)) + defer s.stopOldWorkers(oldWorkers) + for i, w := range s.workers { + oldWorkers[i] = w + } + s.logger.Info(fmt.Sprintf("marking %v current schedulers for shutdown", len(oldWorkers))) + + // build a clean backing array and call setupWorkersLocked like setupWorkers + // does in the normal startup path + s.workers = make([]*Worker, 0, s.config.NumSchedulers) + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s.config).Copy() + err := s.setupWorkersLocked(s.shutdownCtx, poolArgs) + if err != nil { + return err + } + + // if we're the leader, we need to pause all of the pausable workers. + s.handlePausableWorkers(s.IsLeader()) + + return nil +} + +// stopOldWorkers is called once setupNewWorkers has created the new worker +// array to asynchronously stop each of the old workers individually. +func (s *Server) stopOldWorkers(oldWorkers []*Worker) { + workerCount := len(oldWorkers) + for i, w := range oldWorkers { + s.logger.Debug("stopping old scheduling worker", "id", w.ID(), "index", i+1, "of", workerCount) + go w.Stop() + } +} + // numPeers is used to check on the number of known peers, including the local // node. func (s *Server) numPeers() (int, error) { diff --git a/nomad/server_test.go b/nomad/server_test.go index 64e34de576b..5872f3ceb10 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "io/ioutil" "os" @@ -540,13 +541,13 @@ func TestServer_InvalidSchedulers(t *testing.T) { } config.EnabledSchedulers = []string{"batch"} - err := s.setupWorkers() + err := s.setupWorkers(s.shutdownCtx) require.NotNil(err) require.Contains(err.Error(), "scheduler not enabled") // Set the config to have an unknown scheduler config.EnabledSchedulers = []string{"batch", structs.JobTypeCore, "foo"} - err = s.setupWorkers() + err = s.setupWorkers(s.shutdownCtx) require.NotNil(err) require.Contains(err.Error(), "foo") } @@ -577,3 +578,69 @@ func TestServer_RPCNameAndRegionValidation(t *testing.T) { tc.name, tc.region, tc.expected) } } + +func TestServer_ReloadSchedulers_NumSchedulers(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 8 + }) + defer cleanupS1() + + require.Equal(t, s1.config.NumSchedulers, len(s1.workers)) + + config := DefaultConfig() + config.NumSchedulers = 4 + require.NoError(t, s1.Reload(config)) + + time.Sleep(1 * time.Second) + require.Equal(t, config.NumSchedulers, len(s1.workers)) +} + +func TestServer_ReloadSchedulers_EnabledSchedulers(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.EnabledSchedulers = []string{structs.JobTypeCore, structs.JobTypeSystem} + }) + defer cleanupS1() + + require.Equal(t, s1.config.NumSchedulers, len(s1.workers)) + + config := DefaultConfig() + config.EnabledSchedulers = []string{structs.JobTypeCore, structs.JobTypeSystem, structs.JobTypeBatch} + require.NoError(t, s1.Reload(config)) + + time.Sleep(1 * time.Second) + require.Equal(t, config.NumSchedulers, len(s1.workers)) + require.ElementsMatch(t, config.EnabledSchedulers, s1.GetSchedulerWorkerConfig().EnabledSchedulers) + +} + +func TestServer_ReloadSchedulers_InvalidSchedulers(t *testing.T) { + t.Parallel() + + // Set the config to not have the core scheduler + config := DefaultConfig() + logger := testlog.HCLogger(t) + s := &Server{ + config: config, + logger: logger, + } + s.config.NumSchedulers = 0 + s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) + s.shutdownCh = s.shutdownCtx.Done() + + config.EnabledSchedulers = []string{"_core", "batch"} + err := s.setupWorkers(s.shutdownCtx) + require.Nil(t, err) + origWC := s.GetSchedulerWorkerConfig() + reloadSchedulers(s, &SchedulerWorkerPoolArgs{NumSchedulers: config.NumSchedulers, EnabledSchedulers: []string{"batch"}}) + currentWC := s.GetSchedulerWorkerConfig() + require.Equal(t, origWC, currentWC) + + // Set the config to have an unknown scheduler + reloadSchedulers(s, &SchedulerWorkerPoolArgs{NumSchedulers: config.NumSchedulers, EnabledSchedulers: []string{"_core", "foo"}}) + currentWC = s.GetSchedulerWorkerConfig() + require.Equal(t, origWC, currentWC) +} diff --git a/nomad/testing.go b/nomad/testing.go index 2822dc5fbfa..7c86f91e8fb 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -56,7 +56,7 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { nodeNum := atomic.AddUint32(&nodeNumber, 1) config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum) - // configer logger + // configure logger level := hclog.Trace if envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL"); envLogLevel != "" { level = hclog.LevelFromString(envLogLevel) diff --git a/nomad/worker.go b/nomad/worker.go index d5b9699fd0f..10e8cae0614 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -2,6 +2,7 @@ package nomad import ( "context" + "encoding/json" "fmt" "strings" "sync" @@ -10,6 +11,7 @@ import ( metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -46,6 +48,35 @@ const ( dequeueErrGrace = 10 * time.Second ) +type WorkerStatus int + +//go:generate stringer -trimprefix=Worker -output worker_string_workerstatus.go -linecomment -type=WorkerStatus +const ( + WorkerUnknownStatus WorkerStatus = iota // Unknown + WorkerStarting + WorkerStarted + WorkerPausing + WorkerPaused + WorkerResuming + WorkerStopping + WorkerStopped +) + +type SchedulerWorkerStatus int + +//go:generate stringer -trimprefix=Workload -output worker_string_schedulerworkerstatus.go -linecomment -type=SchedulerWorkerStatus +const ( + WorkloadUnknownStatus SchedulerWorkerStatus = iota + WorkloadRunning + WorkloadWaitingToDequeue + WorkloadWaitingForRaft + WorkloadScheduling + WorkloadSubmitting + WorkloadBackoff + WorkloadStopped + WorkloadPaused +) + // Worker is a single threaded scheduling worker. There may be multiple // running per server (leader or follower). They are responsible for dequeuing // pending evaluations, invoking schedulers, plan submission and the @@ -55,13 +86,25 @@ type Worker struct { srv *Server logger log.Logger start time.Time + id string + + status WorkerStatus + workloadStatus SchedulerWorkerStatus + statusLock sync.RWMutex - paused bool + pauseFlag bool pauseLock sync.Mutex pauseCond *sync.Cond + ctx context.Context + cancelFn context.CancelFunc - failures uint + // the Server.Config.EnabledSchedulers value is not safe for concurrent access, so + // the worker needs a cached copy of it. Workers are stopped if this value changes. + enabledSchedulers []string + // failures is the count of errors encountered while dequeueing evaluations + // and is used to calculate backoff. + failures uint evalToken string // snapshotIndex is the index of the snapshot in which the scheduler was @@ -70,70 +113,321 @@ type Worker struct { snapshotIndex uint64 } -// NewWorker starts a new worker associated with the given server -func NewWorker(srv *Server) (*Worker, error) { +// NewWorker starts a new scheduler worker associated with the given server +func NewWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) (*Worker, error) { + w := newWorker(ctx, srv, args) + w.Start() + return w, nil +} + +// _newWorker creates a worker without calling its Start func. This is useful for testing. +func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) *Worker { w := &Worker{ - srv: srv, - logger: srv.logger.ResetNamed("worker"), - start: time.Now(), + id: uuid.Generate(), + srv: srv, + start: time.Now(), + status: WorkerStarting, + enabledSchedulers: make([]string, len(args.EnabledSchedulers)), } + copy(w.enabledSchedulers, args.EnabledSchedulers) + + w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id) w.pauseCond = sync.NewCond(&w.pauseLock) + w.ctx, w.cancelFn = context.WithCancel(ctx) + + return w +} + +// ID returns a string ID for the worker. +func (w *Worker) ID() string { + return w.id +} + +// Start transitions a worker to the starting state. Check +// to see if it paused using IsStarted() +func (w *Worker) Start() { + w.setStatus(WorkerStarting) go w.run() - return w, nil } -// SetPause is used to pause or unpause a worker -func (w *Worker) SetPause(p bool) { - w.pauseLock.Lock() - w.paused = p - w.pauseLock.Unlock() - if !p { +// Pause transitions a worker to the pausing state. Check +// to see if it paused using IsPaused() +func (w *Worker) Pause() { + if w.isPausable() { + w.setStatus(WorkerPausing) + w.setPauseFlag(true) + } +} + +// Resume transitions a worker to the resuming state. Check +// to see if the worker restarted by calling IsStarted() +func (w *Worker) Resume() { + if w.IsPaused() { + w.setStatus(WorkerResuming) + w.setPauseFlag(false) w.pauseCond.Broadcast() } } -// checkPaused is used to park the worker when paused -func (w *Worker) checkPaused() { +// Resume transitions a worker to the stopping state. Check +// to see if the worker stopped by calling IsStopped() +func (w *Worker) Stop() { + w.setStatus(WorkerStopping) + w.shutdown() +} + +// IsStarted returns a boolean indicating if this worker has been started. +func (w *Worker) IsStarted() bool { + return w.GetStatus() == WorkerStarted +} + +// IsPaused returns a boolean indicating if this worker has been paused. +func (w *Worker) IsPaused() bool { + return w.GetStatus() == WorkerPaused +} + +// IsStopped returns a boolean indicating if this worker has been stopped. +func (w *Worker) IsStopped() bool { + return w.GetStatus() == WorkerStopped +} + +func (w *Worker) isPausable() bool { + w.statusLock.RLock() + defer w.statusLock.RUnlock() + switch w.status { + case WorkerPausing, WorkerPaused, WorkerStopping, WorkerStopped: + return false + default: + return true + } +} + +// GetStatus returns the status of the Worker +func (w *Worker) GetStatus() WorkerStatus { + w.statusLock.RLock() + defer w.statusLock.RUnlock() + return w.status +} + +// setStatuses is used internally to the worker to update the +// status of the worker and workload at one time, since some +// transitions need to update both values using the same lock. +func (w *Worker) setStatuses(newWorkerStatus WorkerStatus, newWorkloadStatus SchedulerWorkerStatus) { + w.statusLock.Lock() + defer w.statusLock.Unlock() + w.setWorkerStatusLocked(newWorkerStatus) + w.setWorkloadStatusLocked(newWorkloadStatus) +} + +// setStatus is used internally to the worker to update the +// status of the worker based on calls to the Worker API. For +// atomically updating the scheduler status and the workload +// status, use `setStatuses`. +func (w *Worker) setStatus(newStatus WorkerStatus) { + w.statusLock.Lock() + defer w.statusLock.Unlock() + w.setWorkerStatusLocked(newStatus) +} + +func (w *Worker) setWorkerStatusLocked(newStatus WorkerStatus) { + if newStatus == w.status { + return + } + w.logger.Trace("changed worker status", "from", w.status, "to", newStatus) + w.status = newStatus +} + +// GetStatus returns the status of the Worker's Workload. +func (w *Worker) GetWorkloadStatus() SchedulerWorkerStatus { + w.statusLock.RLock() + defer w.statusLock.RUnlock() + return w.workloadStatus +} + +// setWorkloadStatus is used internally to the worker to update the +// status of the worker based updates from the workload. +func (w *Worker) setWorkloadStatus(newStatus SchedulerWorkerStatus) { + w.statusLock.Lock() + defer w.statusLock.Unlock() + w.setWorkloadStatusLocked(newStatus) +} + +func (w *Worker) setWorkloadStatusLocked(newStatus SchedulerWorkerStatus) { + if newStatus == w.workloadStatus { + return + } + w.logger.Trace("changed workload status", "from", w.workloadStatus, "to", newStatus) + w.workloadStatus = newStatus +} + +type WorkerInfo struct { + ID string `json:"id"` + EnabledSchedulers []string `json:"enabled_schedulers"` + Started time.Time `json:"started"` + Status string `json:"status"` + WorkloadStatus string `json:"workload_status"` +} + +func (w WorkerInfo) Copy() WorkerInfo { + out := WorkerInfo{ + ID: w.ID, + EnabledSchedulers: make([]string, len(w.EnabledSchedulers)), + Started: w.Started, + Status: w.Status, + WorkloadStatus: w.WorkloadStatus, + } + copy(out.EnabledSchedulers, w.EnabledSchedulers) + return out +} + +func (w WorkerInfo) String() string { + // lazy implementation of WorkerInfo to string + out, _ := json.Marshal(w) + return string(out) +} + +func (w *Worker) Info() WorkerInfo { w.pauseLock.Lock() - for w.paused { + defer w.pauseLock.Unlock() + out := WorkerInfo{ + ID: w.id, + Status: w.status.String(), + WorkloadStatus: w.workloadStatus.String(), + EnabledSchedulers: make([]string, len(w.enabledSchedulers)), + } + out.Started = w.start + copy(out.EnabledSchedulers, w.enabledSchedulers) + return out +} + +// ---------------------------------- +// Pause Implementation +// These functions are used to support the worker's pause behaviors. +// ---------------------------------- + +func (w *Worker) setPauseFlag(pause bool) { + w.pauseLock.Lock() + defer w.pauseLock.Unlock() + w.pauseFlag = pause +} + +// maybeWait is responsible for making the transition from `pausing` +// to `paused`, waiting, and then transitioning back to the running +// values. +func (w *Worker) maybeWait() { + w.pauseLock.Lock() + defer w.pauseLock.Unlock() + + if !w.pauseFlag { + return + } + + w.statusLock.Lock() + w.status = WorkerPaused + originalWorkloadStatus := w.workloadStatus + w.workloadStatus = WorkloadPaused + w.logger.Trace("changed workload status", "from", originalWorkloadStatus, "to", w.workloadStatus) + + w.statusLock.Unlock() + + for w.pauseFlag { w.pauseCond.Wait() } + + w.statusLock.Lock() + + w.logger.Trace("changed workload status", "from", w.workloadStatus, "to", originalWorkloadStatus) + w.workloadStatus = originalWorkloadStatus + + // only reset the worker status if the worker is not resuming to stop the paused workload. + if w.status != WorkerStopping { + w.logger.Trace("changed worker status", "from", w.status, "to", WorkerStarted) + w.status = WorkerStarted + } + w.statusLock.Unlock() +} + +// Shutdown is used to signal that the worker should shutdown. +func (w *Worker) shutdown() { + w.pauseLock.Lock() + wasPaused := w.pauseFlag + w.pauseFlag = false w.pauseLock.Unlock() + + w.logger.Trace("shutdown request received") + w.cancelFn() + if wasPaused { + w.pauseCond.Broadcast() + } +} + +// markStopped is used to mark the worker and workload as stopped. It should be called in a +// defer immediately upon entering the run() function. +func (w *Worker) markStopped() { + w.setStatuses(WorkerStopped, WorkloadStopped) + w.logger.Debug("stopped") } +func (w *Worker) workerShuttingDown() bool { + select { + case <-w.ctx.Done(): + return true + default: + return false + } +} + +// ---------------------------------- +// Workload behavior code +// ---------------------------------- + // run is the long-lived goroutine which is used to run the worker func (w *Worker) run() { + defer func() { + w.markStopped() + }() + w.setStatuses(WorkerStarted, WorkloadRunning) + w.logger.Debug("running") for { + // Check to see if the context has been cancelled. Server shutdown and Shutdown() + // should do this. + if w.workerShuttingDown() { + return + } // Dequeue a pending evaluation eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout) if shutdown { return } - // Check for a shutdown + // since dequeue takes time, we could have shutdown the server after getting an eval that + // needs to be nacked before we exit. Explicitly checking the server to allow this eval + // to be processed on worker shutdown. if w.srv.IsShutdown() { w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval)) - w.sendNack(eval.ID, token) + w.sendNack(eval, token) return } // Wait for the raft log to catchup to the evaluation + w.setWorkloadStatus(WorkloadWaitingForRaft) snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit) if err != nil { w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) - w.sendNack(eval.ID, token) + w.sendNack(eval, token) continue } // Invoke the scheduler to determine placements + w.setWorkloadStatus(WorkloadScheduling) if err := w.invokeScheduler(snap, eval, token); err != nil { w.logger.Error("error invoking scheduler", "error", err) - w.sendNack(eval.ID, token) + w.sendNack(eval, token) continue } // Complete the evaluation - w.sendAck(eval.ID, token) + w.sendAck(eval, token) } } @@ -143,7 +437,7 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) ( eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) { // Setup the request req := structs.EvalDequeueRequest{ - Schedulers: w.srv.config.EnabledSchedulers, + Schedulers: w.enabledSchedulers, Timeout: timeout, SchedulerVersion: scheduler.SchedulerVersion, WriteRequest: structs.WriteRequest{ @@ -153,15 +447,20 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) ( var resp structs.EvalDequeueResponse REQ: - // Check if we are paused - w.checkPaused() + // Wait inside this function if the worker is paused. + w.maybeWait() + // Immediately check to see if the worker has been shutdown. + if w.workerShuttingDown() { + return nil, "", 0, true + } // Make a blocking RPC start := time.Now() + w.setWorkloadStatus(WorkloadWaitingToDequeue) err := w.srv.RPC("Eval.Dequeue", &req, &resp) metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start) if err != nil { - if time.Since(w.start) > dequeueErrGrace && !w.srv.IsShutdown() { + if time.Since(w.start) > dequeueErrGrace && !w.workerShuttingDown() { w.logger.Error("failed to dequeue evaluation", "error", err) } @@ -182,25 +481,21 @@ REQ: // Check if we got a response if resp.Eval != nil { - w.logger.Debug("dequeued evaluation", "eval_id", resp.Eval.ID) + w.logger.Debug("dequeued evaluation", "eval_id", resp.Eval.ID, "type", resp.Eval.Type, "namespace", resp.Eval.Namespace, "job_id", resp.Eval.JobID, "node_id", resp.Eval.NodeID, "triggered_by", resp.Eval.TriggeredBy) return resp.Eval, resp.Token, resp.GetWaitIndex(), false } - // Check for potential shutdown - if w.srv.IsShutdown() { - return nil, "", 0, true - } goto REQ } // sendAcknowledgement should not be called directly. Call `sendAck` or `sendNack` instead. // This function implements `ack`ing or `nack`ing the evaluation generally. // Any errors are logged but swallowed. -func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) { +func (w *Worker) sendAcknowledgement(eval *structs.Evaluation, token string, ack bool) { defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now()) // Setup the request req := structs.EvalAckRequest{ - EvalID: evalID, + EvalID: eval.ID, Token: token, WriteRequest: structs.WriteRequest{ Region: w.srv.config.Region, @@ -219,28 +514,28 @@ func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) { // Make the RPC call err := w.srv.RPC(endpoint, &req, &resp) if err != nil { - w.logger.Error(fmt.Sprintf("failed to %s evaluation", verb), "eval_id", evalID, "error", err) + w.logger.Error(fmt.Sprintf("failed to %s evaluation", verb), "eval_id", eval.ID, "error", err) } else { - w.logger.Debug(fmt.Sprintf("%s evaluation", verb), "eval_id", evalID) + w.logger.Debug(fmt.Sprintf("%s evaluation", verb), "eval_id", eval.ID, "type", eval.Type, "namespace", eval.Namespace, "job_id", eval.JobID, "node_id", eval.NodeID, "triggered_by", eval.TriggeredBy) } } // sendNack makes a best effort to nack the evaluation. // Any errors are logged but swallowed. -func (w *Worker) sendNack(evalID, token string) { - w.sendAcknowledgement(evalID, token, false) +func (w *Worker) sendNack(eval *structs.Evaluation, token string) { + w.sendAcknowledgement(eval, token, false) } // sendAck makes a best effort to ack the evaluation. // Any errors are logged but swallowed. -func (w *Worker) sendAck(evalID, token string) { - w.sendAcknowledgement(evalID, token, true) +func (w *Worker) sendAck(eval *structs.Evaluation, token string) { + w.sendAcknowledgement(eval, token, true) } // snapshotMinIndex times calls to StateStore.SnapshotAfter which may block. func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { start := time.Now() - ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout) + ctx, cancel := context.WithTimeout(w.ctx, timeout) snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex) cancel() metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) @@ -288,7 +583,8 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua // SubmitPlan is used to submit a plan for consideration. This allows // the worker to act as the planner for the scheduler. func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) { - // Check for a shutdown before plan submission + // Check for a shutdown before plan submission. Checking server state rather than + // worker state to allow work in flight to complete before stopping. if w.srv.IsShutdown() { return nil, nil, fmt.Errorf("shutdown while planning") } @@ -358,7 +654,8 @@ SUBMIT: // UpdateEval is used to submit an updated evaluation. This allows // the worker to act as the planner for the scheduler. func (w *Worker) UpdateEval(eval *structs.Evaluation) error { - // Check for a shutdown before plan submission + // Check for a shutdown before plan submission. Checking server state rather than + // worker state to allow a workers work in flight to complete before stopping. if w.srv.IsShutdown() { return fmt.Errorf("shutdown while planning") } @@ -396,7 +693,8 @@ SUBMIT: // CreateEval is used to create a new evaluation. This allows // the worker to act as the planner for the scheduler. func (w *Worker) CreateEval(eval *structs.Evaluation) error { - // Check for a shutdown before plan submission + // Check for a shutdown before plan submission. This consults the server Shutdown state + // instead of the worker's to prevent aborting work in flight. if w.srv.IsShutdown() { return fmt.Errorf("shutdown while planning") } @@ -437,7 +735,8 @@ SUBMIT: // ReblockEval is used to reinsert a blocked evaluation into the blocked eval // tracker. This allows the worker to act as the planner for the scheduler. func (w *Worker) ReblockEval(eval *structs.Evaluation) error { - // Check for a shutdown before plan submission + // Check for a shutdown before plan submission. This checks the server state rather than + // the worker's to prevent erroring on work in flight that would complete otherwise. if w.srv.IsShutdown() { return fmt.Errorf("shutdown while planning") } @@ -514,7 +813,10 @@ func (w *Worker) shouldResubmit(err error) bool { // backoffErr is used to do an exponential back off on error. This is // maintained statefully for the worker. Returns if attempts should be // abandoned due to shutdown. +// This uses the worker's context in order to immediately stop the +// backoff if the server or the worker is shutdown. func (w *Worker) backoffErr(base, limit time.Duration) bool { + w.setWorkloadStatus(WorkloadBackoff) backoff := (1 << (2 * w.failures)) * base if backoff > limit { backoff = limit @@ -524,7 +826,7 @@ func (w *Worker) backoffErr(base, limit time.Duration) bool { select { case <-time.After(backoff): return false - case <-w.srv.shutdownCh: + case <-w.ctx.Done(): return true } } diff --git a/nomad/worker_string_schedulerworkerstatus.go b/nomad/worker_string_schedulerworkerstatus.go new file mode 100644 index 00000000000..42181ffd0e2 --- /dev/null +++ b/nomad/worker_string_schedulerworkerstatus.go @@ -0,0 +1,31 @@ +// Code generated by "stringer -trimprefix=Workload -output worker_string_schedulerworkerstatus.go -linecomment -type=SchedulerWorkerStatus"; DO NOT EDIT. + +package nomad + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[WorkloadUnknownStatus-0] + _ = x[WorkloadRunning-1] + _ = x[WorkloadWaitingToDequeue-2] + _ = x[WorkloadWaitingForRaft-3] + _ = x[WorkloadScheduling-4] + _ = x[WorkloadSubmitting-5] + _ = x[WorkloadBackoff-6] + _ = x[WorkloadStopped-7] + _ = x[WorkloadPaused-8] +} + +const _SchedulerWorkerStatus_name = "UnknownStatusRunningWaitingToDequeueWaitingForRaftSchedulingSubmittingBackoffStoppedPaused" + +var _SchedulerWorkerStatus_index = [...]uint8{0, 13, 20, 36, 50, 60, 70, 77, 84, 90} + +func (i SchedulerWorkerStatus) String() string { + if i < 0 || i >= SchedulerWorkerStatus(len(_SchedulerWorkerStatus_index)-1) { + return "SchedulerWorkerStatus(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _SchedulerWorkerStatus_name[_SchedulerWorkerStatus_index[i]:_SchedulerWorkerStatus_index[i+1]] +} diff --git a/nomad/worker_string_workerstatus.go b/nomad/worker_string_workerstatus.go new file mode 100644 index 00000000000..1eda2d9acfe --- /dev/null +++ b/nomad/worker_string_workerstatus.go @@ -0,0 +1,30 @@ +// Code generated by "stringer -trimprefix=Worker -output worker_string_workerstatus.go -linecomment -type=WorkerStatus"; DO NOT EDIT. + +package nomad + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[WorkerUnknownStatus-0] + _ = x[WorkerStarting-1] + _ = x[WorkerStarted-2] + _ = x[WorkerPausing-3] + _ = x[WorkerPaused-4] + _ = x[WorkerResuming-5] + _ = x[WorkerStopping-6] + _ = x[WorkerStopped-7] +} + +const _WorkerStatus_name = "UnknownStartingStartedPausingPausedResumingStoppingStopped" + +var _WorkerStatus_index = [...]uint8{0, 7, 15, 22, 29, 35, 43, 51, 58} + +func (i WorkerStatus) String() string { + if i < 0 || i >= WorkerStatus(len(_WorkerStatus_index)-1) { + return "WorkerStatus(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _WorkerStatus_name[_WorkerStatus_index[i]:_WorkerStatus_index[i+1]] +} diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 34327e61fac..334790e5f9d 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "reflect" "sync" @@ -11,6 +12,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -47,6 +49,19 @@ func init() { } } +// NewTestWorker returns the worker without calling it's run method. +func NewTestWorker(shutdownCtx context.Context, srv *Server) *Worker { + w := &Worker{ + srv: srv, + start: time.Now(), + id: uuid.Generate(), + } + w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id) + w.pauseCond = sync.NewCond(&w.pauseLock) + w.ctx, w.cancelFn = context.WithCancel(shutdownCtx) + return w +} + func TestWorker_dequeueEvaluation(t *testing.T) { t.Parallel() @@ -62,7 +77,8 @@ func TestWorker_dequeueEvaluation(t *testing.T) { s1.evalBroker.Enqueue(eval1) // Create a worker - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w, _ := NewWorker(s1.shutdownCtx, s1, poolArgs) // Attempt dequeue eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) @@ -108,7 +124,8 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { s1.evalBroker.Enqueue(eval2) // Create a worker - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) // Attempt dequeue eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) @@ -133,7 +150,7 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { } // Send the Ack - w.sendAck(eval1.ID, token) + w.sendAck(eval1, token) // Attempt second dequeue eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) @@ -168,15 +185,16 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { s1.evalBroker.Enqueue(eval1) // Create a worker - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) w.pauseCond = sync.NewCond(&w.pauseLock) // PAUSE the worker - w.SetPause(true) + w.Pause() go func() { time.Sleep(100 * time.Millisecond) - w.SetPause(false) + w.Resume() }() // Attempt dequeue @@ -212,7 +230,8 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) // Create a worker - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) go func() { time.Sleep(10 * time.Millisecond) @@ -231,6 +250,57 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) { } } +func TestWorker_Shutdown(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + + go func() { + time.Sleep(10 * time.Millisecond) + w.Stop() + }() + + // Attempt dequeue + eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + require.True(t, shutdown) + require.Nil(t, eval) +} + +func TestWorker_Shutdown_paused(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w, _ := NewWorker(s1.shutdownCtx, s1, poolArgs) + + w.Pause() + + // pausing can take up to 500ms because of the blocking query timeout in dequeueEvaluation. + require.Eventually(t, w.IsPaused, 550*time.Millisecond, 10*time.Millisecond, "should pause") + + go func() { + w.Stop() + }() + + // transitioning to stopped from paused should be very quick, + // but might not be immediate. + require.Eventually(t, w.IsStopped, 100*time.Millisecond, 10*time.Millisecond, "should stop when paused") +} + func TestWorker_sendAck(t *testing.T) { t.Parallel() @@ -246,7 +316,8 @@ func TestWorker_sendAck(t *testing.T) { s1.evalBroker.Enqueue(eval1) // Create a worker - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) // Attempt dequeue eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond) @@ -258,7 +329,7 @@ func TestWorker_sendAck(t *testing.T) { } // Send the Nack - w.sendNack(eval.ID, token) + w.sendNack(eval, token) // Check the depth is 1, nothing unacked stats = s1.evalBroker.Stats() @@ -270,7 +341,7 @@ func TestWorker_sendAck(t *testing.T) { eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond) // Send the Ack - w.sendAck(eval.ID, token) + w.sendAck(eval, token) // Check the depth is 0 stats = s1.evalBroker.Stats() @@ -301,7 +372,8 @@ func TestWorker_waitForIndex(t *testing.T) { }() // Wait for a future index - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) snap, err := w.snapshotMinIndex(index+1, time.Second) require.NoError(t, err) require.NotNil(t, snap) @@ -327,7 +399,8 @@ func TestWorker_invokeScheduler(t *testing.T) { }) defer cleanupS1() - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) eval := mock.Eval() eval.Type = "noop" @@ -380,7 +453,10 @@ func TestWorker_SubmitPlan(t *testing.T) { } // Attempt to submit a plan - w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + w.evalToken = token + result, state, err := w.SubmitPlan(plan) if err != nil { t.Fatalf("err: %v", err) @@ -442,7 +518,8 @@ func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) { plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID) // Attempt to submit a plan - w := &Worker{srv: s1, logger: s1.logger} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) w.SubmitPlan(plan) assert.Equal(t, &structs.Allocation{ @@ -499,7 +576,10 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { } // Attempt to submit a plan - w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + w.evalToken = token + result, state, err := w.SubmitPlan(plan) if err != nil { t.Fatalf("err: %v", err) @@ -556,7 +636,10 @@ func TestWorker_UpdateEval(t *testing.T) { eval2.Status = structs.EvalStatusComplete // Attempt to update eval - w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + w.evalToken = token + err = w.UpdateEval(eval2) if err != nil { t.Fatalf("err: %v", err) @@ -605,7 +688,10 @@ func TestWorker_CreateEval(t *testing.T) { eval2.PreviousEval = eval1.ID // Attempt to create eval - w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + w.evalToken = token + err = w.CreateEval(eval2) if err != nil { t.Fatalf("err: %v", err) @@ -667,14 +753,17 @@ func TestWorker_ReblockEval(t *testing.T) { eval2.QueuedAllocations = map[string]int{"web": 50} // Attempt to reblock eval - w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + w := newWorker(s1.shutdownCtx, s1, poolArgs) + w.evalToken = token + err = w.ReblockEval(eval2) if err != nil { t.Fatalf("err: %v", err) } // Ack the eval - w.sendAck(evalOut.ID, token) + w.sendAck(evalOut, token) // Check that it is blocked bStats := s1.blockedEvals.Stats() @@ -713,3 +802,125 @@ func TestWorker_ReblockEval(t *testing.T) { reblockedEval.SnapshotIndex, w.snapshotIndex) } } + +func TestWorker_Info(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer cleanupS1() + testutil.WaitForLeader(t, s1.RPC) + + poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(s1.config).Copy() + + // Create a worker + w := newWorker(s1.shutdownCtx, s1, poolArgs) + + require.Equal(t, WorkerStarting, w.GetStatus()) + workerInfo := w.Info() + require.Equal(t, WorkerStarting.String(), workerInfo.Status) +} + +const ( + longWait = 100 * time.Millisecond + tinyWait = 10 * time.Millisecond +) + +func TestWorker_SetPause(t *testing.T) { + t.Parallel() + logger := testlog.HCLogger(t) + srv := &Server{ + logger: logger, + shutdownCtx: context.Background(), + } + args := SchedulerWorkerPoolArgs{ + EnabledSchedulers: []string{structs.JobTypeCore, structs.JobTypeBatch, structs.JobTypeSystem}, + } + w := newWorker(context.Background(), srv, args) + w._start(testWorkload) + require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have started") + + go func() { + time.Sleep(tinyWait) + w.Pause() + }() + require.Eventually(t, w.IsPaused, longWait, tinyWait, "should have paused") + + go func() { + time.Sleep(tinyWait) + w.Pause() + }() + require.Eventually(t, w.IsPaused, longWait, tinyWait, "pausing a paused should be okay") + + go func() { + time.Sleep(tinyWait) + w.Resume() + }() + require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have restarted from pause") + + go func() { + time.Sleep(tinyWait) + w.Stop() + }() + require.Eventually(t, w.IsStopped, longWait, tinyWait, "should have shutdown") +} + +func TestWorker_SetPause_OutOfOrderEvents(t *testing.T) { + t.Parallel() + logger := testlog.HCLogger(t) + srv := &Server{ + logger: logger, + shutdownCtx: context.Background(), + } + args := SchedulerWorkerPoolArgs{ + EnabledSchedulers: []string{structs.JobTypeCore, structs.JobTypeBatch, structs.JobTypeSystem}, + } + w := newWorker(context.Background(), srv, args) + w._start(testWorkload) + require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have started") + + go func() { + time.Sleep(tinyWait) + w.Pause() + }() + require.Eventually(t, w.IsPaused, longWait, tinyWait, "should have paused") + + go func() { + time.Sleep(tinyWait) + w.Stop() + }() + require.Eventually(t, w.IsStopped, longWait, tinyWait, "stop from pause should have shutdown") + + go func() { + time.Sleep(tinyWait) + w.Pause() + }() + require.Eventually(t, w.IsStopped, longWait, tinyWait, "pausing a stopped should stay stopped") + +} + +// _start is a test helper function used to start a worker with an alternate workload +func (w *Worker) _start(inFunc func(w *Worker)) { + w.setStatus(WorkerStarting) + go inFunc(w) +} + +// testWorkload is a very simple function that performs the same status updating behaviors that the +// real workload does. +func testWorkload(w *Worker) { + defer w.markStopped() + w.setStatuses(WorkerStarted, WorkloadRunning) + w.logger.Debug("testWorkload running") + for { + // ensure state variables are happy after resuming. + w.maybeWait() + if w.workerShuttingDown() { + w.logger.Debug("testWorkload stopped") + return + } + // do some fake work + time.Sleep(10 * time.Millisecond) + } +} diff --git a/website/content/api-docs/agent.mdx b/website/content/api-docs/agent.mdx index e62a451054b..db51a4a7f0e 100644 --- a/website/content/api-docs/agent.mdx +++ b/website/content/api-docs/agent.mdx @@ -725,3 +725,204 @@ $ curl -O -J \ go tool trace trace ``` + +## Fetch all scheduler worker's status + +The `/agent/schedulers` endpoint allow Nomad operators to inspect the state of +a Nomad server agent's scheduler workers. + +| Method | Path | Produces | +| ------ | ------------------- | ------------------ | +| `GET` | `/agent/schedulers` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api-docs#blocking-queries) and +[required ACLs](/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ------------ | +| `NO` | `agent:read` | + +### Parameters + +This endpoint accepts no additional parameters. + +### Sample Request + +```shell-session +$ curl \ + https://localhost:4646/v1/agent/schedulers +``` + +### Sample Response + +```json +{ + "schedulers": [ + { + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "id": "5669d6fa-0def-7369-6558-a47c35fdc675", + "started": "2021-12-21T19:25:00.911883Z", + "status": "Paused", + "workload_status": "Paused" + }, + { + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "id": "c919709d-6d14-66bf-b425-80b8167a267e", + "started": "2021-12-21T19:25:00.91189Z", + "status": "Paused", + "workload_status": "Paused" + }, + { + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "id": "f5edb69a-6122-be8f-b32a-23cd8511dba5", + "started": "2021-12-21T19:25:00.911961Z", + "status": "Paused", + "workload_status": "Paused" + }, + { + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "id": "458816ae-83cf-0710-d8d4-35d2ad2e42d7", + "started": "2021-12-21T19:25:00.912119Z", + "status": "Started", + "workload_status": "WaitingToDequeue" + } + ], + "server_id": "server1.global" +} + +``` + +## Read scheduler worker configuration + +This endpoint returns data about the agent's scheduler configuration from +the perspective of the agent. This is only applicable for servers. + +| Method | Path | Produces | +| ------ | -------------------------- | ------------------ | +| `GET` | `/agent/schedulers/config` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api-docs#blocking-queries) and +[required ACLs](/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ------------ | +| `NO` | `agent:read` | + +### Parameters + +This endpoint accepts no additional parameters. + +### Sample Request + +```shell-session +$ curl \ + --request PUT \ + --data @payload.json \ + https://localhost:4646/v1/jobs +``` + +### Sample Response + +```json +{ + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "num_schedulers": 8, + "server_id": "server1.global" +} +``` + +## Update scheduler worker configuration + +This allows a Nomad operator to modify the server's running scheduler +configuration, which will remain in effect until another update or until the +node is restarted. For durable changes to this value, set the corresponding +values—[`num_schedulers`][] and [`enabled_schedulers`][]—in the node's +configuration file. The response contains the configuration after attempting +to apply the provided values. This is only applicable for servers. + +| Method | Path | Produces | +| ------ | -------------------------- | ------------------ | +| `PUT` | `/agent/schedulers/config` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api-docs#blocking-queries) and +[required ACLs](/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ------------- | +| `NO` | `agent:write` | + +### Sample Payload + +```json +{ + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "num_schedulers": 12 +} +``` + +### Sample Request + +```shell-session +$ curl \ + --request PUT \ + --data @payload.json \ + https://localhost:4646/v1/jobs +``` + +### Sample Response + +```json +{ + "enabled_schedulers": [ + "service", + "batch", + "system", + "sysbatch", + "_core" + ], + "num_schedulers": 12, + "server_id": "server1.global" +} +``` + +[`enabled_schedulers`]: /docs/configuration/server#enabled_schedulers +[`num_schedulers`]: /docs/configuration/server#num_schedulers