Skip to content

Commit

Permalink
scaling policy: use request namespace as target if unset in jobspec (#…
Browse files Browse the repository at this point in the history
…24065)

When jobs are submitted with a scaling policy, the scaling policy's target only
includes the job's namespace if the `namespace` field is set in the jobspec and
not from the request. Normally jobs are canonicalized in the RPC handler before
being written to Raft. But the scaling policy targets are instead written during
the conversion from `api.Job` to `structs.Job`. We populate the `structs.Job`
namespace from the request here as well, but only after the conversion has
occurred. Swap the order of these operations so that the conversion is always
happening with a correct namespace.

Long-term we should not be making mutations during conversion either. But we
can't remove it immediately because API requests may come from any agent across
upgrades. Move the scaling target creation into the `Canonicalize` method and
mark it for future removal in the API conversion code path.

Fixes: #24039
  • Loading branch information
tgross committed Oct 1, 2024
1 parent fccd941 commit 919f92c
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .changelog/24065.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec
```
18 changes: 11 additions & 7 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/acl"
api "github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/jobspec"
"github.com/hashicorp/nomad/jobspec2"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -938,15 +939,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request,
job, queryRegion, writeReq.Region, s.agent.GetConfig().Region,
)

sJob := ApiJobToStructJob(job)
sJob.Region = jobRegion
writeReq.Region = requestRegion

// mutate the namespace before we convert just in case anything is expecting
// the namespace to be correct
queryNamespace := req.URL.Query().Get("namespace")
namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace)
sJob.Namespace = namespace
job.Namespace = pointer.Of(namespace)
writeReq.Namespace = namespace

sJob := ApiJobToStructJob(job)
sJob.Region = jobRegion
writeReq.Region = requestRegion

return sJob, writeReq
}

Expand Down Expand Up @@ -1193,7 +1196,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
}

if taskGroup.Scaling != nil {
tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg)
tg.Scaling = ApiScalingPolicyToStructs(
job, tg, nil, tg.Count, taskGroup.Scaling)
}

tg.EphemeralDisk = &structs.EphemeralDisk{
Expand Down Expand Up @@ -1330,7 +1334,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
for _, policy := range apiTask.ScalingPolicies {
structsTask.ScalingPolicies = append(
structsTask.ScalingPolicies,
ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask))
ApiScalingPolicyToStructs(job, group, structsTask, 0, policy))
}
}

Expand Down
6 changes: 5 additions & 1 deletion command/agent/scaling_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ
return out.Policy, nil
}

func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
p := structs.ScalingPolicy{
Type: ap.Type,
Policy: ap.Policy,
Expand All @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin
} else {
p.Min = int64(count)
}

// COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9,
// remove this canonicalization in 1.12.0 LTS
p.Canonicalize(job, tg, task)
return &p
}
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
if filter.Include(scalingPolicy) {
// Handle upgrade path:
// - Set policy type if empty
scalingPolicy.Canonicalize()
scalingPolicy.Canonicalize(nil, nil, nil)
if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6627,7 +6627,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) {
tg := job.TaskGroups[0]
tg.Tasks[0].Resources.MemoryMB = 999999999
scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal}
tg.Scaling = scaling.TargetTaskGroup(job, tg)
scaling.Canonicalize(job, tg, nil)
tg.Scaling = scaling
planReq := &structs.JobPlanRequest{
Job: job,
Diff: false,
Expand Down
2 changes: 1 addition & 1 deletion nomad/mock/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
Policy: map[string]interface{}{},
Enabled: true,
}
policy.TargetTaskGroup(job, job.TaskGroups[0])
policy.Canonicalize(job, job.TaskGroups[0], nil)
job.TaskGroups[0].Scaling = policy
return job, policy
}
Expand Down
6 changes: 3 additions & 3 deletions nomad/scaling_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
j1 := mock.Job()
j1polV := mock.ScalingPolicy()
j1polV.Type = "vertical-cpu"
j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polH := mock.ScalingPolicy()
j1polH.Type = "horizontal"
j1polH.TargetTaskGroup(j1, j1.TaskGroups[0])
j1polH.Canonicalize(j1, j1.TaskGroups[0], nil)

j2 := mock.Job()
j2polH := mock.ScalingPolicy()
j2polH.Type = "horizontal"
j2polH.TargetTaskGroup(j2, j2.TaskGroups[0])
j2polH.Canonicalize(j2, j2.TaskGroups[0], nil)

s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1)
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2)
Expand Down
40 changes: 21 additions & 19 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6251,10 +6251,25 @@ const (
ScalingPolicyTypeHorizontal = "horizontal"
)

func (p *ScalingPolicy) Canonicalize() {
func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) {
if p.Type == "" {
p.Type = ScalingPolicyTypeHorizontal
}

// during restore we canonicalize to update, but these values will already
// have been populated during submit and we don't have references to the
// job, group, and task
if job != nil && tg != nil {
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
ScalingTargetJob: job.ID,
ScalingTargetGroup: tg.Name,
}

if task != nil {
p.Target[ScalingTargetTask] = task.Name
}
}
}

func (p *ScalingPolicy) Copy() *ScalingPolicy {
Expand Down Expand Up @@ -6343,23 +6358,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool {
return !reflect.DeepEqual(*p, copy)
}

// TargetTaskGroup updates a ScalingPolicy target to specify a given task group
func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy {
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
ScalingTargetJob: job.ID,
ScalingTargetGroup: tg.Name,
}
return p
}

// TargetTask updates a ScalingPolicy target to specify a given task
func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy {
p.TargetTaskGroup(job, tg)
p.Target[ScalingTargetTask] = task.Name
return p
}

func (p *ScalingPolicy) Stub() *ScalingPolicyListStub {
stub := &ScalingPolicyListStub{
ID: p.ID,
Expand Down Expand Up @@ -6954,7 +6952,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
}

if tg.Scaling != nil {
tg.Scaling.Canonicalize()
tg.Scaling.Canonicalize(job, tg, nil)
}

for _, service := range tg.Services {
Expand Down Expand Up @@ -8052,6 +8050,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) {
t.KillTimeout = DefaultKillTimeout
}

for _, policy := range t.ScalingPolicies {
policy.Canonicalize(job, tg, t)
}

if t.Vault != nil {
t.Vault.Canonicalize()
}
Expand Down
47 changes: 41 additions & 6 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6426,10 +6426,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) {
func TestScalingPolicy_Canonicalize(t *testing.T) {
ci.Parallel(t)

job := &Job{Namespace: "prod", ID: "example"}
tg := &TaskGroup{Name: "web"}
task := &Task{Name: "httpd"}

cases := []struct {
name string
input *ScalingPolicy
expected *ScalingPolicy
job *Job
tg *TaskGroup
task *Task
}{
{
name: "empty policy",
Expand All @@ -6441,14 +6448,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) {
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{Type: "other-type"},
},
{
name: "policy with type and task group",
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{
Type: "other-type",
Target: map[string]string{
ScalingTargetNamespace: "prod",
ScalingTargetJob: "example",
ScalingTargetGroup: "web",
},
},
job: job,
tg: tg,
},
{
name: "policy with type and task",
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{
Type: "other-type",
Target: map[string]string{
ScalingTargetNamespace: "prod",
ScalingTargetJob: "example",
ScalingTargetGroup: "web",
ScalingTargetTask: "httpd",
},
},
job: job,
tg: tg,
task: task,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require := require.New(t)

c.input.Canonicalize()
require.Equal(c.expected, c.input)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc.input.Canonicalize(tc.job, tc.tg, tc.task)
must.Eq(t, tc.expected, tc.input)
})
}
}
Expand Down

0 comments on commit 919f92c

Please sign in to comment.