Skip to content

Commit

Permalink
Merge pull request #8753 from hashicorp/b-scaling-policy-delete-job-p…
Browse files Browse the repository at this point in the history
…refix

resolve prefix bugs around job scaling policies
  • Loading branch information
cgbaker authored Aug 27, 2020
2 parents 39925e7 + 1ea550d commit 2d04019
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ IMPROVEMENTS:
BUG FIXES:

* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
* core: Fixed bugs where scaling policies could be matched against incorrect jobs with a similar prefix [[GH-8753](https://github.com/hashicorp/nomad/issues/8753)]

## 0.12.3 (August 13, 2020)

Expand Down
3 changes: 3 additions & 0 deletions nomad/scaling_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}
var resp structs.ACLPolicyListResponse
Expand Down Expand Up @@ -170,6 +171,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) {
get := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}

Expand Down Expand Up @@ -261,6 +263,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) {
req := &structs.ScalingPolicyListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
MinQueryIndex: 150,
},
}
Expand Down
53 changes: 48 additions & 5 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,11 +1539,30 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn

// deleteJobScalingPolicies deletes any scaling policies associated with the job
func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error {
numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID)
iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn)
if err != nil {
return fmt.Errorf("deleting job scaling policies failed: %v", err)
return fmt.Errorf("getting job scaling policies for deletion failed: %v", err)
}

// Put them into a slice so there are no safety concerns while actually
// performing the deletes
policies := []interface{}{}
for {
raw := iter.Next()
if raw == nil {
break
}
policies = append(policies, raw)
}

// Do the deletes
for _, p := range policies {
if err := txn.Delete("scaling_policy", p); err != nil {
return fmt.Errorf("deleting scaling policy failed: %v", err)
}
}
if numDeletedScalingPolicies > 0 {

if len(policies) > 0 {
if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
Expand Down Expand Up @@ -5332,7 +5351,19 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str
}

ws.Add(iter.WatchCh())
return iter, nil

filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}

return d.Target[structs.ScalingTargetNamespace] != namespace
}

// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
}

func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) {
Expand All @@ -5349,7 +5380,19 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID
}

ws.Add(iter.WatchCh())
return iter, nil

filter := func(raw interface{}) bool {
d, ok := raw.(*structs.ScalingPolicy)
if !ok {
return true
}

return d.Target[structs.ScalingTargetJob] != jobID
}

// Wrap the iterator in a filter
wrap := memdb.NewFilterIterator(iter, filter)
return wrap, nil
}

func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) {
Expand Down
123 changes: 123 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8640,6 +8640,59 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) {
require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace)
}

func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) {
t.Parallel()
require := require.New(t)

ns1 := "name"
ns2 := "name2" // matches prefix "name"
state := testStateStore(t)
policy1 := mock.ScalingPolicy()
policy1.Target[structs.ScalingTargetNamespace] = ns1
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetNamespace] = ns2

ws1 := memdb.NewWatchSet()
iter, err := state.ScalingPoliciesByNamespace(ws1, ns1)
require.NoError(err)
require.Nil(iter.Next())

ws2 := memdb.NewWatchSet()
iter, err = state.ScalingPoliciesByNamespace(ws2, ns2)
require.NoError(err)
require.Nil(iter.Next())

err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy1, policy2})
require.NoError(err)
require.True(watchFired(ws1))
require.True(watchFired(ws2))

iter, err = state.ScalingPoliciesByNamespace(nil, ns1)
require.NoError(err)
policiesInNS1 := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInNS1 = append(policiesInNS1, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy1.ID}, policiesInNS1)

iter, err = state.ScalingPoliciesByNamespace(nil, ns2)
require.NoError(err)
policiesInNS2 := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
policiesInNS2 = append(policiesInNS2, raw.(*structs.ScalingPolicy).ID)
}
require.ElementsMatch([]string{policy2.ID}, policiesInNS2)
}


func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -8940,6 +8993,37 @@ func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) {
require.True(index > 1001)
}

func TestStateStore_DeleteJob_DeleteScalingPoliciesPrefixBug(t *testing.T) {
t.Parallel()

require := require.New(t)

state := testStateStore(t)

job := mock.Job()
require.NoError(state.UpsertJob(1000, job))
job2 := job.Copy()
job2.ID = job.ID + "-but-longer"
require.NoError(state.UpsertJob(1001, job2))

policy := mock.ScalingPolicy()
policy.Target[structs.ScalingTargetJob] = job.ID
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetJob] = job2.ID
require.NoError(state.UpsertScalingPolicies(1002, []*structs.ScalingPolicy{policy, policy2}))

// Delete job with the shorter prefix-ID
require.NoError(state.DeleteJob(1003, job.Namespace, job.ID))

// Ensure only the associated scaling policy was deleted, not the one matching the job with the longer ID
out, err := state.ScalingPolicyByID(nil, policy.ID)
require.NoError(err)
require.Nil(out)
out, err = state.ScalingPolicyByID(nil, policy2.ID)
require.NoError(err)
require.NotNil(out)
}

// This test ensures that deleting a job that doesn't have any scaling policies
// will not cause the scaling_policy table index to increase, on either job
// registration or deletion.
Expand Down Expand Up @@ -9035,6 +9119,45 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) {
require.Equal(expect, found)
}

func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) {
t.Parallel()

require := require.New(t)

jobPrefix := "job-name-" + uuid.Generate()

state := testStateStore(t)
policy1 := mock.ScalingPolicy()
policy1.Target[structs.ScalingTargetJob] = jobPrefix
policy2 := mock.ScalingPolicy()
policy2.Target[structs.ScalingTargetJob] = jobPrefix + "-more"

// Create the policies
var baseIndex uint64 = 1000
err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policy1, policy2})
require.NoError(err)

iter, err := state.ScalingPoliciesByJob(nil,
policy1.Target[structs.ScalingTargetNamespace],
jobPrefix)
require.NoError(err)

// Ensure we see expected policies
count := 0
found := []string{}
for {
raw := iter.Next()
if raw == nil {
break
}
count++
found = append(found, raw.(*structs.ScalingPolicy).ID)
}
require.Equal(1, count)
expect := []string{policy1.ID}
require.Equal(expect, found)
}

func TestStateStore_UpsertScalingEvent(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down

0 comments on commit 2d04019

Please sign in to comment.