Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

periodic: always reset periodic children status #10145

Merged
merged 3 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "github.com/hashicorp/nomad/e2e/nodedrain"
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
_ "github.com/hashicorp/nomad/e2e/nomadexec"
_ "github.com/hashicorp/nomad/e2e/parameterized"
_ "github.com/hashicorp/nomad/e2e/periodic"
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
Expand Down
50 changes: 47 additions & 3 deletions e2e/e2eutil/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func Register(jobID, jobFilePath string) error {
return nil
}

// PeriodicForce forces a periodic job to dispatch, returning the child job ID
// or an error
// PeriodicForce forces a periodic job to dispatch
func PeriodicForce(jobID string) error {
// nomad job periodic force
cmd := exec.Command("nomad", "job", "periodic", "force", jobID)
Expand All @@ -54,6 +53,29 @@ func PeriodicForce(jobID string) error {
return nil
}

// Dispatch dispatches a parameterized job
func Dispatch(jobID string, meta map[string]string, payload string) error {
// nomad job periodic force
args := []string{"job", "dispatch"}
for k, v := range meta {
args = append(args, "-meta", fmt.Sprintf("%v=%v", k, v))
}
args = append(args, jobID)
if payload != "" {
args = append(args, "-")
}

cmd := exec.Command("nomad", args...)
cmd.Stdin = strings.NewReader(payload)

out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("could not dispatch job: %w\n%v", err, string(out))
}

return nil
}

// JobInspectTemplate runs nomad job inspect and formats the output
// using the specified go template
func JobInspectTemplate(jobID, template string) (string, error) {
Expand Down Expand Up @@ -102,7 +124,10 @@ func ChildrenJobSummary(jobID string) ([]map[string]string, error) {

section, err := GetSection(out, "Children Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
section, err = GetSection(out, "Parameterized Job Summary")
if err != nil {
return nil, fmt.Errorf("could not find children job summary section: %w", err)
}
}

summary, err := ParseColumns(section)
Expand Down Expand Up @@ -131,3 +156,22 @@ func PreviouslyLaunched(jobID string) ([]map[string]string, error) {

return summary, nil
}

func DispatchedJobs(jobID string) ([]map[string]string, error) {
out, err := Command("nomad", "job", "status", jobID)
if err != nil {
return nil, fmt.Errorf("nomad job status failed: %w", err)
}

section, err := GetSection(out, "Dispatched Jobs")
if err != nil {
return nil, fmt.Errorf("could not find previously launched jobs section: %w", err)
}

summary, err := ParseColumns(section)
if err != nil {
return nil, fmt.Errorf("could not parse previously launched jobs section: %w", err)
}

return summary, nil
}
26 changes: 26 additions & 0 deletions e2e/parameterized/input/simple.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
job "periodic" {
datacenters = ["dc1"]
type = "batch"

constraint {
attribute = "${attr.kernel.name}"
operator = "set_contains_any"
value = "darwin,linux"
}

parameterized {
meta_optional = ["i"]
}

group "group" {
task "task" {
driver = "docker"

config {
image = "busybox:1"
command = "/bin/sh"
args = ["-c", "sleep 5"]
}
}
}
}
90 changes: 90 additions & 0 deletions e2e/parameterized/parameterized.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package parameterized

import (
"fmt"

"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

type ParameterizedTest struct {
framework.TC
jobIDs []string
}

func init() {
framework.AddSuites(&framework.TestSuite{
Component: "Parameterized",
CanRunLocal: true,
Cases: []framework.TestCase{
new(ParameterizedTest),
},
})
}

func (tc *ParameterizedTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
}

func (tc *ParameterizedTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
j := nomadClient.Jobs()

for _, id := range tc.jobIDs {
j.Deregister(id, true, nil)
}
_, err := e2eutil.Command("nomad", "system", "gc")
f.NoError(err)
}

func (tc *ParameterizedTest) TestParameterizedDispatch_Basic(f *framework.F) {
t := f.T()

uuid := uuid.Generate()
jobID := fmt.Sprintf("dispatch-%s", uuid[0:8])
tc.jobIDs = append(tc.jobIDs, jobID)

// register job
require.NoError(t, e2eutil.Register(jobID, "parameterized/input/simple.nomad"))

// force dispatch
dispatched := 4

for i := 0; i < dispatched; i++ {
require.NoError(t, e2eutil.Dispatch(jobID, map[string]string{"i": fmt.Sprintf("%v", i)}, ""))
}

testutil.WaitForResult(func() (bool, error) {
children, err := e2eutil.DispatchedJobs(jobID)
if err != nil {
return false, err
}

dead := 0
for _, c := range children {
if c["Status"] != "dead" {
return false, fmt.Errorf("expected periodic job to be dead")
}
dead++
}

if dead != dispatched {
return false, fmt.Errorf("expected %d but found %d children", dispatched, dead)
}

return true, nil
}, func(err error) {
require.NoError(t, err)
})

// Assert there are no pending children
summary, err := e2eutil.ChildrenJobSummary(jobID)
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], fmt.Sprintf("%v", dispatched))
}
4 changes: 3 additions & 1 deletion e2e/periodic/input/simple.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ job "periodic" {

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
operator = "set_contains_any"
value = "darwin,linux"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's handy, good catch. There's probably a bunch of tests where this is safe to do. Something for a separate PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly, set_contains_any isn't documented in https://www.nomadproject.io/docs/job-specification/constraint - it's only documented for affinity constraints so we should update that too.

Also, for future readers, I considered having the constraint be based on docker os attribute - but chose not to. Windows is a bit slow, and I didn't want to run tests on LCOW Windows clients if we add some.

}



periodic {
cron = "* * * * *"
prohibit_overlap = true
Expand Down
1 change: 1 addition & 0 deletions e2e/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) {
require.NoError(t, err)
require.Len(t, summary, 1)
require.Equal(t, summary[0]["Pending"], "0")
require.Equal(t, summary[0]["Running"], "0")
require.Equal(t, summary[0]["Dead"], "1")
}
4 changes: 3 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1887,13 +1887,15 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}

// Derive the child job and commit it via Raft
// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
dispatchJob.ParentID = parameterizedJob.ID
dispatchJob.Name = dispatchJob.ID
dispatchJob.SetSubmitTime()
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""

// Merge in the meta data
for k, v := range args.Meta {
Expand Down
111 changes: 111 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6401,6 +6401,117 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
}
}

// TestJobEndpoint_Dispatch_JobChildrenSummary asserts that the job summary is updated
// appropriately as its dispatched/children jobs status are updated.
func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer cleanupS1()

state := s1.fsm.State()

codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

node := mock.Node()
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node))

parameterizedJob := mock.BatchJob()
parameterizedJob.ParameterizedJob = &structs.ParameterizedJobConfig{}

// Create the register request
regReq := &structs.JobRegisterRequest{
Job: parameterizedJob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var regResp structs.JobRegisterResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, &regResp))

jobChildren := func() *structs.JobChildrenSummary {
summary, err := state.JobSummaryByID(nil, parameterizedJob.Namespace, parameterizedJob.ID)
require.NoError(t, err)

return summary.Children
}
require.Equal(t, &structs.JobChildrenSummary{}, jobChildren())

// dispatch a child job
dispatchReq := &structs.JobDispatchRequest{
JobID: parameterizedJob.ID,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: parameterizedJob.Namespace,
},
}
var dispatchResp structs.JobDispatchResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatchReq, &dispatchResp)
require.NoError(t, err)

nextIdx := dispatchResp.Index + 1

require.Equal(t, &structs.JobChildrenSummary{Pending: 1}, jobChildren())

dispatchedJob, err := state.JobByID(nil, parameterizedJob.Namespace, dispatchResp.DispatchedJobID)
require.NoError(t, err)
require.NotNil(t, dispatchedJob)

dispatchedStatus := func() string {
job, err := state.JobByID(nil, dispatchedJob.Namespace, dispatchedJob.ID)
require.NoError(t, err)
require.NotNil(t, job)

return job.Status
}

// Let's start a alloc for the dispatch job and walk through states
// Note that job summary reports 1 running even when alloc is pending!
nextIdx++
alloc := mock.Alloc()
alloc.Job = dispatchedJob
alloc.JobID = dispatchedJob.ID
alloc.TaskGroup = dispatchedJob.TaskGroups[0].Name
alloc.Namespace = dispatchedJob.Namespace
alloc.ClientStatus = structs.AllocClientStatusPending
err = s1.State().UpsertAllocs(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{alloc})
require.NoError(t, err)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())

// mark the creation eval as completed
nextIdx++
eval, err := state.EvalByID(nil, dispatchResp.EvalID)
require.NoError(t, err)
eval = eval.Copy()
eval.Status = structs.EvalStatusComplete
require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, nextIdx, []*structs.Evaluation{eval}))

updateAllocStatus := func(status string) {
nextIdx++
nalloc, err := state.AllocByID(nil, alloc.ID)
require.NoError(t, err)
nalloc = nalloc.Copy()
nalloc.ClientStatus = status
err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc})
require.NoError(t, err)
}

// job should remain remaining when alloc runs
updateAllocStatus(structs.AllocClientStatusRunning)
require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren())
require.Equal(t, structs.JobStatusRunning, dispatchedStatus())

// job should be dead after alloc completes
updateAllocStatus(structs.AllocClientStatusComplete)
require.Equal(t, &structs.JobChildrenSummary{Dead: 1}, jobChildren())
require.Equal(t, structs.JobStatusDead, dispatchedStatus())
}

func TestJobEndpoint_Scale(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
4 changes: 3 additions & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
}()

// Create a copy of the periodic job, give it a derived ID/Name and make it
// non-periodic.
// non-periodic in initial status
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
derived.Status = ""
derived.StatusDescription = ""
return
}

Expand Down
Loading