diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index deecc1c4dfc..fe3f4077ccc 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -26,6 +26,7 @@ import ( _ "github.com/hashicorp/nomad/e2e/nodedrain" _ "github.com/hashicorp/nomad/e2e/nomad09upgrade" _ "github.com/hashicorp/nomad/e2e/nomadexec" + _ "github.com/hashicorp/nomad/e2e/parameterized" _ "github.com/hashicorp/nomad/e2e/periodic" _ "github.com/hashicorp/nomad/e2e/podman" _ "github.com/hashicorp/nomad/e2e/quotas" diff --git a/e2e/e2eutil/job.go b/e2e/e2eutil/job.go index d0a74a6df88..96d29f67efe 100644 --- a/e2e/e2eutil/job.go +++ b/e2e/e2eutil/job.go @@ -40,8 +40,7 @@ func Register(jobID, jobFilePath string) error { return nil } -// PeriodicForce forces a periodic job to dispatch, returning the child job ID -// or an error +// PeriodicForce forces a periodic job to dispatch func PeriodicForce(jobID string) error { // nomad job periodic force cmd := exec.Command("nomad", "job", "periodic", "force", jobID) @@ -54,6 +53,29 @@ func PeriodicForce(jobID string) error { return nil } +// Dispatch dispatches a parameterized job +func Dispatch(jobID string, meta map[string]string, payload string) error { + // nomad job periodic force + args := []string{"job", "dispatch"} + for k, v := range meta { + args = append(args, "-meta", fmt.Sprintf("%v=%v", k, v)) + } + args = append(args, jobID) + if payload != "" { + args = append(args, "-") + } + + cmd := exec.Command("nomad", args...) + cmd.Stdin = strings.NewReader(payload) + + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("could not dispatch job: %w\n%v", err, string(out)) + } + + return nil +} + // JobInspectTemplate runs nomad job inspect and formats the output // using the specified go template func JobInspectTemplate(jobID, template string) (string, error) { @@ -102,7 +124,10 @@ func ChildrenJobSummary(jobID string) ([]map[string]string, error) { section, err := GetSection(out, "Children Job Summary") if err != nil { - return nil, fmt.Errorf("could not find children job summary section: %w", err) + section, err = GetSection(out, "Parameterized Job Summary") + if err != nil { + return nil, fmt.Errorf("could not find children job summary section: %w", err) + } } summary, err := ParseColumns(section) @@ -131,3 +156,22 @@ func PreviouslyLaunched(jobID string) ([]map[string]string, error) { return summary, nil } + +func DispatchedJobs(jobID string) ([]map[string]string, error) { + out, err := Command("nomad", "job", "status", jobID) + if err != nil { + return nil, fmt.Errorf("nomad job status failed: %w", err) + } + + section, err := GetSection(out, "Dispatched Jobs") + if err != nil { + return nil, fmt.Errorf("could not find previously launched jobs section: %w", err) + } + + summary, err := ParseColumns(section) + if err != nil { + return nil, fmt.Errorf("could not parse previously launched jobs section: %w", err) + } + + return summary, nil +} diff --git a/e2e/parameterized/input/simple.nomad b/e2e/parameterized/input/simple.nomad new file mode 100644 index 00000000000..868095a574a --- /dev/null +++ b/e2e/parameterized/input/simple.nomad @@ -0,0 +1,26 @@ +job "periodic" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + operator = "set_contains_any" + value = "darwin,linux" + } + + parameterized { + meta_optional = ["i"] + } + + group "group" { + task "task" { + driver = "docker" + + config { + image = "busybox:1" + command = "/bin/sh" + args = ["-c", "sleep 5"] + } + } + } +} diff --git a/e2e/parameterized/parameterized.go b/e2e/parameterized/parameterized.go new file mode 100644 index 00000000000..33df7cf73a6 --- /dev/null +++ b/e2e/parameterized/parameterized.go @@ -0,0 +1,90 @@ +package parameterized + +import ( + "fmt" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/e2e/framework" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +type ParameterizedTest struct { + framework.TC + jobIDs []string +} + +func init() { + framework.AddSuites(&framework.TestSuite{ + Component: "Parameterized", + CanRunLocal: true, + Cases: []framework.TestCase{ + new(ParameterizedTest), + }, + }) +} + +func (tc *ParameterizedTest) BeforeAll(f *framework.F) { + e2eutil.WaitForLeader(f.T(), tc.Nomad()) +} + +func (tc *ParameterizedTest) AfterEach(f *framework.F) { + nomadClient := tc.Nomad() + j := nomadClient.Jobs() + + for _, id := range tc.jobIDs { + j.Deregister(id, true, nil) + } + _, err := e2eutil.Command("nomad", "system", "gc") + f.NoError(err) +} + +func (tc *ParameterizedTest) TestParameterizedDispatch_Basic(f *framework.F) { + t := f.T() + + uuid := uuid.Generate() + jobID := fmt.Sprintf("dispatch-%s", uuid[0:8]) + tc.jobIDs = append(tc.jobIDs, jobID) + + // register job + require.NoError(t, e2eutil.Register(jobID, "parameterized/input/simple.nomad")) + + // force dispatch + dispatched := 4 + + for i := 0; i < dispatched; i++ { + require.NoError(t, e2eutil.Dispatch(jobID, map[string]string{"i": fmt.Sprintf("%v", i)}, "")) + } + + testutil.WaitForResult(func() (bool, error) { + children, err := e2eutil.DispatchedJobs(jobID) + if err != nil { + return false, err + } + + dead := 0 + for _, c := range children { + if c["Status"] != "dead" { + return false, fmt.Errorf("expected periodic job to be dead") + } + dead++ + } + + if dead != dispatched { + return false, fmt.Errorf("expected %d but found %d children", dispatched, dead) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + // Assert there are no pending children + summary, err := e2eutil.ChildrenJobSummary(jobID) + require.NoError(t, err) + require.Len(t, summary, 1) + require.Equal(t, summary[0]["Pending"], "0") + require.Equal(t, summary[0]["Running"], "0") + require.Equal(t, summary[0]["Dead"], fmt.Sprintf("%v", dispatched)) +} diff --git a/e2e/periodic/input/simple.nomad b/e2e/periodic/input/simple.nomad index c63974c8169..ca315c23cf3 100644 --- a/e2e/periodic/input/simple.nomad +++ b/e2e/periodic/input/simple.nomad @@ -4,10 +4,12 @@ job "periodic" { constraint { attribute = "${attr.kernel.name}" - value = "linux" + operator = "set_contains_any" + value = "darwin,linux" } + periodic { cron = "* * * * *" prohibit_overlap = true diff --git a/e2e/periodic/periodic.go b/e2e/periodic/periodic.go index c20a3262573..cef0d11f69c 100644 --- a/e2e/periodic/periodic.go +++ b/e2e/periodic/periodic.go @@ -74,5 +74,6 @@ func (tc *PeriodicTest) TestPeriodicDispatch_Basic(f *framework.F) { require.NoError(t, err) require.Len(t, summary, 1) require.Equal(t, summary[0]["Pending"], "0") + require.Equal(t, summary[0]["Running"], "0") require.Equal(t, summary[0]["Dead"], "1") } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 82dace1c434..2278ca71ce9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6401,6 +6401,117 @@ func TestJobEndpoint_Dispatch(t *testing.T) { } } +// TestJobEndpoint_Dispatch_JobChildrenSummary asserts that the job summary is updated +// appropriately as its dispatched/children jobs status are updated. +func TestJobEndpoint_Dispatch_JobChildrenSummary(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + state := s1.fsm.State() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + node := mock.Node() + require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1, node)) + + parameterizedJob := mock.BatchJob() + parameterizedJob.ParameterizedJob = &structs.ParameterizedJobConfig{} + + // Create the register request + regReq := &structs.JobRegisterRequest{ + Job: parameterizedJob, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: parameterizedJob.Namespace, + }, + } + var regResp structs.JobRegisterResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, ®Resp)) + + jobChildren := func() *structs.JobChildrenSummary { + summary, err := state.JobSummaryByID(nil, parameterizedJob.Namespace, parameterizedJob.ID) + require.NoError(t, err) + + return summary.Children + } + require.Equal(t, &structs.JobChildrenSummary{}, jobChildren()) + + // dispatch a child job + dispatchReq := &structs.JobDispatchRequest{ + JobID: parameterizedJob.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: parameterizedJob.Namespace, + }, + } + var dispatchResp structs.JobDispatchResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", dispatchReq, &dispatchResp) + require.NoError(t, err) + + nextIdx := dispatchResp.Index + 1 + + require.Equal(t, &structs.JobChildrenSummary{Pending: 1}, jobChildren()) + + dispatchedJob, err := state.JobByID(nil, parameterizedJob.Namespace, dispatchResp.DispatchedJobID) + require.NoError(t, err) + require.NotNil(t, dispatchedJob) + + dispatchedStatus := func() string { + job, err := state.JobByID(nil, dispatchedJob.Namespace, dispatchedJob.ID) + require.NoError(t, err) + require.NotNil(t, job) + + return job.Status + } + + // Let's start a alloc for the dispatch job and walk through states + // Note that job summary reports 1 running even when alloc is pending! + nextIdx++ + alloc := mock.Alloc() + alloc.Job = dispatchedJob + alloc.JobID = dispatchedJob.ID + alloc.TaskGroup = dispatchedJob.TaskGroups[0].Name + alloc.Namespace = dispatchedJob.Namespace + alloc.ClientStatus = structs.AllocClientStatusPending + err = s1.State().UpsertAllocs(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{alloc}) + require.NoError(t, err) + require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren()) + require.Equal(t, structs.JobStatusRunning, dispatchedStatus()) + + // mark the creation eval as completed + nextIdx++ + eval, err := state.EvalByID(nil, dispatchResp.EvalID) + require.NoError(t, err) + eval = eval.Copy() + eval.Status = structs.EvalStatusComplete + require.NoError(t, state.UpsertEvals(structs.MsgTypeTestSetup, nextIdx, []*structs.Evaluation{eval})) + + updateAllocStatus := func(status string) { + nextIdx++ + nalloc, err := state.AllocByID(nil, alloc.ID) + require.NoError(t, err) + nalloc = nalloc.Copy() + nalloc.ClientStatus = status + err = s1.State().UpdateAllocsFromClient(structs.MsgTypeTestSetup, nextIdx, []*structs.Allocation{nalloc}) + require.NoError(t, err) + } + + // job should remain remaining when alloc runs + updateAllocStatus(structs.AllocClientStatusRunning) + require.Equal(t, &structs.JobChildrenSummary{Running: 1}, jobChildren()) + require.Equal(t, structs.JobStatusRunning, dispatchedStatus()) + + // job should be dead after alloc completes + updateAllocStatus(structs.AllocClientStatusComplete) + require.Equal(t, &structs.JobChildrenSummary{Dead: 1}, jobChildren()) + require.Equal(t, structs.JobStatusDead, dispatchedStatus()) +} + func TestJobEndpoint_Scale(t *testing.T) { t.Parallel() require := require.New(t)