Skip to content

Commit

Permalink
Merge pull request #2489 from hashicorp/b-immediate-periodic
Browse files Browse the repository at this point in the history
Fix dispatch of periodic job
  • Loading branch information
dadgar authored Mar 28, 2017
2 parents d698288 + 06135c5 commit cc54cbc
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 29 deletions.
11 changes: 9 additions & 2 deletions command/job_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,20 @@ func (c *JobDispatchCommand) Run(args []string) int {
return 1
}

// See if an evaluation was created. If the job is periodic there will be no
// eval.
evalCreated := resp.EvalID != ""

basic := []string{
fmt.Sprintf("Dispatched Job ID|%s", resp.DispatchedJobID),
fmt.Sprintf("Evaluation ID|%s", limit(resp.EvalID, length)),
}
if evalCreated {
basic = append(basic, fmt.Sprintf("Evaluation ID|%s", limit(resp.EvalID, length)))
}
c.Ui.Output(formatKV(basic))

if detach {
// Nothing to do
if detach || !evalCreated {
return 0
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return nil
}

if parent.IsPeriodic() {
if parent.IsPeriodic() && !parent.IsParameterized() {
t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
Expand Down
56 changes: 31 additions & 25 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,34 +842,40 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: dispatchJob.Priority,
Type: dispatchJob.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
reply.JobCreateIndex = jobCreateIndex
reply.DispatchedJobID = dispatchJob.ID
reply.Index = jobCreateIndex

// If the job is periodic, we don't create an eval.
if !dispatchJob.IsPeriodic() {
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: dispatchJob.Priority,
Type: dispatchJob.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}

// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}

// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
}

// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobCreateIndex = jobCreateIndex
reply.DispatchedJobID = dispatchJob.ID
reply.Index = evalIndex
return nil
}

Expand Down
29 changes: 28 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,10 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
MetaOptional: []string{"foo", "bar"},
}

// Periodic dispatch job
d6 := mock.PeriodicJob()
d6.ParameterizedJob = &structs.ParameterizedJobConfig{}

reqNoInputNoMeta := &structs.JobDispatchRequest{}
reqInputDataNoMeta := &structs.JobDispatchRequest{
Payload: []byte("hello world"),
Expand Down Expand Up @@ -1971,6 +1975,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
name string
parameterizedJob *structs.Job
dispatchReq *structs.JobDispatchRequest
noEval bool
err bool
errStr string
}
Expand Down Expand Up @@ -2052,6 +2057,12 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
err: true,
errStr: "Payload exceeds maximum size",
},
{
name: "periodic job dispatched, ensure no eval",
parameterizedJob: d6,
dispatchReq: reqNoInputNoMeta,
noEval: true,
},
}

for _, tc := range cases {
Expand Down Expand Up @@ -2088,7 +2099,18 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
}

// Check that we got an eval and job id back
if dispatchResp.EvalID == "" || dispatchResp.DispatchedJobID == "" {
switch dispatchResp.EvalID {
case "":
if !tc.noEval {
t.Fatalf("Bad response")
}
default:
if tc.noEval {
t.Fatalf("Got eval %q", dispatchResp.EvalID)
}
}

if dispatchResp.DispatchedJobID == "" {
t.Fatalf("Bad response")
}

Expand All @@ -2108,11 +2130,16 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
t.Fatalf("bad parent ID")
}

if tc.noEval {
return
}

// Lookup the evaluation
eval, err := state.EvalByID(ws, dispatchResp.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}

if eval == nil {
t.Fatalf("expected eval")
}
Expand Down

0 comments on commit cc54cbc

Please sign in to comment.