Skip to content

Commit

Permalink
*: Fix race in compactMarkedFilesLocked with concurrent compactions
Browse files Browse the repository at this point in the history
Presently there's a race with concurrent compactions and the logic
in `compactMarkedFilesLocked` where we expect each call to
`maybeScheduleCompactionPicker` to create at least one compaction,
when that is not guaranteed especially around concurrent compactions.

This change updates the logic there to only wait on compactions
if we still have marked files for compaction at wait time.

Fixes cockroachdb#2044.
  • Loading branch information
itsbilal committed Oct 27, 2022
1 parent dd12a22 commit e1da16a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
20 changes: 14 additions & 6 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,22 @@ func (d *DB) compactMarkedFilesLocked() error {
if err := d.closed.Load(); err != nil {
return err.(error)
}
// NB: Waiting on this condition variable drops d.mu while blocked.
d.mu.compact.cond.Wait()

// Some flush or compaction was scheduled or completed. Loop again to
// check again for files that must be compacted. The next iteration may
// find same file again, but that's okay. It'll eventually succeed in
// scheduling the compaction and eventually be woken by its completion.
// Some flush or compaction may have scheduled or completed while we waited
// for the manifest lock in maybeScheduleCompactionPicker. Get the latest
// Version before waiting on a compaction.
curr = d.mu.versions.currentVersion()

// Only wait on compactions if there are files still marked for compaction.
// NB: Waiting on this condition variable drops d.mu while blocked.
if curr.Stats.MarkedForCompaction > 0 {
if d.mu.compact.compactingCount == 0 {
panic("expected a compaction of marked files in progress")
}
d.mu.compact.cond.Wait()
// Refresh the current version again.
curr = d.mu.versions.currentVersion()
}
}
return nil
}
Expand Down
39 changes: 39 additions & 0 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/pebble/bloom"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/testkeys"
Expand Down Expand Up @@ -520,3 +521,41 @@ func TestPebblev1MigrationRace(t *testing.T) {
require.NoError(t, d.RatchetFormatMajorVersion(FormatPrePebblev1Marked))
wg.Wait()
}

// Regression test for #2044, where multiple concurrent compactions can lead
// to an indefinite wait on the compaction goroutine in compactMarkedFilesLocked.
func TestPebblev1MigrationConcurrencyRace(t *testing.T) {
opts := &Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
FormatMajorVersion: FormatSplitUserKeysMarked,
Levels: []LevelOptions{{FilterPolicy: bloom.FilterPolicy(10)}},
MaxConcurrentCompactions: func() int {
return 4
},
}
func() {
d, err := Open("", opts)
require.NoError(t, err)
defer func() {
require.NoError(t, d.Close())
}()

ks := testkeys.Alpha(3).EveryN(10)
var key [3]byte
for i := 0; i < ks.Count(); i++ {
n := testkeys.WriteKey(key[:], ks, i)
require.NoError(t, d.Set(key[:n], key[:n], nil))
if i%100 == 0 {
require.NoError(t, d.Flush())
}
}
require.NoError(t, d.Flush())
}()

opts.FormatMajorVersion = FormatPrePebblev1MarkedCompacted
d, err := Open("", opts)
require.NoError(t, err)
require.NoError(t, d.RatchetFormatMajorVersion(FormatPrePebblev1MarkedCompacted))
require.NoError(t, d.Close())
}

0 comments on commit e1da16a

Please sign in to comment.