Skip to content

Commit

Permalink
Merge pull request #10145 from hashicorp/b-periodic-init-status
Browse files Browse the repository at this point in the history
periodic: always reset periodic children status
  • Loading branch information
Mahmood Ali authored Mar 26, 2021
2 parents db37617 + d81df3d commit 274e795
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 6 deletions.
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/parameterized"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
Expand Down
50 changes: 47 additions & 3 deletions e2e/e2eutil/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func Register(jobID, jobFilePath string) error {
return nil
}

// PeriodicForce forces a periodic job to dispatch, returning the child job ID
// or an error
// PeriodicForce forces a periodic job to dispatch
func PeriodicForce(jobID string) error {
// nomad job periodic force
cmd := exec.Command("nomad", "job", "periodic", "force", jobID)
Expand All @@ -54,6 +53,29 @@ func PeriodicForce(jobID string) error {
return nil
}

// Dispatch dispatches a parameterized job
func Dispatch(jobID string, meta map[string]string, payload string) error {
// nomad job periodic force
args := []string{"job", "dispatch"}
for k, v := range meta {
args = append(args, "-meta", fmt.Sprintf("%v=%v", k, v))
}
args = append(args, jobID)
if payload != "" {
args = append(args, "-")
}

cmd := exec.Command("nomad", args...)
cmd.Stdin = strings.NewReader(payload)

out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not dispatch job: %w\n%v", err, string(out))
}

return nil
}

// JobInspectTemplate runs nomad job inspect and formats the output
// using the specified go template
func JobInspectTemplate(jobID, template string) (string, error) {
Expand Down Expand Up @@ -102,7 +124,10 @@ func ChildrenJobSummary(jobID string) ([]map[string]string, error) {

section, err := GetSection(out, "Children Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
section, err = GetSection(out, "Parameterized Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
}
}

summary, err := ParseColumns(section)
Expand Down Expand Up @@ -131,3 +156,22 @@ func PreviouslyLaunched(jobID string) ([]map[string]string, error) {

return summary, nil
}

func DispatchedJobs(jobID string) ([]map[string]string, error) {
out, err := Command("nomad", "job", "status", jobID)
if err != nil {
return nil, fmt.Errorf("nomad job status failed: %w", err)
}

section, err := GetSection(out, "Dispatched Jobs")
if err != nil {
return nil, fmt.Errorf("could not find previously launched jobs section: %w", err)
}

summary, err := ParseColumns(section)
if err != nil {
return nil, fmt.Errorf("could not parse previously launched jobs section: %w", err)
}

return summary, nil
}
26 changes: 26 additions & 0 deletions e2e/parameterized/input/simple.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
job "periodic" {
datacenters = ["dc1"]
type = "batch"

constraint {
attribute = "${attr.kernel.name}"
operator = "set_contains_any"
value = "darwin,linux"
}

parameterized {
meta_optional = ["i"]
}

group "group" {
task "task" {
driver = "docker"

config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}
90 changes: 90 additions & 0 deletions e2e/parameterized/parameterized.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package parameterized

import (
"fmt"

"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

type ParameterizedTest struct {
framework.TC
jobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Parameterized",
CanRunLocal: true,
Cases: []framework.TestCase{
new(ParameterizedTest),
},
})
}

func (tc *ParameterizedTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}

func (tc *ParameterizedTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()

for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}

func (tc *ParameterizedTest) TestParameterizedDispatch_Basic(f *framework.F) {
t := f.T()

uuid := uuid.Generate()
jobID := fmt.Sprintf("dispatch-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)

// register job
require.NoError(t, e2eutil.Register(jobID, "parameterized/input/simple.nomad"))

// force dispatch
dispatched := 4

for i := 0; i < dispatched; i++ {
require.NoError(t, e2eutil.Dispatch(jobID, map[string]string{"i": fmt.Sprintf("%v", i)}, ""))
}

testutil.WaitForResult(func() (bool, error) {
children, err := e2eutil.DispatchedJobs(jobID)
if err != nil {
return false, err
}

dead := 0
for _, c := range children {
if c["Status"] != "dead" {
return false, fmt.Errorf("expected periodic job to be dead")
}
dead++
}

if dead != dispatched {
return false, fmt.Errorf("expected %d but found %d children", dispatched, dead)
}

return true, nil
}, func(err error) {
require.NoError(t, err)
})

// Assert there are no pending children
summary, err := e2eutil.ChildrenJobSummary(jobID)
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], fmt.Sprintf("%v", dispatched))
}
4 changes: 3 additions & 1 deletion e2e/periodic/input/simple.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ job "periodic" {

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
operator = "set_contains_any"
value = "darwin,linux"
}



periodic {
cron = "* * * * *"
prohibit_overlap = true
Expand Down
1 change: 1 addition & 0 deletions e2e/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], "1")
}
4 changes: 3 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1887,13 +1887,15 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Derive the child job and commit it via Raft
// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
dispatchJob.ParentID = parameterizedJob.ID
dispatchJob.Name = dispatchJob.ID
dispatchJob.SetSubmitTime()
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""

// Merge in the meta data
for k, v := range args.Meta {
Expand Down
111 changes: 111 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6401,6 +6401,117 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
}
}

// TestJobEndpoint_Dispatch_JobChildrenSummary asserts that the job summary is updated
// appropriately as its dispatched/children jobs status are updated.
func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()

state := s1.fsm.State()

codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

node := mock.Node()
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

parameterizedJob := mock.BatchJob()
parameterizedJob.ParameterizedJob = &structs.ParameterizedJobConfig{}

// Create the register request
regReq := &structs.JobRegisterRequest{
Job: parameterizedJob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var regResp structs.JobRegisterResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, &regResp))

jobChildren := func() *structs.JobChildrenSummary {
summary, err := state.JobSummaryByID(nil, parameterizedJob.Namespace, parameterizedJob.ID)
require.NoError(t, err)

return summary.Children
}
require.Equal(t, &structs.JobChildrenSummary{}, jobChildren())

// dispatch a child job
dispatchReq := &structs.JobDispatchRequest{
JobID: parameterizedJob.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var dispatchResp structs.JobDispatchResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatchReq, &dispatchResp)
require.NoError(t, err)

nextIdx := dispatchResp.Index + 1

require.Equal(t, &structs.JobChildrenSummary{Pending: 1}, jobChildren())

dispatchedJob, err := state.JobByID(nil, parameterizedJob.Namespace, dispatchResp.DispatchedJobID)
require.NoError(t, err)
require.NotNil(t, dispatchedJob)

dispatchedStatus := func() string {
job, err := state.JobByID(nil, dispatchedJob.Namespace, dispatchedJob.ID)
require.NoError(t, err)
require.NotNil(t, job)

return job.Status
}

// Let's start a alloc for the dispatch job and walk through states
// Note that job summary reports 1 running even when alloc is pending!
nextIdx++
alloc := mock.Alloc()
alloc.Job = dispatchedJob
alloc.JobID = dispatchedJob.ID
alloc.TaskGroup = dispatchedJob.TaskGroups[0].Name
alloc.Namespace = dispatchedJob.Namespace
alloc.ClientStatus = structs.AllocClientStatusPending
err = s1.State().UpsertAllocs(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{alloc})
require.NoError(t, err)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())

// mark the creation eval as completed
nextIdx++
eval, err := state.EvalByID(nil, dispatchResp.EvalID)
require.NoError(t, err)
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, nextIdx, []*structs.Evaluation{eval}))

updateAllocStatus := func(status string) {
nextIdx++
nalloc, err := state.AllocByID(nil, alloc.ID)
require.NoError(t, err)
nalloc = nalloc.Copy()
nalloc.ClientStatus = status
err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc})
require.NoError(t, err)
}

// job should remain remaining when alloc runs
updateAllocStatus(structs.AllocClientStatusRunning)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())

// job should be dead after alloc completes
updateAllocStatus(structs.AllocClientStatusComplete)
require.Equal(t, &structs.JobChildrenSummary{Dead: 1}, jobChildren())
require.Equal(t, structs.JobStatusDead, dispatchedStatus())
}

func TestJobEndpoint_Scale(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
4 changes: 3 additions & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
}()

// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
// non-periodic in initial status
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
derived.Status = ""
derived.StatusDescription = ""
return
}

Expand Down
Loading

0 comments on commit 274e795

Please sign in to comment.