diff --git a/CHANGELOG.md b/CHANGELOG.md index ba77e200c10..d801fcd952b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ IMPROVEMENTS: BUG FIXES: * core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)] + * core: Fixed bugs where scaling policies could be matched against incorrect jobs with a similar prefix [[GH-8753](https://github.com/hashicorp/nomad/issues/8753)] ## 0.12.3 (August 13, 2020) diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 57f15a742c8..8dc2b7a29a8 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -136,6 +136,7 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { get := &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", + Namespace: "default", }, } var resp structs.ACLPolicyListResponse @@ -170,6 +171,7 @@ func TestScalingEndpoint_ListPolicies_ACL(t *testing.T) { get := &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", + Namespace: "default", }, } @@ -261,6 +263,7 @@ func TestScalingEndpoint_ListPolicies_Blocking(t *testing.T) { req := &structs.ScalingPolicyListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", + Namespace: "default", MinQueryIndex: 150, }, } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 98b04298022..652f7c3fa08 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1539,11 +1539,30 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn // deleteJobScalingPolicies deletes any scaling policies associated with the job func (s *StateStore) deleteJobScalingPolicies(index uint64, job *structs.Job, txn *memdb.Txn) error { - numDeletedScalingPolicies, err := txn.DeleteAll("scaling_policy", "target_prefix", job.Namespace, job.ID) + iter, err := s.ScalingPoliciesByJobTxn(nil, job.Namespace, job.ID, txn) if err != nil { - return fmt.Errorf("deleting job scaling policies failed: %v", err) + return fmt.Errorf("getting job scaling policies for deletion failed: %v", err) + } + + // Put them into a slice so there are no safety concerns while actually + // performing the deletes + policies := []interface{}{} + for { + raw := iter.Next() + if raw == nil { + break + } + policies = append(policies, raw) + } + + // Do the deletes + for _, p := range policies { + if err := txn.Delete("scaling_policy", p); err != nil { + return fmt.Errorf("deleting scaling policy failed: %v", err) + } } - if numDeletedScalingPolicies > 0 { + + if len(policies) > 0 { if err := txn.Insert("index", &IndexEntry{"scaling_policy", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } @@ -5332,7 +5351,19 @@ func (s *StateStore) ScalingPoliciesByNamespace(ws memdb.WatchSet, namespace str } ws.Add(iter.WatchCh()) - return iter, nil + + filter := func(raw interface{}) bool { + d, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + + return d.Target[structs.ScalingTargetNamespace] != namespace + } + + // Wrap the iterator in a filter + wrap := memdb.NewFilterIterator(iter, filter) + return wrap, nil } func (s *StateStore) ScalingPoliciesByJob(ws memdb.WatchSet, namespace, jobID string) (memdb.ResultIterator, error) { @@ -5349,7 +5380,19 @@ func (s *StateStore) ScalingPoliciesByJobTxn(ws memdb.WatchSet, namespace, jobID } ws.Add(iter.WatchCh()) - return iter, nil + + filter := func(raw interface{}) bool { + d, ok := raw.(*structs.ScalingPolicy) + if !ok { + return true + } + + return d.Target[structs.ScalingTargetJob] != jobID + } + + // Wrap the iterator in a filter + wrap := memdb.NewFilterIterator(iter, filter) + return wrap, nil } func (s *StateStore) ScalingPolicyByID(ws memdb.WatchSet, id string) (*structs.ScalingPolicy, error) { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index e36fd67d28a..1a4d3e6bfd9 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -8640,6 +8640,59 @@ func TestStateStore_UpsertScalingPolicy_Namespace(t *testing.T) { require.ElementsMatch([]string{policy2.ID}, policiesInOtherNamespace) } +func TestStateStore_UpsertScalingPolicy_Namespace_PrefixBug(t *testing.T) { + t.Parallel() + require := require.New(t) + + ns1 := "name" + ns2 := "name2" // matches prefix "name" + state := testStateStore(t) + policy1 := mock.ScalingPolicy() + policy1.Target[structs.ScalingTargetNamespace] = ns1 + policy2 := mock.ScalingPolicy() + policy2.Target[structs.ScalingTargetNamespace] = ns2 + + ws1 := memdb.NewWatchSet() + iter, err := state.ScalingPoliciesByNamespace(ws1, ns1) + require.NoError(err) + require.Nil(iter.Next()) + + ws2 := memdb.NewWatchSet() + iter, err = state.ScalingPoliciesByNamespace(ws2, ns2) + require.NoError(err) + require.Nil(iter.Next()) + + err = state.UpsertScalingPolicies(1000, []*structs.ScalingPolicy{policy1, policy2}) + require.NoError(err) + require.True(watchFired(ws1)) + require.True(watchFired(ws2)) + + iter, err = state.ScalingPoliciesByNamespace(nil, ns1) + require.NoError(err) + policiesInNS1 := []string{} + for { + raw := iter.Next() + if raw == nil { + break + } + policiesInNS1 = append(policiesInNS1, raw.(*structs.ScalingPolicy).ID) + } + require.ElementsMatch([]string{policy1.ID}, policiesInNS1) + + iter, err = state.ScalingPoliciesByNamespace(nil, ns2) + require.NoError(err) + policiesInNS2 := []string{} + for { + raw := iter.Next() + if raw == nil { + break + } + policiesInNS2 = append(policiesInNS2, raw.(*structs.ScalingPolicy).ID) + } + require.ElementsMatch([]string{policy2.ID}, policiesInNS2) +} + + func TestStateStore_UpsertJob_UpsertScalingPolicies(t *testing.T) { t.Parallel() @@ -8940,6 +8993,37 @@ func TestStateStore_DeleteJob_DeleteScalingPolicies(t *testing.T) { require.True(index > 1001) } +func TestStateStore_DeleteJob_DeleteScalingPoliciesPrefixBug(t *testing.T) { + t.Parallel() + + require := require.New(t) + + state := testStateStore(t) + + job := mock.Job() + require.NoError(state.UpsertJob(1000, job)) + job2 := job.Copy() + job2.ID = job.ID + "-but-longer" + require.NoError(state.UpsertJob(1001, job2)) + + policy := mock.ScalingPolicy() + policy.Target[structs.ScalingTargetJob] = job.ID + policy2 := mock.ScalingPolicy() + policy2.Target[structs.ScalingTargetJob] = job2.ID + require.NoError(state.UpsertScalingPolicies(1002, []*structs.ScalingPolicy{policy, policy2})) + + // Delete job with the shorter prefix-ID + require.NoError(state.DeleteJob(1003, job.Namespace, job.ID)) + + // Ensure only the associated scaling policy was deleted, not the one matching the job with the longer ID + out, err := state.ScalingPolicyByID(nil, policy.ID) + require.NoError(err) + require.Nil(out) + out, err = state.ScalingPolicyByID(nil, policy2.ID) + require.NoError(err) + require.NotNil(out) +} + // This test ensures that deleting a job that doesn't have any scaling policies // will not cause the scaling_policy table index to increase, on either job // registration or deletion. @@ -9035,6 +9119,45 @@ func TestStateStore_ScalingPoliciesByJob(t *testing.T) { require.Equal(expect, found) } +func TestStateStore_ScalingPoliciesByJob_PrefixBug(t *testing.T) { + t.Parallel() + + require := require.New(t) + + jobPrefix := "job-name-" + uuid.Generate() + + state := testStateStore(t) + policy1 := mock.ScalingPolicy() + policy1.Target[structs.ScalingTargetJob] = jobPrefix + policy2 := mock.ScalingPolicy() + policy2.Target[structs.ScalingTargetJob] = jobPrefix + "-more" + + // Create the policies + var baseIndex uint64 = 1000 + err := state.UpsertScalingPolicies(baseIndex, []*structs.ScalingPolicy{policy1, policy2}) + require.NoError(err) + + iter, err := state.ScalingPoliciesByJob(nil, + policy1.Target[structs.ScalingTargetNamespace], + jobPrefix) + require.NoError(err) + + // Ensure we see expected policies + count := 0 + found := []string{} + for { + raw := iter.Next() + if raw == nil { + break + } + count++ + found = append(found, raw.(*structs.ScalingPolicy).ID) + } + require.Equal(1, count) + expect := []string{policy1.ID} + require.Equal(expect, found) +} + func TestStateStore_UpsertScalingEvent(t *testing.T) { t.Parallel() require := require.New(t)