Block Job.Scale count changes while the job has an active deployment (GH-8187).

Reviewer notes for this revision of the patch (text before the first
"diff --git" line is ignored when the patch is applied):
  * Job.Scale: the "count is too large" error now formats *args.Count instead
    of the *int64 pointer, and the block message is held in a lowerCamel local.
  * Tests: require.Equal calls use (expected, actual) order, the error from
    ScalingEventsByJob is asserted, the unused local testCase type is removed,
    and both tests have doc comments.
  * CHANGELOG: the Go 1.14.4 entry's URL now matches its GH-8172 label.
  * Docs: the 400 response is described as applying to count changes only,
    since requests without a count are not blocked.
  * Blank context lines and column alignment were reconstructed from the hunk
    line counts and gofmt rules; verify against the base tree before applying.
  * The post-image blob ids on the "index" lines were not recomputed.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2132626467f..38da35a2b61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ FEATURES:
 IMPROVEMENTS:
 
 * core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
+* core: block Job.Scale actions when the job is under active deployment [[GH-8187](https://github.com/hashicorp/nomad/issues/8187)]
 * api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
-* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]
+* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/8172)]
 
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 52b3ff790b8..e706eb89ba9 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -931,6 +931,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
 	ws := memdb.NewWatchSet()
 	job, err := snap.JobByID(ws, namespace, args.JobID)
 	if err != nil {
+		j.logger.Error("unable to lookup job", "error", err)
 		return err
 	}
 	if job == nil {
@@ -955,11 +956,46 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
 	// for now, we'll do this even if count didn't change
 	prevCount := found.Count
 	if args.Count != nil {
+
+		// Lookup the latest deployment, to see whether this scaling event should be blocked
+		d, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
+		if err != nil {
+			j.logger.Error("unable to lookup latest deployment", "error", err)
+			return err
+		}
+		// explicitly filter deployment by JobCreateIndex to be safe, because LatestDeploymentByJobID doesn't
+		if d != nil && d.JobCreateIndex == job.CreateIndex && d.Active() {
+			// attempt to register the scaling event
+			scalingBlockedMsg := "job scaling blocked due to active deployment"
+			event := &structs.ScalingEventRequest{
+				Namespace: job.Namespace,
+				JobID:     job.ID,
+				TaskGroup: groupName,
+				ScalingEvent: &structs.ScalingEvent{
+					Time:          now,
+					PreviousCount: int64(prevCount),
+					Message:       scalingBlockedMsg,
+					Error:         true,
+					Meta: map[string]interface{}{
+						"OriginalMessage": args.Message,
+						"OriginalCount":   *args.Count,
+						"OriginalMeta":    args.Meta,
+					},
+				},
+			}
+			if _, _, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event); err != nil {
+				// just log the error, this was a best-effort attempt
+				j.logger.Error("scaling event create failed during block scaling action", "error", err)
+			}
+			return structs.NewErrRPCCoded(400, scalingBlockedMsg)
+		}
+
 		truncCount := int(*args.Count)
 		if int64(truncCount) != *args.Count {
 			return structs.NewErrRPCCoded(400,
-				fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count))
+				fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", *args.Count))
 		}
+		// update the task group count
 		found.Count = truncCount
 
 		registerReq := structs.JobRegisterRequest{
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index 8db4085c8a4..8f45afab128 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -5555,6 +5555,180 @@ func TestJobEndpoint_Scale(t *testing.T) {
 	require.Equal(int64(originalCount), events[groupName][0].PreviousCount)
 }
 
+// TestJobEndpoint_Scale_DeploymentBlocking asserts that a Job.Scale request
+// with a count is rejected, and recorded as an error scaling event, when the
+// job's latest deployment is active, and is applied otherwise.
+func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1, cleanupS1 := TestServer(t, nil)
+	defer cleanupS1()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+	state := s1.fsm.State()
+
+	cases := []string{
+		structs.DeploymentStatusSuccessful,
+		structs.DeploymentStatusPaused,
+		structs.DeploymentStatusRunning,
+	}
+
+	for _, tc := range cases {
+		// create a job with a deployment history
+		job := mock.Job()
+		require.Nil(state.UpsertJob(1000, job), "UpsertJob")
+		d1 := mock.Deployment()
+		d1.Status = structs.DeploymentStatusCancelled
+		d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
+		d1.JobID = job.ID
+		d1.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
+		d2 := mock.Deployment()
+		d2.Status = structs.DeploymentStatusSuccessful
+		d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
+		d2.JobID = job.ID
+		d2.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")
+
+		// add the latest deployment for the test case
+		dLatest := mock.Deployment()
+		dLatest.Status = tc
+		dLatest.StatusDescription = "description does not matter for this test"
+		dLatest.JobID = job.ID
+		dLatest.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")
+
+		// attempt to scale
+		originalCount := job.TaskGroups[0].Count
+		newCount := int64(originalCount + 1)
+		groupName := job.TaskGroups[0].Name
+		scalingMetadata := map[string]interface{}{
+			"meta": "data",
+		}
+		scalingMessage := "original reason for scaling"
+		scale := &structs.JobScaleRequest{
+			JobID: job.ID,
+			Target: map[string]string{
+				structs.ScalingTargetGroup: groupName,
+			},
+			Meta:    scalingMetadata,
+			Message: scalingMessage,
+			Count:   helper.Int64ToPtr(newCount),
+			WriteRequest: structs.WriteRequest{
+				Region:    "global",
+				Namespace: job.Namespace,
+			},
+		}
+		var resp structs.JobRegisterResponse
+		err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
+		if dLatest.Active() {
+			// should fail
+			require.Error(err, "test case %q", tc)
+			require.Contains(err.Error(), "active deployment")
+		} else {
+			require.NoError(err, "test case %q", tc)
+			require.NotEmpty(resp.EvalID)
+			require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex)
+		}
+
+		events, _, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
+		require.NoError(err, "test case %q", tc)
+		require.Equal(1, len(events[groupName]))
+		latestEvent := events[groupName][0]
+		if dLatest.Active() {
+			require.True(latestEvent.Error)
+			require.Nil(latestEvent.Count)
+			require.Contains(latestEvent.Message, "blocked due to active deployment")
+			require.Equal(newCount, latestEvent.Meta["OriginalCount"])
+			require.Equal(scalingMessage, latestEvent.Meta["OriginalMessage"])
+			require.Equal(scalingMetadata, latestEvent.Meta["OriginalMeta"])
+		} else {
+			require.False(latestEvent.Error)
+			require.NotNil(latestEvent.Count)
+			require.Equal(newCount, *latestEvent.Count)
+		}
+	}
+}
+
+// TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked asserts that a
+// Job.Scale request without a count is recorded as a scaling event regardless
+// of the status of the job's latest deployment.
+func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1, cleanupS1 := TestServer(t, nil)
+	defer cleanupS1()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+	state := s1.fsm.State()
+
+	cases := []string{
+		structs.DeploymentStatusSuccessful,
+		structs.DeploymentStatusPaused,
+		structs.DeploymentStatusRunning,
+	}
+
+	for _, tc := range cases {
+		// create a job with a deployment history
+		job := mock.Job()
+		require.Nil(state.UpsertJob(1000, job), "UpsertJob")
+		d1 := mock.Deployment()
+		d1.Status = structs.DeploymentStatusCancelled
+		d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
+		d1.JobID = job.ID
+		d1.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
+		d2 := mock.Deployment()
+		d2.Status = structs.DeploymentStatusSuccessful
+		d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
+		d2.JobID = job.ID
+		d2.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")
+
+		// add the latest deployment for the test case
+		dLatest := mock.Deployment()
+		dLatest.Status = tc
+		dLatest.StatusDescription = "description does not matter for this test"
+		dLatest.JobID = job.ID
+		dLatest.JobCreateIndex = job.CreateIndex
+		require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")
+
+		// register informational scaling event
+		groupName := job.TaskGroups[0].Name
+		scalingMetadata := map[string]interface{}{
+			"meta": "data",
+		}
+		scalingMessage := "original reason for scaling"
+		scale := &structs.JobScaleRequest{
+			JobID: job.ID,
+			Target: map[string]string{
+				structs.ScalingTargetGroup: groupName,
+			},
+			Meta:    scalingMetadata,
+			Message: scalingMessage,
+			WriteRequest: structs.WriteRequest{
+				Region:    "global",
+				Namespace: job.Namespace,
+			},
+		}
+		var resp structs.JobRegisterResponse
+		err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
+		require.NoError(err, "test case %q", tc)
+		require.Empty(resp.EvalID)
+
+		events, _, err := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
+		require.NoError(err, "test case %q", tc)
+		require.Equal(1, len(events[groupName]))
+		latestEvent := events[groupName][0]
+		require.False(latestEvent.Error)
+		require.Nil(latestEvent.Count)
+		require.Equal(scalingMessage, latestEvent.Message)
+		require.Equal(scalingMetadata, latestEvent.Meta)
+	}
+}
+
 func TestJobEndpoint_Scale_ACL(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
diff --git a/website/pages/api-docs/jobs.mdx b/website/pages/api-docs/jobs.mdx
index 8d119c82ee9..2c4bc1f5ee8 100644
--- a/website/pages/api-docs/jobs.mdx
+++ b/website/pages/api-docs/jobs.mdx
@@ -1783,6 +1783,7 @@ $ curl \
 
 This endpoint performs a scaling action against a job.
 Currently, this endpoint supports scaling the count for a task group.
+Requests that change the count return a 400 error if the job has an active deployment.
 
 | Method | Path                    | Produces           |
 | ------ | ----------------------- | ------------------ |