diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 5812bf406ef..e7edc1db07c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1091,6 +1091,9 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd return err } + // Put them into a slice so there are no safety concerns while actually + // performing the deletes + jobs := []*structs.Job{} for { raw := iter.Next() if raw == nil { @@ -1103,7 +1106,12 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd continue } - if _, err = txn.DeleteAll("job_version", "id", j.Namespace, j.ID, j.Version); err != nil { + jobs = append(jobs, j) + } + + // Do the deletes + for _, j := range jobs { + if err := txn.Delete("job_version", j); err != nil { return fmt.Errorf("deleting job versions failed: %v", err) } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ce230ffe043..92a02a05cf8 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1614,6 +1614,75 @@ func TestStateStore_DeleteJob_Job(t *testing.T) { } } +func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) { + state := testStateStore(t) + + const testJobCount = 10 + const jobVersionCount = 4 + + stateIndex := uint64(1000) + + jobs := make([]*structs.Job, testJobCount) + for i := 0; i < testJobCount; i++ { + stateIndex++ + job := mock.BatchJob() + + err := state.UpsertJob(stateIndex, job) + require.NoError(t, err) + + jobs[i] = job + + // Create some versions + for vi := 1; vi < jobVersionCount; vi++ { + stateIndex++ + + job := job.Copy() + job.TaskGroups[0].Tasks[0].Env = map[string]string{ + "Version": fmt.Sprintf("%d", vi), + } + + require.NoError(t, state.UpsertJob(stateIndex, job)) + } + } + + ws := memdb.NewWatchSet() + + // Sanity check that jobs are present in DB + job, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID) + require.NoError(t, err) + require.Equal(t, jobs[0].ID, job.ID) + + jobVersions, err := state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID) + require.NoError(t, err) + require.Equal(t, jobVersionCount, len(jobVersions)) + + // Actually delete + const deletionIndex = uint64(10001) + err = state.WithWriteTransaction(func(txn Txn) error { + for i, job := range jobs { + err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn) + require.NoError(t, err, "failed at %d %e", i, err) + } + return nil + }) + assert.NoError(t, err) + + assert.True(t, watchFired(ws)) + + ws = memdb.NewWatchSet() + out, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID) + require.NoError(t, err) + require.Nil(t, out) + + jobVersions, err = state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID) + require.NoError(t, err) + require.Empty(t, jobVersions) + + index, err := state.Index("jobs") + require.NoError(t, err) + require.Equal(t, deletionIndex, index) +} + func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) { state := testStateStore(t) assert := assert.New(t)