Skip to content

Commit

Permalink
Merge pull request #4903 from hashicorp/b-delete-versions-mod-while-iter
Browse files Browse the repository at this point in the history
Fix a panic related to batch GC
  • Loading branch information
notnoop authored and Mahmood Ali committed Nov 20, 2018
1 parent 3a826a0 commit e3965d6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
10 changes: 9 additions & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,9 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
return err
}

// Put them into a slice so there are no safety concerns while actually
// performing the deletes
jobs := []*structs.Job{}
for {
raw := iter.Next()
if raw == nil {
Expand All @@ -1103,7 +1106,12 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
continue
}

if _, err = txn.DeleteAll("job_version", "id", j.Namespace, j.ID, j.Version); err != nil {
jobs = append(jobs, j)
}

// Do the deletes
for _, j := range jobs {
if err := txn.Delete("job_version", j); err != nil {
return fmt.Errorf("deleting job versions failed: %v", err)
}
}
Expand Down
69 changes: 69 additions & 0 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,75 @@ func TestStateStore_DeleteJob_Job(t *testing.T) {
}
}

func TestStateStore_DeleteJobTxn_BatchDeletes(t *testing.T) {
state := testStateStore(t)

const testJobCount = 10
const jobVersionCount = 4

stateIndex := uint64(1000)

jobs := make([]*structs.Job, testJobCount)
for i := 0; i < testJobCount; i++ {
stateIndex++
job := mock.BatchJob()

err := state.UpsertJob(stateIndex, job)
require.NoError(t, err)

jobs[i] = job

// Create some versions
for vi := 1; vi < jobVersionCount; vi++ {
stateIndex++

job := job.Copy()
job.TaskGroups[0].Tasks[0].Env = map[string]string{
"Version": fmt.Sprintf("%d", vi),
}

require.NoError(t, state.UpsertJob(stateIndex, job))
}
}

ws := memdb.NewWatchSet()

// Sanity check that jobs are present in DB
job, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID)
require.NoError(t, err)
require.Equal(t, jobs[0].ID, job.ID)

jobVersions, err := state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID)
require.NoError(t, err)
require.Equal(t, jobVersionCount, len(jobVersions))

// Actually delete
const deletionIndex = uint64(10001)
err = state.WithWriteTransaction(func(txn Txn) error {
for i, job := range jobs {
err := state.DeleteJobTxn(deletionIndex, job.Namespace, job.ID, txn)
require.NoError(t, err, "failed at %d %e", i, err)
}
return nil
})
assert.NoError(t, err)

assert.True(t, watchFired(ws))

ws = memdb.NewWatchSet()
out, err := state.JobByID(ws, jobs[0].Namespace, jobs[0].ID)
require.NoError(t, err)
require.Nil(t, out)

jobVersions, err = state.JobVersionsByID(ws, jobs[0].Namespace, jobs[0].ID)
require.NoError(t, err)
require.Empty(t, jobVersions)

index, err := state.Index("jobs")
require.NoError(t, err)
require.Equal(t, deletionIndex, index)
}

func TestStateStore_DeleteJob_MultipleVersions(t *testing.T) {
state := testStateStore(t)
assert := assert.New(t)
Expand Down

0 comments on commit e3965d6

Please sign in to comment.