Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use both job and type query on scaling policy list endpoint #9312

Merged
merged 2 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ __BACKWARDS INCOMPATIBILITIES:__
BUG FIXES:

* agent (Enterprise): Fixed a bug where audit logging caused websocket and streaming http endpoints to fail [[GH-9319](https://github.com/hashicorp/nomad/issues/9319)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* core: Fixed a bug where ACL handling prevented cross-namespace allocation listing [[GH-9278](https://github.com/hashicorp/nomad/issues/9278)]
* core: Fixed a bug where scaling policy filtering would ignore type query if job query was present [[GH-9312](https://github.com/hashicorp/nomad/issues/9312)]
* core: Fixed a bug where a request to scale a job would fail if the job was not in the default namespace. [[GH-9296](https://github.com/hashicorp/nomad/pull/9296)]
* core: Fixed a bug where blocking queries would not include the query's maximum wait time when calculating whether it was safe to retry. [[GH-8921](https://github.com/hashicorp/nomad/issues/8921)]
* config (Enterprise): Fixed default enterprise config merging. [[GH-9083](https://github.com/hashicorp/nomad/pull/9083)]
* client: Fixed an issue with the Java fingerprinter on macOS causing pop-up notifications when no JVM installed. [[GH-9225](https://github.com/hashicorp/nomad/pull/9225)]
* client: Fixed an fingerprinter issue detecting bridge kernel module [[GH-9299](https://github.com/hashicorp/nomad/pull/9299)]
Expand Down
2 changes: 1 addition & 1 deletion nomad/scaling_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Scaling) ListPolicies(args *structs.ScalingPolicyListRequest, reply *st
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = state.ScalingPoliciesByIDPrefix(ws, args.RequestNamespace(), prefix)
} else if job := args.Job; job != "" {
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job)
iter, err = state.ScalingPoliciesByJob(ws, args.RequestNamespace(), job, args.Type)
} else {
iter, err = state.ScalingPoliciesByNamespace(ws, args.Namespace, args.Type)
}
Expand Down
98 changes: 80 additions & 18 deletions nomad/scaling_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package nomad

import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"testing"
"time"

msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -125,33 +126,94 @@ func TestScalingEndpoint_GetPolicy_ACL(t *testing.T) {

func TestScalingEndpoint_ListPolicies(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Lookup the policies
get := &structs.ScalingPolicyListRequest{
var resp structs.ScalingPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}, &resp)
require.NoError(t, err)
require.Empty(t, resp.Policies)

j1 := mock.Job()
j1polV := mock.ScalingPolicy()
j1polV.Type = "vertical-cpu"
j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polH := mock.ScalingPolicy()
j1polH.Type = "horizontal"
j1polH.TargetTaskGroup(j1, j1.TaskGroups[0])

j2 := mock.Job()
j2polH := mock.ScalingPolicy()
j2polH.Type = "horizontal"
j2polH.TargetTaskGroup(j2, j2.TaskGroups[0])

s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j1)
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, j2)

pols := []*structs.ScalingPolicy{j1polV, j1polH, j2polH}
s1.fsm.State().UpsertScalingPolicies(1000, pols)
for _, p := range pols {
p.ModifyIndex = 1000
p.CreateIndex = 1000
}
var resp structs.ACLPolicyListResponse
err := msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.Empty(resp.Policies)

p1 := mock.ScalingPolicy()
p2 := mock.ScalingPolicy()
s1.fsm.State().UpsertScalingPolicies(1000, []*structs.ScalingPolicy{p1, p2})
cases := []struct {
Label string
Job string
Type string
Expected []*structs.ScalingPolicy
}{
{
Label: "all policies",
Expected: []*structs.ScalingPolicy{j1polH, j1polV, j2polH},
},
{
Label: "job filter",
Job: j1.ID,
Expected: []*structs.ScalingPolicy{j1polH, j1polV},
},
{
Label: "type filter",
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH, j2polH},
},
{
Label: "job and type",
Job: j1.ID,
Type: "horizontal",
Expected: []*structs.ScalingPolicy{j1polH},
},
}

err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(err)
require.EqualValues(1000, resp.Index)
require.Len(resp.Policies, 2)
for _, tc := range cases {
t.Run(tc.Label, func(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
Job: tc.Job,
Type: tc.Type,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}
var resp structs.ScalingPolicyListResponse
err = msgpackrpc.CallWithCodec(codec, "Scaling.ListPolicies", get, &resp)
require.NoError(t, err)
stubs := []*structs.ScalingPolicyListStub{}
for _, p := range tc.Expected {
stubs = append(stubs, p.Stub())
}
require.ElementsMatch(t, stubs, resp.Policies)
})
}
}

func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
Expand All @@ -170,7 +232,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {

get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
Namespace: "default",
},
}
Expand Down Expand Up @@ -263,7 +325,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
req := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
Namespace: "default",
MinQueryIndex: 150,
},
}
Expand Down
22 changes: 20 additions & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5760,9 +5760,27 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace, ty
return iter, nil
}

func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID, policyType string) (memdb.ResultIterator,
error) {
txn := s.db.ReadTxn()
return s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
iter, err := s.ScalingPoliciesByJobTxn(ws, namespace, jobID, txn)
if err != nil {
return nil, err
}

if policyType == "" {
return iter, nil
}

filter := func(raw interface{}) bool {
p, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}
return policyType != p.Type
Copy link
Contributor

@krishicks krishicks Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my only comment about this is that it seems like this check could be moved into ScalingPoliciesByJobTxn, where the new policyType string would be passed through to it.

Looking at ScalingPoliciesByJobTxn, it already does the same kind of type assertion we see here, so it happening again (specifically the !ok part, which I think is inconceivable) makes me want a single filter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, maybe 50/50? this short-circuits the filter on the outside of the loop when policyType == "", as opposed to doing it on every filter operation. merging them avoids duplicate cast assertion checks.

}

return memdb.NewFilterIterator(iter, filter), nil
}

func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID string,
Expand Down
6 changes: 3 additions & 3 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9201,7 +9201,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {

iter, err := state.ScalingPoliciesByJob(nil,
policyA.Target[structs.ScalingTargetNamespace],
policyA.Target[structs.ScalingTargetJob])
policyA.Target[structs.ScalingTargetJob], "")
require.NoError(err)

// Ensure we see expected policies
Expand All @@ -9223,7 +9223,7 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {

iter, err = state.ScalingPoliciesByJob(nil,
policyB1.Target[structs.ScalingTargetNamespace],
policyB1.Target[structs.ScalingTargetJob])
policyB1.Target[structs.ScalingTargetJob], "")
require.NoError(err)

// Ensure we see expected policies
Expand Down Expand Up @@ -9267,7 +9267,7 @@ func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {

iter, err := state.ScalingPoliciesByJob(nil,
policy1.Target[structs.ScalingTargetNamespace],
jobPrefix)
jobPrefix, "")
require.NoError(err)

// Ensure we see expected policies
Expand Down