Skip to content

Commit

Permalink
Add check-index flag to nomad run
Browse files Browse the repository at this point in the history
  • Loading branch information
dadgar committed Jun 9, 2016
1 parent 5ffb2f0 commit 693299c
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 8 deletions.
32 changes: 30 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ const (
JobTypeBatch = "batch"
)

const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)

// Jobs is used to access the job-specific endpoints.
type Jobs struct {
client *Client
Expand All @@ -27,9 +33,27 @@ func (c *Client) Jobs() *Jobs {
// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse

req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}

// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse

req := &RegisterJobRequest{job}
req := &RegisterJobRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: modifyIndex,
}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
Expand Down Expand Up @@ -172,6 +196,7 @@ type Job struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}

// JobListStub is used to return a subset of information about
Expand All @@ -186,6 +211,7 @@ type JobListStub struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}

// JobIDSort is used to sort jobs by their job ID's.
Expand Down Expand Up @@ -263,7 +289,9 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {

// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
Job *Job
EnforceIndex bool
JobModifyIndex uint64
}

// registerJobResponse is used to deserialize a job response
Expand Down
68 changes: 68 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,74 @@ func TestJobs_Register(t *testing.T) {
}
}

func TestJobs_EnforceRegister(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Listing jobs before registering returns nothing
resp, qm, err := jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(resp); n != 0 {
t.Fatalf("expected 0 jobs, got: %d", n)
}

// Create a job and attempt to register it with an incorrect index.
job := testJob()
eval, wm, err := jobs.EnforceRegister(job, 10, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}

// Register
eval, wm, err = jobs.EnforceRegister(job, 0, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)

// Query the jobs back out again
resp, qm, err = jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm)

// Check that we got the expected response
if len(resp) != 1 {
t.Fatalf("bad length: %d", len(resp))
}

if resp[0].ID != job.ID {
t.Fatalf("bad: %#v", resp[0])
}
curIndex := resp[0].JobModifyIndex

// Fail at incorrect index
eval, wm, err = jobs.EnforceRegister(job, 123456, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}

// Works at correct index
eval, wm, err = jobs.EnforceRegister(job, curIndex, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
}

func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
Expand Down
2 changes: 1 addition & 1 deletion command/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *InspectCommand) Run(args []string) int {
}

// Print the contents of the job
req := api.RegisterJobRequest{job}
req := api.RegisterJobRequest{Job: job}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
Expand Down
55 changes: 53 additions & 2 deletions command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -13,6 +15,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

var (
// enforceIndexRegex is a regular expression which extracts the enforcement error
enforceIndexRegex = regexp.MustCompile(`\((Enforcing job modify index.*)\)`)
)

type RunCommand struct {
Meta
}
Expand Down Expand Up @@ -46,6 +53,14 @@ General Options:
Run Options:
-check-index
If set, the job is only registered or updated if the the passed
job modify index matches the server side version. If a check-index value of
zero is passed, the job is only registered if it does not yet exist. If a
non-zero value is passed, it ensures that the job is being updated from a
known state. The use of this flag is most common in conjunction with plan
command.
-detach
Return immediately instead of entering monitor mode. After job submission,
the evaluation ID will be printed to the screen, which can be used to
Expand All @@ -67,12 +82,14 @@ func (c *RunCommand) Synopsis() string {

func (c *RunCommand) Run(args []string) int {
var detach, verbose, output bool
var checkIndexStr string

flags := c.Meta.FlagSet("run", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")

if err := flags.Parse(args); err != nil {
return 1
Expand Down Expand Up @@ -119,7 +136,7 @@ func (c *RunCommand) Run(args []string) int {
}

if output {
req := api.RegisterJobRequest{apiJob}
req := api.RegisterJobRequest{Job: apiJob}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
Expand All @@ -142,9 +159,32 @@ func (c *RunCommand) Run(args []string) int {
client.SetRegion(r)
}

// Parse the check-index
checkIndex, enforce, err := parseCheckIndex(checkIndexStr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", checkIndexStr, err))
return 1
}

// Submit the job
evalID, _, err := client.Jobs().Register(apiJob, nil)
var evalID string
if enforce {
evalID, _, err = client.Jobs().EnforceRegister(apiJob, checkIndex, nil)
} else {
evalID, _, err = client.Jobs().Register(apiJob, nil)
}
if err != nil {
if strings.Contains(err.Error(), api.RegisterEnforceIndexErrPrefix) {
// Format the error specially if the error is due to index
// enforcement
matches := enforceIndexRegex.FindStringSubmatch(err.Error())
if len(matches) == 2 {
c.Ui.Error(matches[1]) // The matched group
c.Ui.Error("Job not updated")
return 1
}
}

c.Ui.Error(fmt.Sprintf("Error submitting job: %s", err))
return 1
}
Expand All @@ -167,6 +207,17 @@ func (c *RunCommand) Run(args []string) int {

}

// parseCheckIndex parses the check-index flag and returns the index, whether it
// was set and potentially an error during parsing.
func parseCheckIndex(input string) (uint64, bool, error) {
if input == "" {
return 0, false, nil
}

u, err := strconv.ParseUint(input, 10, 64)
return u, true, err
}

// convertStructJob is used to take a *structs.Job and convert it to an *api.Job.
// This function is just a hammer and probably needs to be revisited.
func convertStructJob(in *structs.Job) (*api.Job, error) {
Expand Down
10 changes: 10 additions & 0 deletions command/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,14 @@ job "job1" {
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error submitting job") {
t.Fatalf("expected failed query error, got: %s", out)
}

// Fails on invalid check-index (requires a valid job)
if code := cmd.Run([]string{"-check-index=bad", fh3.Name()}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "parsing check-index") {
t.Fatalf("expected parse error, got: %s", out)
}
ui.ErrorWriter.Reset()

}
37 changes: 34 additions & 3 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/hashicorp/nomad/scheduler"
)

const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)

// Job endpoint is used for job interactions
type Job struct {
srv *Server
Expand All @@ -38,6 +44,29 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

if args.EnforceIndex {
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
jmi := args.JobModifyIndex
if job != nil {
if jmi == 0 {
return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix)
} else if jmi != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex)
}
} else if jmi != 0 {
return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi)
}
}

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
Expand Down Expand Up @@ -422,12 +451,14 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

var index uint64
var updatedIndex uint64
if oldJob != nil {
index = oldJob.JobModifyIndex + 1
index = oldJob.JobModifyIndex
updatedIndex = oldJob.JobModifyIndex + 1
}

// Insert the updated Job into the snapshot
snap.UpsertJob(index, args.Job)
snap.UpsertJob(updatedIndex, args.Job)

// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
Expand All @@ -436,7 +467,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: index,
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
Expand Down
Loading

0 comments on commit 693299c

Please sign in to comment.