Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI and API support for forcing rescheduling of failed allocs #4274

Merged
merged 14 commits into from
May 21, 2018
21 changes: 19 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,14 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W
}

// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
func (j *Jobs) ForceEvaluate(jobID string, opts EvalOptions, q *WriteOptions) (string, *WriteMeta, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth renaming to EvaluateWithOpts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modeled this after JobDeregisterOptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the name of the function. Since it can be more than just force given it takes a generic options object

req := &JobEvaluateRequest{
JobID: jobID,
EvalOptions: opts,
}

var resp JobRegisterResponse
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", nil, &resp, q)
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", req, &resp, q)
if err != nil {
return "", nil, err
}
Expand Down Expand Up @@ -1032,3 +1037,15 @@ type JobStabilityResponse struct {
JobModifyIndex uint64
WriteMeta
}

// JobEvaluateRequest is used when we just need to re-evaluate a target job
type JobEvaluateRequest struct {
JobID string
EvalOptions EvalOptions
WriteRequest
}

// EvalOptions is used to encapsulate options when forcing a job evaluation
type EvalOptions struct {
ForceReschedule bool
}
4 changes: 2 additions & 2 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ func TestJobs_ForceEvaluate(t *testing.T) {
jobs := c.Jobs()

// Force-eval on a non-existent job fails
_, _, err := jobs.ForceEvaluate("job1", nil)
_, _, err := jobs.ForceEvaluate("job1", EvalOptions{}, nil)
if err == nil || !strings.Contains(err.Error(), "not found") {
t.Fatalf("expected not found error, got: %#v", err)
}
Expand All @@ -1062,7 +1062,7 @@ func TestJobs_ForceEvaluate(t *testing.T) {
assertWriteMeta(t, wm)

// Try force-eval again
evalID, wm, err := jobs.ForceEvaluate("job1", nil)
evalID, wm, err := jobs.ForceEvaluate("job1", EvalOptions{}, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
21 changes: 19 additions & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,25 @@ func (s *HTTPServer) jobForceEvaluate(resp http.ResponseWriter, req *http.Reques
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
args := structs.JobEvaluateRequest{
JobID: jobName,
var args structs.JobEvaluateRequest

// TODO(preetha) remove in 0.9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the COMPAT tag so we can grep our code base

// For backwards compatibility allow using this endpoint without a payload
if req.ContentLength == 0 {
args = structs.JobEvaluateRequest{
JobID: jobName,
}
} else {
if err := decodeBody(req, &args); err != nil {
return nil, CodedError(400, err.Error())
}
if args.JobID == "" {
return nil, CodedError(400, "Job ID must be specified")
}

if jobName != "" && args.JobID != jobName {
return nil, CodedError(400, "JobID not same as job name")
}
}
s.parseWriteRequest(req, &args.WriteRequest)

Expand Down
51 changes: 51 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,57 @@ func TestHTTP_JobForceEvaluate(t *testing.T) {
})
}

func TestHTTP_JobEvaluate_ForceReschedule(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
// Create the job
job := mock.Job()
args := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
t.Fatalf("err: %v", err)
}
jobEvalReq := api.JobEvaluateRequest{
JobID: job.ID,
EvalOptions: api.EvalOptions{
ForceReschedule: true,
},
}

buf := encodeReq(jobEvalReq)

// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/job/"+job.ID+"/evaluate", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check the response
reg := obj.(structs.JobRegisterResponse)
if reg.EvalID == "" {
t.Fatalf("bad: %v", reg)
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
})
}

func TestHTTP_JobEvaluations(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"job eval": func() (cli.Command, error) {
return &JobEvalCommand{
Meta: meta,
}, nil
},
"job history": func() (cli.Command, error) {
return &JobHistoryCommand{
Meta: meta,
Expand Down
106 changes: 106 additions & 0 deletions command/job_eval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

type JobEvalCommand struct {
Meta
forceRescheduling bool
}

func (c *JobEvalCommand) Help() string {
helpText := `
Usage: nomad job eval [options] <job_id>

Force an evaluation of the provided job id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...provided job ID. Forcing an evaluation will trigger the scheduler to re-evaluate the job. The force flags allow operators to force the scheduler to create new allocations under certain scenarios.


General Options:

` + generalOptionsUsage() + `

Eval Options:

-force-reschedule
Force reschedule any failed allocations even if they are not currently
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove any and and add a period to then end.

eligible for rescheduling
`
return strings.TrimSpace(helpText)
}

func (c *JobEvalCommand) Synopsis() string {
return "Force evaluating a job using its job id"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force an evaluation for the job

}

func (c *JobEvalCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-force-reschedule": complete.PredictNothing,
})
}

func (c *JobEvalCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}

resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Jobs]
})
}

func (c *JobEvalCommand) Name() string { return "eval" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return "job eval"


func (c *JobEvalCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&c.forceRescheduling, "force-reschedule", false, "")

if err := flags.Parse(args); err != nil {
return 1
}

// Check that we either got no jobs or exactly one.
args = flags.Args()
if len(args) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.Ui.Error("This command takes either no arguments or one: <job>")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

		c.Ui.Error("This command takes one argument: <job>")

c.Ui.Error(commandErrorText(c))
return 1
}

// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}

if len(args) == 0 {
c.Ui.Error("Must provide a job ID")
return 1
}

// Call eval end point
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

endpoint

jobID := args[0]

opts := api.EvalOptions{
ForceReschedule: c.forceRescheduling,
}
evalId, _, err := client.Jobs().ForceEvaluate(jobID, opts, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error evaluating job: %s", err))
return 1
}
c.Ui.Output(fmt.Sprintf("Created eval ID: %q ", evalId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have this invoke the eval monitor and provide a detach flag

return 0
}
65 changes: 65 additions & 0 deletions command/job_eval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package command

import (
"strings"
"testing"

"github.com/hashicorp/nomad/nomad/mock"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/assert"
)

func TestJobEvalCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &JobEvalCommand{}
}

func TestJobEvalCommand_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &JobEvalCommand{Meta: Meta{Ui: ui}}

// Fails on misuse
if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, commandErrorText(cmd)) {
t.Fatalf("expected help output, got: %s", out)
}
ui.ErrorWriter.Reset()

// Fails when job ID is not specified
if code := cmd.Run([]string{}); code != 1 {
t.Fatalf("expect exit 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Must provide a job ID") {
t.Fatalf("unexpected error: %v", out)
}
ui.ErrorWriter.Reset()

}

func TestJobEvalCommand_AutocompleteArgs(t *testing.T) {
assert := assert.New(t)
t.Parallel()

srv, _, url := testServer(t, true, nil)
defer srv.Shutdown()

ui := new(cli.MockUi)
cmd := &JobEvalCommand{Meta: Meta{Ui: ui, flagAddress: url}}

// Create a fake job
state := srv.Agent.Server().State()
j := mock.Job()
assert.Nil(state.UpsertJob(1000, j))

prefix := j.ID[:len(j.ID)-5]
args := complete.Args{Last: prefix}
predictor := cmd.AutocompleteArgs()

res := predictor.Predict(args)
assert.Equal(1, len(res))
assert.Equal(j.ID, res[0])
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add a test that submits a job and runs the force eval against it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

40 changes: 35 additions & 5 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ var (
RTarget: ">= 0.6.1",
Operand: structs.ConstraintVersion,
}

// allowRescheduleTransition is the transition that allows failed
// allocations to be force rescheduled. We create a one off
// variable to avoid creating a new object for every request.
allowForceRescheduleTransition = &structs.DesiredTransition{
ForceReschedule: helper.BoolToPtr(true),
}
)

// Job endpoint is used for job interactions
Expand Down Expand Up @@ -538,6 +545,28 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
return fmt.Errorf("can't evaluate parameterized job")
}

forceRescheduleAllocs := make(map[string]*structs.DesiredTransition)

if args.EvalOptions.ForceReschedule {
// Find any failed allocs that could be force rescheduled
allocs, err := snap.AllocsByJob(ws, args.RequestNamespace(), args.JobID, false)
if err != nil {
return err
}

for _, alloc := range allocs {
taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
// Forcing rescheduling is only allowed if task group has rescheduling enabled
if taskGroup == nil || taskGroup.ReschedulePolicy == nil || !taskGroup.ReschedulePolicy.Enabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we remove taskGroup.ReschedulePolicy == nil given that Enabled() handles the nil case

continue
}

if alloc.NextAllocation == "" && alloc.ClientStatus == structs.AllocClientStatusFailed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if the desired transition is already set

forceRescheduleAllocs[alloc.ID] = allowForceRescheduleTransition
}
}
}

// Create a new evaluation
eval := &structs.Evaluation{
ID: uuid.Generate(),
Expand All @@ -549,13 +578,14 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},

// Create a AllocUpdateDesiredTransitionRequest request with the eval and any forced rescheduled allocs
updateTransitionReq := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: forceRescheduleAllocs,
Evals: []*structs.Evaluation{eval},
}
_, evalIndex, err := j.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, updateTransitionReq)

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
Expand Down
Loading