From 1f944196d994bff14d7bb73a29bd59dede08f315 Mon Sep 17 00:00:00 2001 From: Juana De La Cuesta Date: Mon, 18 Nov 2024 13:35:47 +0100 Subject: [PATCH] Allow scaling system jobs to 0 (#24363) * func: remove validation scaling for system jobs and dont canonicalize to 1 * test: update test to validate for 0 and improve error message * func: remove the canonicalization to 1 from system jobs * docs: add changelog * func: add test for scaling system jobs * temp: add logging to debug test * fix: clean up after test is done * fix: scaled down jobs will still have the stop allocation, update test to account for it * Update the e2e test to accomodate for system jobs to have an alloc per node * fix: filter to only count ready nodes on the node count * fix: remove the datacenter constrain from the system job definition * fix: compare alloc IDs to avoid flaky tests when verifying no alloc was stoped * fix: remove duplicated code --- .changelog/24363.txt | 3 + .../input/namespace_default_system.nomad | 25 +++++ e2e/scaling/scaling.go | 100 +++++++++++++++++- nomad/job_endpoint.go | 5 +- nomad/job_endpoint_test.go | 29 ++++- nomad/structs/structs.go | 4 - 6 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 .changelog/24363.txt create mode 100644 e2e/scaling/input/namespace_default_system.nomad diff --git a/.changelog/24363.txt b/.changelog/24363.txt new file mode 100644 index 00000000000..8c7549c85b8 --- /dev/null +++ b/.changelog/24363.txt @@ -0,0 +1,3 @@ +```release-note:improvement +core: add the possibility to scale system jobs between 0 and 1 +``` diff --git a/e2e/scaling/input/namespace_default_system.nomad b/e2e/scaling/input/namespace_default_system.nomad new file mode 100644 index 00000000000..75a22af8653 --- /dev/null +++ b/e2e/scaling/input/namespace_default_system.nomad @@ -0,0 +1,25 @@ +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: BUSL-1.1 + +job "system_job" { + type = "system" + + group "system_job_group" { + + task "system_task" { + driver = "docker" + + config { + image = "busybox:1" + + command = "/bin/sh" + args = ["-c", "sleep 15000"] + } + + env { + version = "1" + } + } + } +} + diff --git a/e2e/scaling/scaling.go b/e2e/scaling/scaling.go index 10de40b1dd4..5b3580e03bd 100644 --- a/e2e/scaling/scaling.go +++ b/e2e/scaling/scaling.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/nomad/e2e/framework" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" ) type ScalingE2ETest struct { @@ -27,7 +28,6 @@ func init() { new(ScalingE2ETest), }, }) - } func (tc *ScalingE2ETest) BeforeAll(f *framework.F) { @@ -165,3 +165,101 @@ func (tc *ScalingE2ETest) TestScalingNamespaces(f *framework.F) { "Nomad e2e testing", false, nil, &aWriteOpts) f.NoError(err) } + +// TestScalingBasic performs basic scaling e2e tests within a single namespace using +// using a SystemScheduler. +func (tc *ScalingE2ETest) TestScalingBasicWithSystemSchedule(f *framework.F) { + t := f.T() + nomadClient := tc.Nomad() + + // Register a system job with a scaling policy without a group count, it should + // default to 1 per node. + + jobID := "test-scaling-" + uuid.Generate()[0:8] + e2eutil.RegisterAndWaitForAllocs(t, nomadClient, "scaling/input/namespace_default_system.nomad", jobID, "") + + jobs := nomadClient.Jobs() + initialAllocs, _, err := jobs.Allocations(jobID, true, nil) + f.NoError(err) + + nodeStubList, _, err := nomadClient.Nodes().List(&api.QueryOptions{Namespace: "default"}) + f.NoError(err) + + // A system job will spawn an allocation per node, we need to know how many nodes + // there are to know how many allocations to expect. + numberOfNodes := len(nodeStubList) + + f.Equal(numberOfNodes, len(initialAllocs)) + allocIDs := e2eutil.AllocIDsFromAllocationListStubs(initialAllocs) + + // Wait for allocations to get past initial pending state + e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) + + // Try to scale beyond 1 + testMeta := map[string]interface{}{"scaling-e2e-test": "value"} + scaleResp, _, err := tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(3), + "Nomad e2e testing", false, testMeta, nil) + + f.Error(err) + f.Nil(scaleResp) + + // The same allocs should be running. + jobs = nomadClient.Jobs() + allocs1, _, err := jobs.Allocations(jobID, true, nil) + f.NoError(err) + + f.Equal(len(initialAllocs), len(allocs1)) + + for i, a := range allocs1 { + f.Equal(a.ID, initialAllocs[i].ID) + } + + // Scale down to 0 + testMeta = map[string]interface{}{"scaling-e2e-test": "value"} + scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(0), + "Nomad e2e testing", false, testMeta, nil) + f.NoError(err) + f.NotEmpty(scaleResp.EvalID) + + // Assert job is still up but no allocs are running + stopedAllocs, _, err := jobs.Allocations(jobID, false, nil) + f.NoError(err) + + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, stopedAllocs))) + f.Equal(numberOfNodes, len(stopedAllocs)) + + // Scale up to 1 again + testMeta = map[string]interface{}{"scaling-e2e-test": "value"} + scaleResp, _, err = tc.Nomad().Jobs().Scale(jobID, "system_job_group", pointer.Of(1), + "Nomad e2e testing", false, testMeta, nil) + f.NoError(err) + f.NotEmpty(scaleResp.EvalID) + + // Wait for new allocation to get past initial pending state + e2eutil.WaitForAllocsNotPending(t, nomadClient, allocIDs) + + // Assert job is still running and there is a running allocation again + allocs, _, err := jobs.Allocations(jobID, true, nil) + f.NoError(err) + f.Equal(numberOfNodes*2, len(allocs)) + + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusStop, allocs))) + f.Equal(numberOfNodes, len(filterAllocsByDesiredStatus(structs.AllocDesiredStatusRun, allocs))) + + // Remove the job. + _, _, err = tc.Nomad().Jobs().Deregister(jobID, true, nil) + f.NoError(err) + f.NoError(tc.Nomad().System().GarbageCollect()) +} + +func filterAllocsByDesiredStatus(status string, allocs []*api.AllocationListStub) []*api.AllocationListStub { + res := []*api.AllocationListStub{} + + for _, a := range allocs { + if a.DesiredStatus == status { + res = append(res, a) + } + } + + return res +} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e30ab29a611..7267f0a15b6 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1027,8 +1027,9 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes if job == nil { return structs.NewErrRPCCoded(404, fmt.Sprintf("job %q not found", args.JobID)) } - if job.Type == structs.JobTypeSystem { - return structs.NewErrRPCCoded(http.StatusBadRequest, `cannot scale jobs of type "system"`) + + if job.Type == structs.JobTypeSystem && *args.Count > 1 { + return structs.NewErrRPCCoded(http.StatusBadRequest, `jobs of type "system" can only be scaled between 0 and 1`) } // Since job is going to be mutated we must copy it since state store methods diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index f0f8cef1fa2..cd786157468 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -8271,20 +8271,43 @@ func TestJobEndpoint_Scale_SystemJob(t *testing.T) { mockSystemJob := mock.SystemJob() must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 10, nil, mockSystemJob)) + // Scale to 0 scaleReq := &structs.JobScaleRequest{ JobID: mockSystemJob.ID, Target: map[string]string{ structs.ScalingTargetGroup: mockSystemJob.TaskGroups[0].Name, }, - Count: pointer.Of(int64(13)), + Count: pointer.Of(int64(0)), WriteRequest: structs.WriteRequest{ Region: DefaultRegion, Namespace: mockSystemJob.Namespace, }, } - var resp structs.JobRegisterResponse + + resp := structs.JobRegisterResponse{} + err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp) + must.NoError(t, err) + + // Scale to a negative number + scaleReq.Count = pointer.Of(int64(-5)) + + resp = structs.JobRegisterResponse{} + must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp), + `400,scaling action count can't be negative`) + + // Scale back to 1 + scaleReq.Count = pointer.Of(int64(1)) + + resp = structs.JobRegisterResponse{} + err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp) + must.NoError(t, err) + + // Scale beyond 1 + scaleReq.Count = pointer.Of(int64(13)) + + resp = structs.JobRegisterResponse{} must.ErrorContains(t, msgpackrpc.CallWithCodec(codec, "Job.Scale", scaleReq, &resp), - `400,cannot scale jobs of type "system"`) + `400,jobs of type "system" can only be scaled between 0 and 1`) } func TestJobEndpoint_Scale_BatchJob(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ba2872beb87..21d9c483aa5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7055,10 +7055,6 @@ func (tg *TaskGroup) Canonicalize(job *Job) { tg.EphemeralDisk = DefaultEphemeralDisk() } - if job.Type == JobTypeSystem && tg.Count == 0 { - tg.Count = 1 - } - if tg.Scaling != nil { tg.Scaling.Canonicalize(job, tg, nil) }