Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make number of scheduler workers reloadable #11593

Merged
merged 32 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1908187
Working POC
angrycub Nov 20, 2021
0071e55
Unexport setupNewWorkers; improve comments
angrycub Nov 23, 2021
763671a
Added some VSCode codetours
angrycub Nov 23, 2021
339316a
Update shutdown to use context
angrycub Nov 30, 2021
1a985b3
Apply suggestions from code review
angrycub Dec 1, 2021
22f93b7
Implement GET for SchedulerWorker API + tests
angrycub Dec 1, 2021
16f9dd4
Merge branch 'f-reload-num-schedulers' of github.com:hashicorp/nomad …
angrycub Dec 1, 2021
48428e7
Wired API, refactors, more testing
angrycub Dec 3, 2021
1258128
Merge branch 'main' into f-reload-num-schedulers
angrycub Dec 6, 2021
1845577
Fix linter complaints
angrycub Dec 6, 2021
9c4e5c4
Updating worker to cache EnabledScheduler list
angrycub Dec 6, 2021
0d8b7ec
Refactor `unsafe...` func names to `...Locked`
angrycub Dec 8, 2021
f5bb227
Passing enabled schedulers list to worker
angrycub Dec 10, 2021
292518b
Add note about scheduler death
angrycub Dec 10, 2021
1337f04
Worker API refactor
angrycub Dec 10, 2021
bd345e0
Made handler methods public for OpenAPI, remove unused test bool
angrycub Dec 10, 2021
31687cd
Implement SchedulerWorker status part 1
angrycub Dec 10, 2021
3739987
Fix broken Pause logic; split WorkloadWaiting status
angrycub Dec 11, 2021
7fe5949
Added scheduler info api
angrycub Dec 11, 2021
60d53fa
Added worker info api to api package
angrycub Dec 11, 2021
3d755aa
bugfixes
angrycub Dec 11, 2021
4ee6b8c
Adding stringer to build deps
angrycub Dec 13, 2021
71dab36
Changing route to /v1/agent/schedulers
angrycub Dec 20, 2021
1dc9f96
Adding docs for scheduler worker api
angrycub Dec 21, 2021
0417332
Adding API test for bad worker info
angrycub Dec 22, 2021
420a158
Add changelog message
angrycub Dec 23, 2021
fd016de
typo in changelog 🤦
angrycub Dec 23, 2021
167c6a3
Incorporate API code review feedback
angrycub Jan 3, 2022
f4f610b
Incorporate api-docs feedback
angrycub Jan 4, 2022
689fa77
Updates to worker/leader code from code review
angrycub Jan 4, 2022
982c397
Fix test response type
angrycub Jan 5, 2022
7581957
Set both statuses in markStopped so they are atomic
angrycub Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,46 @@ type HostDataResponse struct {
AgentID string
HostData *HostData `json:",omitempty"`
}

// GetSchedulerWorkerConfig returns the targeted agent's worker pool configuration
func (a *Agent) GetSchedulerWorkerConfig() (*SchedulerWorkerPoolArgs, error) {
var resp AgentSchedulerWorkerConfigResponse
_, err := a.client.query("/v1/agent/workers", &resp, nil)
if err != nil {
return nil, err
}

return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
}

// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs) (*SchedulerWorkerPoolArgs, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need WriteOptions here (and QueryOptions for GetSchedulerConfig above) to support ACLs and any HTTP params we might want in the future. And we always seem to want it eventually so that way we don't have to make a SetSchedulerWorkerConfigWithOptions later on.

We can probably get away without having a QueryMeta returned here because everything in QueryMeta is used for comes out of raft? None of the other agent APIs in this file have it.

req := AgentSchedulerWorkerConfigRequest{
NumSchedulers: args.NumSchedulers,
EnabledSchedulers: args.EnabledSchedulers,
}

var resp AgentSchedulerWorkerConfigResponse
_, err := a.client.write("/v1/agent/workers", &req, &resp, nil)
if err != nil {
return nil, err
}

return &SchedulerWorkerPoolArgs{NumSchedulers: resp.NumSchedulers, EnabledSchedulers: resp.EnabledSchedulers}, nil
}

type SchedulerWorkerPoolArgs struct {
NumSchedulers int
EnabledSchedulers []string
}

// AgentSchedulerWorkerConfig
angrycub marked this conversation as resolved.
Show resolved Hide resolved
type AgentSchedulerWorkerConfigRequest struct {
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}

type AgentSchedulerWorkerConfigResponse struct {
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}
15 changes: 15 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,18 @@ func TestAgentProfile(t *testing.T) {
require.Nil(t, resp)
}
}

func TestAgent_SchedulerWorkerConfig(t *testing.T) {
t.Parallel()

c, s := makeClient(t, nil, nil)
defer s.Stop()
a := c.Agent()

config, err := a.GetSchedulerWorkerConfig()
require.Nil(t, err)
newConfig := SchedulerWorkerPoolArgs{NumSchedulers: 0, EnabledSchedulers: []string{"_core", "system"}}
resp, err := a.SetSchedulerWorkerConfig(newConfig)
require.NoError(t, err)
assert.NotEqual(t, config, resp)
}
103 changes: 60 additions & 43 deletions command/agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/command/agent/host"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/nomad"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/copystructure"
Expand Down Expand Up @@ -741,31 +742,30 @@ func (s *HTTPServer) AgentHostRequest(resp http.ResponseWriter, req *http.Reques
return reply, rpcErr
}

// AgentSchedulerWorkerConfig
type AgentSchedulerWorkerConfig struct {
NumSchedulers uint `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}

// AgentSchedulerWorkerRequest is used to query the count (and state eventually)
// AgentSchedulerWorkerConfigRequest is used to query the count (and state eventually)
// of the scheduler workers running in a Nomad server agent.
// This endpoint can also be used to update the count of running workers for a
// given agent.
func (s *HTTPServer) AgentSchedulerWorkerRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) AgentSchedulerWorkerConfigRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.agent.Server() == nil {
return nil, CodedError(http.StatusBadRequest, "server only endpoint")
}
switch req.Method {
case "PUT", "POST":
return s.updateScheduleWorkers(resp, req)
return s.updateScheduleWorkersConfig(resp, req)
case "GET":
return s.getScheduleWorkersInfo(resp, req)
return s.getScheduleWorkersConfig(resp, req)
default:
return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod)
}
}

func (s *HTTPServer) getScheduleWorkersInfo(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
func (s *HTTPServer) getScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
srv := s.agent.Server()
if srv == nil {
return nil, CodedError(http.StatusBadRequest, "server only endpoint")
}

var secret string
s.parseToken(req, &secret)

Expand All @@ -776,44 +776,61 @@ func (s *HTTPServer) getScheduleWorkersInfo(resp http.ResponseWriter, req *http.
return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error())
}

config := s.agent.server.GetConfig()
response := &AgentSchedulerWorkerConfig{
NumSchedulers: uint(config.NumSchedulers),
config := srv.GetSchedulerWorkerConfig()
response := &agentSchedulerWorkerConfig{
NumSchedulers: config.NumSchedulers,
EnabledSchedulers: config.EnabledSchedulers,
}

return response, nil
}

func (s *HTTPServer) updateScheduleWorkers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
server := s.agent.Server()
if server == nil {
func (s *HTTPServer) updateScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
srv := s.agent.Server()
if srv == nil {
return nil, CodedError(400, "server only endpoint")
}

// // Get the servers from the request
// qsNumSchedulers := req.URL.Query()["num_schedulers"]
// if len(qsNumSchedulers) != 1
// if newNumSchedulers == 0 {
// return nil, CodedError(400, "missing server address")
// }

// var secret string
// s.parseToken(req, &secret)

// // Check agent write permissions
// if aclObj, err := s.agent.Client().ResolveToken(secret); err != nil {
// return nil, err
// } else if aclObj != nil && !aclObj.AllowAgentWrite() {
// return nil, structs.ErrPermissionDenied
// }

// // Set the servers list into the client
// s.agent.logger.Trace("adding servers to the client's primary server list", "servers", servers, "path", "/v1/agent/servers", "method", "PUT")
// if _, err := client.SetServers(servers); err != nil {
// s.agent.logger.Error("failed adding servers to client's server list", "servers", servers, "error", err, "path", "/v1/agent/servers", "method", "PUT")
// //TODO is this the right error to return?
// return nil, CodedError(400, err.Error())
// }
return nil, nil
var secret string
s.parseToken(req, &secret)

// Check agent write permissions
if aclObj, err := srv.ResolveToken(secret); err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return nil, CodedError(http.StatusForbidden, structs.ErrPermissionDenied.Error())
}

var args agentSchedulerWorkerConfig

if err := decodeBody(req, &args); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}
newArgs := nomad.SchedulerWorkerPoolArgs{
NumSchedulers: args.NumSchedulers,
EnabledSchedulers: args.EnabledSchedulers,
}
if newArgs.IsInvalid() {
return nil, CodedError(http.StatusBadRequest, "invalid arguments")
}
reply := srv.SetSchedulerWorkerConfig(newArgs)

response := &agentSchedulerWorkerConfig{
NumSchedulers: reply.NumSchedulers,
EnabledSchedulers: reply.EnabledSchedulers,
}

return response, nil
}

type agentSchedulerWorkerConfig struct {
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}
type agentSchedulerWorkerConfigRequest struct {
agentSchedulerWorkerConfig
}

type agentSchedulerWorkerConfigResponse struct {
NumSchedulers int `json:"num_schedulers"`
EnabledSchedulers []string `json:"enabled_schedulers"`
}
65 changes: 28 additions & 37 deletions command/agent/agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1470,7 +1470,6 @@ func TestHTTP_XSS_Monitor(t *testing.T) {
type scheduleWorkerTest_workerRequestTest struct {
name string // test case name
requiresACL bool // prevents test cases that require ACLs from running in the non-ACL version
serverConfig AgentSchedulerWorkerConfig
request schedulerWorkerTest_testRequest
whenACLNotEnabled schedulerWorkerTest_testExpect
whenACLEnabled schedulerWorkerTest_testExpect
Expand All @@ -1485,19 +1484,6 @@ type schedulerWorkerTest_testExpect struct {
expectedResponse interface{}
}

// schedulerWorkerTest_serverConfig creates a test function that can merge
// in an existing Config. Passing nil as the parameter will not override any
// thing.
func schedulerWorkerTest_serverConfig(inConfig *Config) func(*Config) {
if inConfig != nil {
return func(c *Config) {
inConfig.Merge(c)
}
} else {
return func(c *Config) {}
}
}

// These test cases are run for both the ACL and Non-ACL enabled servers. When
// ACLS are not enabled, the request.aclTokens are ignored.
func schedulerWorkerTest_testCases() []scheduleWorkerTest_workerRequestTest {
Expand All @@ -1511,7 +1497,12 @@ func schedulerWorkerTest_testCases() []scheduleWorkerTest_workerRequestTest {
}
success1 := schedulerWorkerTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &AgentSchedulerWorkerConfig{EnabledSchedulers: []string{"_core", "system"}, NumSchedulers: 8},
expectedResponse: &agentSchedulerWorkerConfig{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 8},
}

success2 := schedulerWorkerTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &agentSchedulerWorkerConfig{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 9},
}

return []scheduleWorkerTest_workerRequestTest{
Expand Down Expand Up @@ -1572,17 +1563,17 @@ func schedulerWorkerTest_testCases() []scheduleWorkerTest_workerRequestTest {
request: schedulerWorkerTest_testRequest{
verb: "POST",
aclToken: "",
requestBody: "",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
},
{
name: "put with no token",
request: schedulerWorkerTest_testRequest{
verb: "PUT",
aclToken: "",
requestBody: "",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
Expand All @@ -1592,17 +1583,17 @@ func schedulerWorkerTest_testCases() []scheduleWorkerTest_workerRequestTest {
request: schedulerWorkerTest_testRequest{
verb: "POST",
aclToken: "node_write",
requestBody: "",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
},
{
name: "put with invalid token",
request: schedulerWorkerTest_testRequest{
verb: "PUT",
aclToken: "node_write",
requestBody: "",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
Expand All @@ -1612,17 +1603,17 @@ func schedulerWorkerTest_testCases() []scheduleWorkerTest_workerRequestTest {
request: schedulerWorkerTest_testRequest{
verb: "POST",
aclToken: "agent_write",
requestBody: "",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
whenACLNotEnabled: success2,
whenACLEnabled: success2,
},
{
name: "put with valid token",
request: schedulerWorkerTest_testRequest{
verb: "PUT",
aclToken: "agent_write",
requestBody: "",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
Expand All @@ -1634,17 +1625,17 @@ func TestHTTP_AgentSchedulerWorkerRequest_NoACL(t *testing.T) {
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.EnabledSchedulers = []string{"_core", "system"}
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}
testFn := func(s *TestAgent) {
for _, tc := range schedulerWorkerTest_testCases() {
t.Run(tc.name, func(t *testing.T) {

req, err := http.NewRequest(tc.request.verb, "/v1/agent/workers", nil)
req, err := http.NewRequest(tc.request.verb, "/v1/agent/workers", bytes.NewReader([]byte(tc.request.requestBody)))
angrycub marked this conversation as resolved.
Show resolved Hide resolved
require.Nil(t, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: while require.Nil(t, err) and require.NoError(t, err) test the same thing, we tend to use NoError for clarity.

respW := httptest.NewRecorder()
workersI, err := s.Server.AgentSchedulerWorkerRequest(respW, req)
workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req)

switch tc.whenACLNotEnabled.expectedResponseCode {
case http.StatusBadRequest, http.StatusForbidden, http.StatusMethodNotAllowed:
Expand All @@ -1665,7 +1656,7 @@ func TestHTTP_AgentSchedulerWorkerRequest_ACL(t *testing.T) {
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.EnabledSchedulers = []string{"_core", "system"}
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}

Expand All @@ -1681,14 +1672,13 @@ func TestHTTP_AgentSchedulerWorkerRequest_ACL(t *testing.T) {
for _, tc := range schedulerWorkerTest_testCases() {
t.Run(tc.name, func(t *testing.T) {

req, err := http.NewRequest(tc.request.verb, "/v1/agent/workers", nil)
req, err := http.NewRequest(tc.request.verb, "/v1/agent/workers", bytes.NewReader([]byte(tc.request.requestBody)))
if tc.request.aclToken != "" {
setToken(req, tokens[tc.request.aclToken])
}
require.Nil(t, err)

respW := httptest.NewRecorder()
workersI, err := s.Server.AgentSchedulerWorkerRequest(respW, req)
workersI, err := s.Server.AgentSchedulerWorkerConfigRequest(respW, req)

switch tc.whenACLEnabled.expectedResponseCode {
case http.StatusOK:
Expand All @@ -1715,19 +1705,20 @@ func schedulerWorkerTest_parseSuccess(t *testing.T, isACLEnabled bool, tc schedu
}

// test into the response when we expect an okay
tcConfig, ok := testExpect.expectedResponse.(*AgentSchedulerWorkerConfig)
tcConfig, ok := testExpect.expectedResponse.(*agentSchedulerWorkerConfig)
require.True(t, ok, "expected response malformed - this is an issue with a test case.")

workersConfig, ok := workersI.(*AgentSchedulerWorkerConfig)
require.True(t, ok, "response can not cast to an AgentSchedulerWorkerConfig")
workersConfig, ok := workersI.(*agentSchedulerWorkerConfig)
require.True(t, ok, "response can not cast to an agentSchedulerWorkerConfig")
require.NotNil(t, workersConfig)

require.Equal(t, tcConfig.NumSchedulers, workersConfig.NumSchedulers)
require.ElementsMatch(t, tcConfig.EnabledSchedulers, workersConfig.EnabledSchedulers)
}

// schedulerWorkerTest_parseError parses the error response given
// from the
// from the API call to make sure that it's a coded error and is the
// expected value from the test case
func schedulerWorkerTest_parseError(t *testing.T, isACLEnabled bool, tc scheduleWorkerTest_workerRequestTest, workersI interface{}, err error) {
require.Error(t, err)
require.Nil(t, workersI)
Expand All @@ -1737,7 +1728,7 @@ func schedulerWorkerTest_parseError(t *testing.T, isACLEnabled bool, tc schedule
testExpect := tc.whenACLNotEnabled

if isACLEnabled {
testExpect = tc.whenACLNotEnabled
testExpect = tc.whenACLEnabled
}

require.Equal(t, testExpect.expectedResponseCode, codedError.Code())
Expand Down
Loading