From 693299cd9b75cc66a25bfc8d8042c9d7b9ebae47 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 8 Jun 2016 16:48:02 -0700 Subject: [PATCH] Add check-index flag to nomad run --- api/jobs.go | 32 +++++- api/jobs_test.go | 68 +++++++++++ command/inspect.go | 2 +- command/run.go | 55 ++++++++- command/run_test.go | 10 ++ nomad/job_endpoint.go | 37 +++++- nomad/job_endpoint_test.go | 112 +++++++++++++++++++ nomad/structs/structs.go | 9 ++ website/source/docs/commands/run.html.md.erb | 21 ++++ 9 files changed, 338 insertions(+), 8 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index 7ce57c2acd1..3d8b8a4df09 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -14,6 +14,12 @@ const ( JobTypeBatch = "batch" ) +const ( + // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by + // enforcing the job modify index during registers. + RegisterEnforceIndexErrPrefix = "Enforcing job modify index" +) + // Jobs is used to access the job-specific endpoints. type Jobs struct { client *Client @@ -27,9 +33,27 @@ func (c *Client) Jobs() *Jobs { // Register is used to register a new job. It returns the ID // of the evaluation, along with any errors encountered. func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) { + + var resp registerJobResponse + + req := &RegisterJobRequest{Job: job} + wm, err := j.client.write("/v1/jobs", req, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// EnforceRegister is used to register a job enforcing its job modify index. +func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) { + var resp registerJobResponse - req := &RegisterJobRequest{job} + req := &RegisterJobRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: modifyIndex, + } wm, err := j.client.write("/v1/jobs", req, &resp, q) if err != nil { return "", nil, err @@ -172,6 +196,7 @@ type Job struct { StatusDescription string CreateIndex uint64 ModifyIndex uint64 + JobModifyIndex uint64 } // JobListStub is used to return a subset of information about @@ -186,6 +211,7 @@ type JobListStub struct { StatusDescription string CreateIndex uint64 ModifyIndex uint64 + JobModifyIndex uint64 } // JobIDSort is used to sort jobs by their job ID's. @@ -263,7 +289,9 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job { // RegisterJobRequest is used to serialize a job registration type RegisterJobRequest struct { - Job *Job + Job *Job + EnforceIndex bool + JobModifyIndex uint64 } // registerJobResponse is used to deserialize a job response diff --git a/api/jobs_test.go b/api/jobs_test.go index 8ca0b55fcbe..8bda1708bdd 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -50,6 +50,74 @@ func TestJobs_Register(t *testing.T) { } } +func TestJobs_EnforceRegister(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, qm, err := jobs.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if qm.LastIndex != 0 { + t.Fatalf("bad index: %d", qm.LastIndex) + } + if n := len(resp); n != 0 { + t.Fatalf("expected 0 jobs, got: %d", n) + } + + // Create a job and attempt to register it with an incorrect index. + job := testJob() + eval, wm, err := jobs.EnforceRegister(job, 10, nil) + if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) { + t.Fatalf("expected enforcement error: %v", err) + } + + // Register + eval, wm, err = jobs.EnforceRegister(job, 0, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if eval == "" { + t.Fatalf("missing eval id") + } + assertWriteMeta(t, wm) + + // Query the jobs back out again + resp, qm, err = jobs.List(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + + // Check that we got the expected response + if len(resp) != 1 { + t.Fatalf("bad length: %d", len(resp)) + } + + if resp[0].ID != job.ID { + t.Fatalf("bad: %#v", resp[0]) + } + curIndex := resp[0].JobModifyIndex + + // Fail at incorrect index + eval, wm, err = jobs.EnforceRegister(job, 123456, nil) + if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) { + t.Fatalf("expected enforcement error: %v", err) + } + + // Works at correct index + eval, wm, err = jobs.EnforceRegister(job, curIndex, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if eval == "" { + t.Fatalf("missing eval id") + } + assertWriteMeta(t, wm) +} + func TestJobs_Info(t *testing.T) { c, s := makeClient(t, nil, nil) defer s.Stop() diff --git a/command/inspect.go b/command/inspect.go index 3a77b8a3a01..12c59640074 100644 --- a/command/inspect.go +++ b/command/inspect.go @@ -84,7 +84,7 @@ func (c *InspectCommand) Run(args []string) int { } // Print the contents of the job - req := api.RegisterJobRequest{job} + req := api.RegisterJobRequest{Job: job} buf, err := json.MarshalIndent(req, "", " ") if err != nil { c.Ui.Error(fmt.Sprintf("Error converting job: %s", err)) diff --git a/command/run.go b/command/run.go index 30c9fc7d0d4..033ded3ce52 100644 --- a/command/run.go +++ b/command/run.go @@ -5,6 +5,8 @@ import ( "encoding/gob" "encoding/json" "fmt" + "regexp" + "strconv" "strings" "time" @@ -13,6 +15,11 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +var ( + // enforceIndexRegex is a regular expression which extracts the enforcement error + enforceIndexRegex = regexp.MustCompile(`\((Enforcing job modify index.*)\)`) +) + type RunCommand struct { Meta } @@ -46,6 +53,14 @@ General Options: Run Options: + -check-index + If set, the job is only registered or updated if the the passed + job modify index matches the server side version. If a check-index value of + zero is passed, the job is only registered if it does not yet exist. If a + non-zero value is passed, it ensures that the job is being updated from a + known state. The use of this flag is most common in conjunction with plan + command. + -detach Return immediately instead of entering monitor mode. After job submission, the evaluation ID will be printed to the screen, which can be used to @@ -67,12 +82,14 @@ func (c *RunCommand) Synopsis() string { func (c *RunCommand) Run(args []string) int { var detach, verbose, output bool + var checkIndexStr string flags := c.Meta.FlagSet("run", FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&output, "output", false, "") + flags.StringVar(&checkIndexStr, "check-index", "", "") if err := flags.Parse(args); err != nil { return 1 @@ -119,7 +136,7 @@ func (c *RunCommand) Run(args []string) int { } if output { - req := api.RegisterJobRequest{apiJob} + req := api.RegisterJobRequest{Job: apiJob} buf, err := json.MarshalIndent(req, "", " ") if err != nil { c.Ui.Error(fmt.Sprintf("Error converting job: %s", err)) @@ -142,9 +159,32 @@ func (c *RunCommand) Run(args []string) int { client.SetRegion(r) } + // Parse the check-index + checkIndex, enforce, err := parseCheckIndex(checkIndexStr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", checkIndexStr, err)) + return 1 + } + // Submit the job - evalID, _, err := client.Jobs().Register(apiJob, nil) + var evalID string + if enforce { + evalID, _, err = client.Jobs().EnforceRegister(apiJob, checkIndex, nil) + } else { + evalID, _, err = client.Jobs().Register(apiJob, nil) + } if err != nil { + if strings.Contains(err.Error(), api.RegisterEnforceIndexErrPrefix) { + // Format the error specially if the error is due to index + // enforcement + matches := enforceIndexRegex.FindStringSubmatch(err.Error()) + if len(matches) == 2 { + c.Ui.Error(matches[1]) // The matched group + c.Ui.Error("Job not updated") + return 1 + } + } + c.Ui.Error(fmt.Sprintf("Error submitting job: %s", err)) return 1 } @@ -167,6 +207,17 @@ func (c *RunCommand) Run(args []string) int { } +// parseCheckIndex parses the check-index flag and returns the index, whether it +// was set and potentially an error during parsing. +func parseCheckIndex(input string) (uint64, bool, error) { + if input == "" { + return 0, false, nil + } + + u, err := strconv.ParseUint(input, 10, 64) + return u, true, err +} + // convertStructJob is used to take a *structs.Job and convert it to an *api.Job. // This function is just a hammer and probably needs to be revisited. func convertStructJob(in *structs.Job) (*api.Job, error) { diff --git a/command/run_test.go b/command/run_test.go index 2bf09088b7d..2e500b2cc5a 100644 --- a/command/run_test.go +++ b/command/run_test.go @@ -136,4 +136,14 @@ job "job1" { if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error submitting job") { t.Fatalf("expected failed query error, got: %s", out) } + + // Fails on invalid check-index (requires a valid job) + if code := cmd.Run([]string{"-check-index=bad", fh3.Name()}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "parsing check-index") { + t.Fatalf("expected parse error, got: %s", out) + } + ui.ErrorWriter.Reset() + } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 40f3b231c2c..473f39584da 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -13,6 +13,12 @@ import ( "github.com/hashicorp/nomad/scheduler" ) +const ( + // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by + // enforcing the job modify index during registers. + RegisterEnforceIndexErrPrefix = "Enforcing job modify index" +) + // Job endpoint is used for job interactions type Job struct { srv *Server @@ -38,6 +44,29 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + if args.EnforceIndex { + // Lookup the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + job, err := snap.JobByID(args.Job.ID) + if err != nil { + return err + } + jmi := args.JobModifyIndex + if job != nil { + if jmi == 0 { + return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix) + } else if jmi != job.JobModifyIndex { + return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d", + RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex) + } + } else if jmi != 0 { + return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi) + } + } + // Commit this update via Raft _, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) if err != nil { @@ -422,12 +451,14 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) } var index uint64 + var updatedIndex uint64 if oldJob != nil { - index = oldJob.JobModifyIndex + 1 + index = oldJob.JobModifyIndex + updatedIndex = oldJob.JobModifyIndex + 1 } // Insert the updated Job into the snapshot - snap.UpsertJob(index, args.Job) + snap.UpsertJob(updatedIndex, args.Job) // Create an eval and mark it as requiring annotations and insert that as well eval := &structs.Evaluation{ @@ -436,7 +467,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) Type: args.Job.Type, TriggeredBy: structs.EvalTriggerJobRegister, JobID: args.Job.ID, - JobModifyIndex: index, + JobModifyIndex: updatedIndex, Status: structs.EvalStatusPending, AnnotatePlan: true, } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 339b13439b4..da69a684e7d 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -248,6 +248,118 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) { } } +func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request and enforcing an incorrect index + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: 100, // Not registered yet so not possible + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) { + t.Fatalf("expected enforcement error") + } + + // Create the register request and enforcing it is new + req = &structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: 0, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + curIndex := resp.JobModifyIndex + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.JobModifyIndex { + t.Fatalf("index mis-match") + } + + // Reregister request and enforcing it be a new job + req = &structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: 0, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) { + t.Fatalf("expected enforcement error") + } + + // Reregister request and enforcing it be at an incorrect index + req = &structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: curIndex - 1, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) { + t.Fatalf("expected enforcement error") + } + + // Reregister request and enforcing it be at the correct index + job.Priority = job.Priority + 1 + req = &structs.JobRegisterRequest{ + Job: job, + EnforceIndex: true, + JobModifyIndex: curIndex, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + out, err = state.JobByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.Priority != job.Priority { + t.Fatalf("priority mis-match") + } +} + func TestJobEndpoint_Evaluate(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index f624c4bfdde..14d66772bb9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -182,6 +182,13 @@ type NodeSpecificRequest struct { // to register a job as being a schedulable entity. type JobRegisterRequest struct { Job *Job + + // If EnforceIndex is set then the job will only be registered if the passed + // JobModifyIndex matches the current Jobs index. If the index is zero, the + // register only occurs if the job is new. + EnforceIndex bool + JobModifyIndex uint64 + WriteRequest } @@ -1075,6 +1082,7 @@ func (j *Job) Stub() *JobListStub { StatusDescription: j.StatusDescription, CreateIndex: j.CreateIndex, ModifyIndex: j.ModifyIndex, + JobModifyIndex: j.JobModifyIndex, } } @@ -1095,6 +1103,7 @@ type JobListStub struct { StatusDescription string CreateIndex uint64 ModifyIndex uint64 + JobModifyIndex uint64 } // UpdateStrategy is used to modify how updates are done diff --git a/website/source/docs/commands/run.html.md.erb b/website/source/docs/commands/run.html.md.erb index 3e83aa51f66..e0b3e6ef5d3 100644 --- a/website/source/docs/commands/run.html.md.erb +++ b/website/source/docs/commands/run.html.md.erb @@ -41,6 +41,13 @@ environment variable are overridden and the the job's region is used. ## Run Options +* `-check-index`: If set, the job is only registered or updated if the the + passed job modify index matches the server side version. If a check-index value + of zero is passed, the job is only registered if it does not yet exist. If a + non-zero value is passed, it ensures that the job is being updated from a known + state. The use of this flag is most common in conjunction with [plan + command](/docs/commands/plan.html). + * `-detach`: Return immediately instead of monitoring. A new evaluation ID will be output, which can be used to examine the evaluation using the [eval-status](/docs/commands/eval-status.html) command @@ -65,6 +72,20 @@ $ nomad run job1.nomad ==> Evaluation "52dee78a" finished with status "complete" ``` +Update the job using check-index: +``` +$ nomad run -check-index 5 example.nomad +Enforcing job modify index 5: job exists with conflicting job modify index: 6 +Job not updated + +$ nomad run -check-index 6 example.nomad +==> Monitoring evaluation "5ef16dff" + Evaluation triggered by job "example" + Allocation "6ec7d16f" modified: node "6e1f9bf6", group "cache" + Evaluation status changed: "pending" -> "complete" +==> Evaluation "5ef16dff" finished with status "complete" +``` + Schedule the job contained in `job1.nomad` and return immediately: ```