Skip to content

Commit

Permalink
Merge pull request #8187 from hashicorp/f-8143-block-scaling-during-d…
Browse files Browse the repository at this point in the history
…eployment

modify Job.Scale RPC to return an error if there is an active deployment
  • Loading branch information
cgbaker authored Jun 17, 2020
2 parents 8091ea4 + 50be177 commit ea6e351
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ FEATURES:
IMPROVEMENTS:

* core: support for persisting previous task group counts when updating a job [[GH-8168](https://github.com/hashicorp/nomad/issues/8168)]
* core: block Job.Scale actions when the job is under active deployment [[GH-8187](https://github.com/hashicorp/nomad/issues/8187)]
* api: Persist previous count with scaling events [[GH-8167](https://github.com/hashicorp/nomad/issues/8167)]
* build: Updated to Go 1.14.4 [[GH-8172](https://github.com/hashicorp/nomad/issues/9172)]

Expand Down
36 changes: 36 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,7 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
ws := memdb.NewWatchSet()
job, err := snap.JobByID(ws, namespace, args.JobID)
if err != nil {
j.logger.Error("unable to lookup job", "error", err)
return err
}
if job == nil {
Expand All @@ -955,11 +956,46 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
// for now, we'll do this even if count didn't change
prevCount := found.Count
if args.Count != nil {

// Lookup the latest deployment, to see whether this scaling event should be blocked
d, err := snap.LatestDeploymentByJobID(ws, namespace, args.JobID)
if err != nil {
j.logger.Error("unable to lookup latest deployment", "error", err)
return err
}
// explicitly filter deployment by JobCreateIndex to be safe, because LatestDeploymentByJobID doesn't
if d != nil && d.JobCreateIndex == job.CreateIndex && d.Active() {
// attempt to register the scaling event
JobScalingBlockedByActiveDeployment := "job scaling blocked due to active deployment"
event := &structs.ScalingEventRequest{
Namespace: job.Namespace,
JobID: job.ID,
TaskGroup: groupName,
ScalingEvent: &structs.ScalingEvent{
Time: now,
PreviousCount: int64(prevCount),
Message: JobScalingBlockedByActiveDeployment,
Error: true,
Meta: map[string]interface{}{
"OriginalMessage": args.Message,
"OriginalCount": *args.Count,
"OriginalMeta": args.Meta,
},
},
}
if _, _, err := j.srv.raftApply(structs.ScalingEventRegisterRequestType, event); err != nil {
// just log the error, this was a best-effort attempt
j.logger.Error("scaling event create failed during block scaling action", "error", err)
}
return structs.NewErrRPCCoded(400, JobScalingBlockedByActiveDeployment)
}

truncCount := int(*args.Count)
if int64(truncCount) != *args.Count {
return structs.NewErrRPCCoded(400,
fmt.Sprintf("new scaling count is too large for TaskGroup.Count (int): %v", args.Count))
}
// update the task group count
found.Count = truncCount

registerReq := structs.JobRegisterRequest{
Expand Down
172 changes: 172 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5555,6 +5555,178 @@ func TestJobEndpoint_Scale(t *testing.T) {
require.Equal(int64(originalCount), events[groupName][0].PreviousCount)
}

func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

type testCase struct {
latestDeploymentStatus string
}
cases := []string{
structs.DeploymentStatusSuccessful,
structs.DeploymentStatusPaused,
structs.DeploymentStatusRunning,
}

for _, tc := range cases {
// create a job with a deployment history
job := mock.Job()
require.Nil(state.UpsertJob(1000, job), "UpsertJob")
d1 := mock.Deployment()
d1.Status = structs.DeploymentStatusCancelled
d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
d1.JobID = job.ID
d1.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
d2 := mock.Deployment()
d2.Status = structs.DeploymentStatusSuccessful
d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
d2.JobID = job.ID
d2.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")

// add the latest deployment for the test case
dLatest := mock.Deployment()
dLatest.Status = tc
dLatest.StatusDescription = "description does not matter for this test"
dLatest.JobID = job.ID
dLatest.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")

// attempt to scale
originalCount := job.TaskGroups[0].Count
newCount := int64(originalCount+1)
groupName := job.TaskGroups[0].Name
scalingMetadata := map[string]interface{}{
"meta": "data",
}
scalingMessage := "original reason for scaling"
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: groupName,
},
Meta: scalingMetadata,
Message: scalingMessage,
Count: helper.Int64ToPtr(newCount),
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
if dLatest.Active() {
// should fail
require.Error(err, "test case %q", tc)
require.Contains(err.Error(), "active deployment")
} else {
require.NoError(err, "test case %q", tc)
require.NotEmpty(resp.EvalID)
require.Greater(resp.EvalCreateIndex, resp.JobModifyIndex)
}

events, _, _ := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
require.Equal(1, len(events[groupName]))
latestEvent := events[groupName][0]
if dLatest.Active() {
require.True(latestEvent.Error)
require.Nil(latestEvent.Count)
require.Contains(latestEvent.Message, "blocked due to active deployment")
require.Equal(latestEvent.Meta["OriginalCount"], newCount)
require.Equal(latestEvent.Meta["OriginalMessage"], scalingMessage)
require.Equal(latestEvent.Meta["OriginalMeta"], scalingMetadata)
} else {
require.False(latestEvent.Error)
require.NotNil(latestEvent.Count)
require.Equal(newCount, *latestEvent.Count)
}
}
}

func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
state := s1.fsm.State()

type testCase struct {
latestDeploymentStatus string
}
cases := []string{
structs.DeploymentStatusSuccessful,
structs.DeploymentStatusPaused,
structs.DeploymentStatusRunning,
}

for _, tc := range cases {
// create a job with a deployment history
job := mock.Job()
require.Nil(state.UpsertJob(1000, job), "UpsertJob")
d1 := mock.Deployment()
d1.Status = structs.DeploymentStatusCancelled
d1.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
d1.JobID = job.ID
d1.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment")
d2 := mock.Deployment()
d2.Status = structs.DeploymentStatusSuccessful
d2.StatusDescription = structs.DeploymentStatusDescriptionSuccessful
d2.JobID = job.ID
d2.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment")

// add the latest deployment for the test case
dLatest := mock.Deployment()
dLatest.Status = tc
dLatest.StatusDescription = "description does not matter for this test"
dLatest.JobID = job.ID
dLatest.JobCreateIndex = job.CreateIndex
require.Nil(state.UpsertDeployment(1003, dLatest), "UpsertDeployment")

// register informational scaling event
groupName := job.TaskGroups[0].Name
scalingMetadata := map[string]interface{}{
"meta": "data",
}
scalingMessage := "original reason for scaling"
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: groupName,
},
Meta: scalingMetadata,
Message: scalingMessage,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
require.NoError(err, "test case %q", tc)
require.Empty(resp.EvalID)

events, _, _ := state.ScalingEventsByJob(nil, job.Namespace, job.ID)
require.Equal(1, len(events[groupName]))
latestEvent := events[groupName][0]
require.False(latestEvent.Error)
require.Nil(latestEvent.Count)
require.Equal(scalingMessage, latestEvent.Message)
require.Equal(scalingMetadata, latestEvent.Meta)
}
}

func TestJobEndpoint_Scale_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
1 change: 1 addition & 0 deletions website/pages/api-docs/jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,7 @@ $ curl \

This endpoint performs a scaling action against a job.
Currently, this endpoint supports scaling the count for a task group.
This will return a 400 error if the job has an active deployment.

| Method | Path | Produces |
| ------ | ----------------------- | ------------------ |
Expand Down

0 comments on commit ea6e351

Please sign in to comment.