Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add check-index flag to nomad run #1243

Merged
merged 1 commit into from
Jun 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ const (
JobTypeBatch = "batch"
)

const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)

// Jobs is used to access the job-specific endpoints.
type Jobs struct {
client *Client
Expand All @@ -27,9 +33,27 @@ func (c *Client) Jobs() *Jobs {
// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse

req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}

// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse

req := &RegisterJobRequest{job}
req := &RegisterJobRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: modifyIndex,
}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
Expand Down Expand Up @@ -172,6 +196,7 @@ type Job struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}

// JobListStub is used to return a subset of information about
Expand All @@ -186,6 +211,7 @@ type JobListStub struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}

// JobIDSort is used to sort jobs by their job ID's.
Expand Down Expand Up @@ -263,7 +289,9 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {

// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
Job *Job
EnforceIndex bool
JobModifyIndex uint64
}

// registerJobResponse is used to deserialize a job response
Expand Down
68 changes: 68 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,74 @@ func TestJobs_Register(t *testing.T) {
}
}

func TestJobs_EnforceRegister(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Listing jobs before registering returns nothing
resp, qm, err := jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(resp); n != 0 {
t.Fatalf("expected 0 jobs, got: %d", n)
}

// Create a job and attempt to register it with an incorrect index.
job := testJob()
eval, wm, err := jobs.EnforceRegister(job, 10, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}

// Register
eval, wm, err = jobs.EnforceRegister(job, 0, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)

// Query the jobs back out again
resp, qm, err = jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm)

// Check that we got the expected response
if len(resp) != 1 {
t.Fatalf("bad length: %d", len(resp))
}

if resp[0].ID != job.ID {
t.Fatalf("bad: %#v", resp[0])
}
curIndex := resp[0].JobModifyIndex

// Fail at incorrect index
eval, wm, err = jobs.EnforceRegister(job, 123456, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}

// Works at correct index
eval, wm, err = jobs.EnforceRegister(job, curIndex, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
}

func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
Expand Down
2 changes: 1 addition & 1 deletion command/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *InspectCommand) Run(args []string) int {
}

// Print the contents of the job
req := api.RegisterJobRequest{job}
req := api.RegisterJobRequest{Job: job}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
Expand Down
55 changes: 53 additions & 2 deletions command/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -13,6 +15,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

var (
// enforceIndexRegex is a regular expression which extracts the enforcement error
enforceIndexRegex = regexp.MustCompile(`\((Enforcing job modify index.*)\)`)
)

type RunCommand struct {
Meta
}
Expand Down Expand Up @@ -46,6 +53,14 @@ General Options:
Run Options:
-check-index
If set, the job is only registered or updated if the the passed
job modify index matches the server side version. If a check-index value of
zero is passed, the job is only registered if it does not yet exist. If a
non-zero value is passed, it ensures that the job is being updated from a
known state. The use of this flag is most common in conjunction with plan
command.
-detach
Return immediately instead of entering monitor mode. After job submission,
the evaluation ID will be printed to the screen, which can be used to
Expand All @@ -67,12 +82,14 @@ func (c *RunCommand) Synopsis() string {

func (c *RunCommand) Run(args []string) int {
var detach, verbose, output bool
var checkIndexStr string

flags := c.Meta.FlagSet("run", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")

if err := flags.Parse(args); err != nil {
return 1
Expand Down Expand Up @@ -119,7 +136,7 @@ func (c *RunCommand) Run(args []string) int {
}

if output {
req := api.RegisterJobRequest{apiJob}
req := api.RegisterJobRequest{Job: apiJob}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
Expand All @@ -142,9 +159,32 @@ func (c *RunCommand) Run(args []string) int {
client.SetRegion(r)
}

// Parse the check-index
checkIndex, enforce, err := parseCheckIndex(checkIndexStr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", checkIndexStr, err))
return 1
}

// Submit the job
evalID, _, err := client.Jobs().Register(apiJob, nil)
var evalID string
if enforce {
evalID, _, err = client.Jobs().EnforceRegister(apiJob, checkIndex, nil)
} else {
evalID, _, err = client.Jobs().Register(apiJob, nil)
}
if err != nil {
if strings.Contains(err.Error(), api.RegisterEnforceIndexErrPrefix) {
// Format the error specially if the error is due to index
// enforcement
matches := enforceIndexRegex.FindStringSubmatch(err.Error())
if len(matches) == 2 {
c.Ui.Error(matches[1]) // The matched group
c.Ui.Error("Job not updated")
return 1
}
}

c.Ui.Error(fmt.Sprintf("Error submitting job: %s", err))
return 1
}
Expand All @@ -167,6 +207,17 @@ func (c *RunCommand) Run(args []string) int {

}

// parseCheckIndex parses the check-index flag and returns the index, whether it
// was set and potentially an error during parsing.
func parseCheckIndex(input string) (uint64, bool, error) {
if input == "" {
return 0, false, nil
}

u, err := strconv.ParseUint(input, 10, 64)
return u, true, err
}

// convertStructJob is used to take a *structs.Job and convert it to an *api.Job.
// This function is just a hammer and probably needs to be revisited.
func convertStructJob(in *structs.Job) (*api.Job, error) {
Expand Down
10 changes: 10 additions & 0 deletions command/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,14 @@ job "job1" {
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error submitting job") {
t.Fatalf("expected failed query error, got: %s", out)
}

// Fails on invalid check-index (requires a valid job)
if code := cmd.Run([]string{"-check-index=bad", fh3.Name()}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "parsing check-index") {
t.Fatalf("expected parse error, got: %s", out)
}
ui.ErrorWriter.Reset()

}
37 changes: 34 additions & 3 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/hashicorp/nomad/scheduler"
)

const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)

// Job endpoint is used for job interactions
type Job struct {
srv *Server
Expand All @@ -38,6 +44,29 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}

if args.EnforceIndex {
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
jmi := args.JobModifyIndex
if job != nil {
if jmi == 0 {
return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix)
} else if jmi != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex)
}
} else if jmi != 0 {
return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi)
}
}

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
Expand Down Expand Up @@ -422,12 +451,14 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}

var index uint64
var updatedIndex uint64
if oldJob != nil {
index = oldJob.JobModifyIndex + 1
index = oldJob.JobModifyIndex
updatedIndex = oldJob.JobModifyIndex + 1
}

// Insert the updated Job into the snapshot
snap.UpsertJob(index, args.Job)
snap.UpsertJob(updatedIndex, args.Job)

// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
Expand All @@ -436,7 +467,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: index,
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
Expand Down
Loading