Make number of scheduler workers reloadable #11593
Beautiful work.
Co-authored-by: Derek Strickland <[email protected]>
I've re-reviewed the API and HTTP agent sections and I'm putting that review up for you @angrycub. I'll re-review the worker/leader/server section next.
api/agent.go
Outdated
}

// SetSchedulerWorkerConfig attempts to update the targeted agent's worker pool configuration
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs) (*SchedulerWorkerPoolArgs, error) {
I think we need `WriteOptions` here (and `QueryOptions` for `GetSchedulerConfig` above) to support ACLs and any HTTP params we might want in the future. We always seem to want it eventually, and this way we don't have to add a `SetSchedulerWorkerConfigWithOptions` later on.
We can probably get away without returning a `QueryMeta` here, because everything `QueryMeta` is used for comes out of raft? None of the other agent APIs in this file have it.
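To make the suggestion concrete, here is a minimal sketch of what option-taking signatures could look like. The `QueryOptions` and `WriteOptions` structs below are simplified stand-ins with a single assumed `AuthToken` field, the `Agent` is a toy in-memory type rather than the real API client, and `GetSchedulerWorkerConfig` and the fields of `SchedulerWorkerPoolArgs` are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// QueryOptions and WriteOptions are simplified stand-ins (assumption):
// only a token field is modelled here.
type QueryOptions struct{ AuthToken string }
type WriteOptions struct{ AuthToken string }

// SchedulerWorkerPoolArgs uses the type name from the diff; its fields are assumed.
type SchedulerWorkerPoolArgs struct {
	NumSchedulers     int
	EnabledSchedulers []string
}

// Agent is a toy in-memory agent that checks a token before serving requests.
type Agent struct {
	requiredToken string
	config        SchedulerWorkerPoolArgs
}

var errPermissionDenied = errors.New("permission denied")

// GetSchedulerWorkerConfig (hypothetical name) reads the config. A nil
// options pointer is treated as "no token supplied".
func (a *Agent) GetSchedulerWorkerConfig(q *QueryOptions) (*SchedulerWorkerPoolArgs, error) {
	token := ""
	if q != nil {
		token = q.AuthToken
	}
	if token != a.requiredToken {
		return nil, errPermissionDenied
	}
	out := a.config
	return &out, nil
}

// SetSchedulerWorkerConfig takes WriteOptions up front, so no
// "...WithOptions" variant is needed later.
func (a *Agent) SetSchedulerWorkerConfig(args SchedulerWorkerPoolArgs, q *WriteOptions) (*SchedulerWorkerPoolArgs, error) {
	token := ""
	if q != nil {
		token = q.AuthToken
	}
	if token != a.requiredToken {
		return nil, errPermissionDenied
	}
	a.config = args
	out := a.config
	return &out, nil
}

func main() {
	a := &Agent{requiredToken: "secret"}
	if _, err := a.SetSchedulerWorkerConfig(SchedulerWorkerPoolArgs{NumSchedulers: 4}, nil); err != nil {
		fmt.Println("without options:", err)
	}
	cfg, err := a.SetSchedulerWorkerConfig(SchedulerWorkerPoolArgs{NumSchedulers: 4}, &WriteOptions{AuthToken: "secret"})
	fmt.Println("with options:", cfg.NumSchedulers, err)
}
```

Accepting a pointer that may be nil keeps simple call sites short while leaving room for ACL tokens and future HTTP parameters.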
command/agent/agent_endpoint.go
Outdated
switch req.Method {
case "PUT", "POST":
	return s.UpdateScheduleWorkersConfig(resp, req)
case "GET":
	return s.GetScheduleWorkersConfig(resp, req)
We're probably not consistent about this across the code base, but good to use the constants for new code at least:
- switch req.Method {
- case "PUT", "POST":
- 	return s.UpdateScheduleWorkersConfig(resp, req)
- case "GET":
- 	return s.GetScheduleWorkersConfig(resp, req)
+ switch req.Method {
+ case http.MethodPut, http.MethodPost:
+ 	return s.UpdateScheduleWorkersConfig(resp, req)
+ case http.MethodGet:
+ 	return s.GetScheduleWorkersConfig(resp, req)
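For anyone who wants to try the constant-based dispatch in isolation, here is a small self-contained sketch. `routeSchedulerConfig` and `dispatch` are hypothetical helpers that only report which branch would run; they are not the handlers from this PR.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// routeSchedulerConfig is a hypothetical stand-in for the endpoint's dispatch
// logic. It returns the name of the branch taken and an HTTP status code.
func routeSchedulerConfig(req *http.Request) (string, int) {
	switch req.Method {
	case http.MethodPut, http.MethodPost:
		return "update", http.StatusOK
	case http.MethodGet:
		return "get", http.StatusOK
	default:
		return "", http.StatusMethodNotAllowed
	}
}

// dispatch builds a test request for the given method and routes it.
func dispatch(method string) (string, int) {
	req := httptest.NewRequest(method, "/v1/agent/schedulers/config", nil)
	return routeSchedulerConfig(req)
}

func main() {
	for _, m := range []string{http.MethodGet, http.MethodPut, http.MethodDelete} {
		branch, code := dispatch(m)
		fmt.Printf("%s -> %q (%d)\n", m, branch, code)
	}
}
```

The constants are plain strings, so behavior is unchanged; the benefit is that a typo becomes a compile error instead of a silently unmatched case.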
command/agent/agent_endpoint.go
Outdated
	}
}

func (s *HTTPServer) GetScheduleWorkersConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
This is the implementation for `AgentSchedulerWorkerConfigRequest` and not used in other packages, right? Usually we'll want to avoid exporting it (e.g., name it `getScheduleWorkersConfig`). The same applies to the update implementation.
I think that I'd done that to help the OpenAPI generator auto-documentation process, but making them private is definitely more correct.
command/agent/agent_endpoint.go
Outdated
type agentSchedulerWorkerConfig struct {
	ServerID          string   `json:"server_id,omitempty"`
	NumSchedulers     int      `json:"num_schedulers"`
	EnabledSchedulers []string `json:"enabled_schedulers"`
}

type agentSchedulerWorkersInfo struct {
	ServerID   string                     `json:"server_id"`
	Schedulers []agentSchedulerWorkerInfo `json:"schedulers"`
}

type agentSchedulerWorkerInfo struct {
	ID                string   `json:"id"`
	EnabledSchedulers []string `json:"enabled_schedulers"`
	Started           string   `json:"started"`
	Status            string   `json:"status"`
	WorkloadStatus    string   `json:"workload_status"`
}
Aren't these always going to be 1:1 with the `api` structs? I think you can import the `api` package here and use them directly.
command/agent/agent_endpoint_test.go
Outdated
t.Run(tc.name, func(t *testing.T) {

	req, err := http.NewRequest(tc.request.verb, "/v1/agent/schedulers/config", bytes.NewReader([]byte(tc.request.requestBody)))
	require.Nil(t, err)
Nitpick: while `require.Nil(t, err)` and `require.NoError(t, err)` test the same thing, we tend to use `NoError` for clarity.
@@ -1463,3 +1464,586 @@ func TestHTTP_XSS_Monitor(t *testing.T) {
		})
	}
}
I love these giant table-driven tests!
Ok @angrycub I've looked through the second half of the work and this is looking great. I've left a few remarks about locking in the worker.go but I think other than that there's nothing too serious here.
nomad/leader_test.go
Outdated
// this satisfies the require.Eventually test interface
checkPaused := func(count int) func() bool {
	return func() bool {
		workers := pausedWorkers()
Because this is all closures inside the test function, it's probably ok to inline the body of `pausedWorkers` here in the `checkPaused` function.
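A rough sketch of the inlined shape, using only the standard library. The `worker` type, the `eventually` polling helper (a stand-in for `require.Eventually`), and the `demo` driver are all hypothetical and exist only so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker is a toy stand-in that only tracks a paused flag.
type worker struct {
	mu     sync.Mutex
	paused bool
}

func (w *worker) Pause() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.paused = true
}

func (w *worker) IsPaused() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.paused
}

// eventually polls cond every tick until it returns true or waitFor elapses.
// It mimics the condition-function shape that require.Eventually expects.
func eventually(cond func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}

// checkPaused counts paused workers inline instead of calling a separate
// pausedWorkers closure, and returns a func() bool for the poller.
func checkPaused(workers []*worker, count int) func() bool {
	return func() bool {
		paused := 0
		for _, w := range workers {
			if w.IsPaused() {
				paused++
			}
		}
		return paused == count
	}
}

// demo pauses two of three workers in the background and waits for the count.
func demo() bool {
	workers := []*worker{{}, {}, {}}
	go func() {
		time.Sleep(5 * time.Millisecond)
		workers[0].Pause()
		workers[1].Pause()
	}()
	return eventually(checkPaused(workers, 2), time.Second, 5*time.Millisecond)
}

func main() {
	fmt.Println("two workers paused:", demo())
}
```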
nomad/worker_test.go
Outdated
func TestWorker_WorkerInfo_String(t *testing.T) {
	t.Parallel()
	startTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
	w := &Worker{
		id:                "uuid",
		start:             startTime,
		status:            WorkerStarted,
		workloadStatus:    WorkloadBackoff,
		enabledSchedulers: []string{structs.JobTypeCore, structs.JobTypeBatch, structs.JobTypeSystem},
	}
	_, err := json.Marshal(w)
	require.NoError(t, err)

	require.Equal(t, `{"id":"uuid","enabled_schedulers":["_core","batch","system"],"started":"2009-11-10T23:00:00Z","status":"Started","workload_status":"Backoff"}`, fmt.Sprint(w.Info()))
}
This test feels like it's just testing the stdlib's `encoding/json` package behavior... we can probably drop this one.
nomad/worker.go
Outdated
}

// _newWorker creates a worker without calling its Start func. This is useful for testing.
func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) (*Worker, error) {
This function doesn't ever return an error (which is pretty typical for `newBlahblah` functions), so I think we can drop that return value and then clean up all the cases where we're doing `w, _ := newWorker(...)`.

// setWorkloadStatus is used internally to the worker to update the
// status of the worker based updates from the workload.
func (w *Worker) setWorkloadStatus(newStatus SchedulerWorkerStatus) {
This is a great idea for making the scheduler worker behavior more observable. Having the trace log here is great but I can totally see writing a bpftrace script that hooks this function to read the stack args and catch all the transitions too.
(I might have done this in its own PR, but as long as it's here now we might as well enjoy it.)
nomad/worker.go
Outdated
	w.pauseCond.Wait()
}

w.pauseLock.Unlock()
I think there's a race here if a `Pause` method call comes in after this line but before the status is set and we return. Even with the locks, the `Pause` method's status-setting calls could be interleaved such that the worker sets its status to `WorkerPaused`, sets the pause flag, and then its status is set to `WorkerStarted`.
Maybe we should move this up to the top of the function as a `defer w.pauseLock.Unlock()`?
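A minimal sketch of the deferred-unlock shape, assuming a simplified worker where the status is guarded by the same mutex as the pause flag. The `worker` type, the string status constants, and `runPauseCycle` are hypothetical and much smaller than the real `Worker`; the point is only that the final status write happens before the lock is released.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	statusStarted = "Started"
	statusPausing = "Pausing"
	statusPaused  = "Paused"
)

// worker is a toy model: one mutex guards both the pause flag and the status.
type worker struct {
	pauseLock sync.Mutex
	pauseCond *sync.Cond
	paused    bool
	status    string
}

func newWorker() *worker {
	w := &worker{status: statusStarted}
	w.pauseCond = sync.NewCond(&w.pauseLock)
	return w
}

func (w *worker) Pause() {
	w.pauseLock.Lock()
	defer w.pauseLock.Unlock()
	w.paused = true
	w.status = statusPausing
}

func (w *worker) Resume() {
	w.pauseLock.Lock()
	defer w.pauseLock.Unlock()
	w.paused = false
	w.pauseCond.Broadcast()
}

func (w *worker) Status() string {
	w.pauseLock.Lock()
	defer w.pauseLock.Unlock()
	return w.status
}

// maybeWait blocks while the worker is paused. Because the unlock is
// deferred, the write of statusStarted happens while the lock is still held,
// so a concurrent Pause cannot slip in between waking up and that write.
func (w *worker) maybeWait() {
	w.pauseLock.Lock()
	defer w.pauseLock.Unlock()
	if !w.paused {
		return
	}
	w.status = statusPaused
	for w.paused {
		w.pauseCond.Wait() // releases pauseLock while waiting, reacquires on wake
	}
	w.status = statusStarted
}

// runPauseCycle pauses a worker, waits until it reports Paused, resumes it,
// and returns the status observed while paused and after the resume.
func runPauseCycle() (whilePaused, afterResume string) {
	w := newWorker()
	w.Pause()
	done := make(chan struct{})
	go func() {
		w.maybeWait()
		close(done)
	}()
	for w.Status() != statusPaused {
		time.Sleep(time.Millisecond)
	}
	whilePaused = w.Status()
	w.Resume()
	<-done
	return whilePaused, w.Status()
}

func main() {
	paused, resumed := runPauseCycle()
	fmt.Println(paused, resumed)
}
```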
nomad/worker.go
Outdated
defer func() {
	w.setWorkloadStatus(WorkloadStopped)
	w.markStopped()
}()
w.setStatus(WorkerStarted)
w.setWorkloadStatus(WorkloadRunning)
We pair up the status and workload status updates more often than not (see `maybeWait` above) and here we're not doing it atomically. It's probably safe here, but it's easy to accidentally split up locked function calls later so that it becomes unsafe. So it might be a good idea to have a combined `setStatus(workerStatus, workloadStatus)` function that takes care of both and does the nice bit you've done where it only logs on change.
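One possible shape for such a combined setter, sketched under assumptions: the `Worker` here is a toy with string statuses, the `transitions` slice stands in for the trace log, and the name `setStatuses` is used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Worker is a toy model holding both statuses behind one lock.
type Worker struct {
	statusLock     sync.Mutex
	status         string
	workloadStatus string
	transitions    []string // stand-in for trace logging (assumption)
}

// setStatuses updates both fields under a single lock acquisition and
// records a transition only when a value actually changes.
func (w *Worker) setStatuses(status, workloadStatus string) {
	w.statusLock.Lock()
	defer w.statusLock.Unlock()
	if w.status != status {
		w.transitions = append(w.transitions,
			fmt.Sprintf("status: %q -> %q", w.status, status))
		w.status = status
	}
	if w.workloadStatus != workloadStatus {
		w.transitions = append(w.transitions,
			fmt.Sprintf("workload: %q -> %q", w.workloadStatus, workloadStatus))
		w.workloadStatus = workloadStatus
	}
}

// demoTransitions drives a worker through a start, a repeated start, and a
// stop, and returns the recorded transitions.
func demoTransitions() []string {
	w := &Worker{}
	w.setStatuses("Started", "Running")
	w.setStatuses("Started", "Running") // no change, so nothing is recorded
	w.setStatuses("Stopped", "Stopped")
	return w.transitions
}

func main() {
	for _, t := range demoTransitions() {
		fmt.Println(t)
	}
}
```

Because both writes happen inside one critical section, no reader holding the same lock can observe a new worker status paired with a stale workload status.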
nomad/worker_test.go
Outdated
	w._start(testWorkload)
	require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have started")

	go func() {
		time.Sleep(tinyWait)
		w.Pause()
	}()
	require.Eventually(t, w.IsPaused, longWait, tinyWait, "should have paused")

	go func() {
		time.Sleep(tinyWait)
		w.Resume()
	}()
	require.Eventually(t, w.IsStarted, longWait, tinyWait, "should have restarted from pause")

	go func() {
		time.Sleep(tinyWait)
		w.Stop()
	}()
	require.Eventually(t, w.IsStopped, longWait, tinyWait, "should have shutdown")
}
👍 This test shows exactly why I like the API you have around "pausing" vs "paused"
@@ -1430,17 +1440,165 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
	return serf.Create(conf)
}

// shouldReloadSchedulers checks the new config to determine if the scheduler worker pool
// needs to be updated. If so, returns true and a pointer to a populated SchedulerWorkerPoolArgs
func shouldReloadSchedulers(s *Server, newPoolArgs *SchedulerWorkerPoolArgs) (bool, *SchedulerWorkerPoolArgs) {
In retrospect we probably could have unconditionally drained and replaced the worker pool on reload; the API call is gated by ACLs, so the risk of an operator DoS'ing their own scheduler seems low. Then we could have come back to add this more clever logic in a future PR. But this is pretty nice and enables scale up/down in the future (as per your `// TODO` remark below).
I added one last place where we could use `setStatuses`, but other than that this LGTM! Let's ship it!
nomad/worker.go
Outdated
w.setWorkloadStatus(WorkloadStopped)
w.markStopped()
I think we missed this one:
- w.setWorkloadStatus(WorkloadStopped)
- w.markStopped()
+ w.setStatuses(WorkerStopped, WorkloadStopped)
This PR:
This closes #11449