From ed2ae1e40fad835a1069e738ee2a6d72f687b1fc Mon Sep 17 00:00:00 2001
From: Anish Shanbhag
Date: Wed, 21 Aug 2024 15:27:45 -0400
Subject: [PATCH] compact: add shared compaction pool for multiple stores

This change adds a new compaction pool which enforces a global max
compaction concurrency in a multi-store configuration. Each Pebble store
(i.e. an instance of *DB) still maintains its own per-store compaction
concurrency which is controlled by `opts.MaxConcurrentCompactions`.
However, in a multi-store configuration, disk I/O is a per-store resource
while CPU is shared across stores. A significant portion of compaction is
CPU-intensive, and so this ensures that excessive compactions don't
interrupt foreground CPU tasks even if the disks are capable of handling
the additional throughput from those compactions.
---
 compaction.go             | 176 ++++++++++++++++++++++++++++----------
 compaction_picker_test.go |   9 +-
 db.go                     |  11 +++
 format_major_version.go   |   4 +-
 open.go                   |   2 +
 options.go                |   9 ++
 snapshot.go               |   2 +-
 7 files changed, 156 insertions(+), 57 deletions(-)

diff --git a/compaction.go b/compaction.go
index 7947748abd..1d4d2007fc 100644
--- a/compaction.go
+++ b/compaction.go
@@ -9,8 +9,10 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"runtime"
 	"runtime/pprof"
 	"slices"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -1644,21 +1646,10 @@ func (d *DB) maybeScheduleCompactionAsync() {
 	d.mu.Unlock()
 }
 
-// maybeScheduleCompaction schedules a compaction if necessary.
-//
-// d.mu must be held when calling this.
-func (d *DB) maybeScheduleCompaction() {
-	d.maybeScheduleCompactionPicker(pickAuto)
-}
-
 func pickAuto(picker compactionPicker, env compactionEnv) *pickedCompaction {
 	return picker.pickAuto(env)
 }
 
-func pickElisionOnly(picker compactionPicker, env compactionEnv) *pickedCompaction {
-	return picker.pickElisionOnlyCompaction(env)
-}
-
 // tryScheduleDownloadCompaction tries to start a download compaction.
 //
 // Returns true if we started a download compaction (or completed it
@@ -1683,27 +1674,15 @@ func (d *DB) tryScheduleDownloadCompaction(env compactionEnv, maxConcurrentDownl
 	return false
 }
 
-// maybeScheduleCompactionPicker schedules a compaction if necessary,
-// calling `pickFunc` to pick automatic compactions.
+// withCompactionEnv runs the specified function after initializing the
+// compaction picking environment. If the DB is read-only or has already been
+// closed, the function will not be run.
 //
 // Requires d.mu to be held.
-func (d *DB) maybeScheduleCompactionPicker(
-	pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
-) {
+func (d *DB) withCompactionEnv(f func(env compactionEnv)) {
 	if d.closed.Load() != nil || d.opts.ReadOnly {
 		return
 	}
-	maxCompactions := d.opts.MaxConcurrentCompactions()
-	maxDownloads := d.opts.MaxConcurrentDownloads()
-
-	if d.mu.compact.compactingCount >= maxCompactions &&
-		(len(d.mu.compact.downloads) == 0 || d.mu.compact.downloadingCount >= maxDownloads) {
-		if len(d.mu.compact.manual) > 0 {
-			// Inability to run head blocks later manual compactions.
-			d.mu.compact.manual[0].retries++
-		}
-		return
-	}
 
 	// Compaction picking needs a coherent view of a Version. In particular, we
 	// need to exclude concurrent ingestions from making a decision on which level
@@ -1722,9 +1701,102 @@ func (d *DB) maybeScheduleCompactionPicker(
 		diskAvailBytes:          d.diskAvailBytes.Load(),
 		earliestSnapshotSeqNum:  d.mu.snapshots.earliest(),
 		earliestUnflushedSeqNum: d.getEarliestUnflushedSeqNumLocked(),
+		inProgressCompactions:   d.getInProgressCompactionInfoLocked(nil),
+		readCompactionEnv: readCompactionEnv{
+			readCompactions:          &d.mu.compact.readCompactions,
+			flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
+			rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
+		},
+	}
+
+	f(env)
+}
+
+// compactionPool enforces a global max compaction concurrency in a multi-store
+// configuration. If multiple DBs are waiting to perform a compaction, it
+// prioritizes the DB with the highest compaction score across levels.
+type compactionPool struct {
+	mu                       sync.Mutex
+	compactingCount          int
+	waiting                  map[*DB]struct{}
+	maxCompactionConcurrency int
+}
+
+// NewCompactionPool creates a new compactionPool with the specified
+// maxCompactionConcurrency.
+func NewCompactionPool(maxCompactionConcurrency int) *compactionPool {
+	if maxCompactionConcurrency <= 0 {
+		panic("pebble: maxCompactionConcurrency for a CompactionPool must be greater than 0")
+	}
+	return &compactionPool{
+		maxCompactionConcurrency: maxCompactionConcurrency,
+		waiting:                  make(map[*DB]struct{}),
+	}
+}
+
+var defaultCompactionPool = NewCompactionPool(runtime.GOMAXPROCS(0) * 2)
+
+// maybeScheduleWaitingCompactionLocked attempts to schedule a waiting
+// compaction from the list of waiting DBs. It prioritizes the DB with the
+// highest compaction score across all levels. If no DBs have a compaction
+// score above the threshold, it effectively picks a DB at random.
+//
+// c.mu must be held. DB.mu must not be held for any DB.
+func (c *compactionPool) maybeScheduleWaitingCompactionLocked() {
+	if len(c.waiting) == 0 || c.compactingCount >= c.maxCompactionConcurrency {
+		return
+	}
+
+	// NB: highestScore starts at 1 so that we effectively have no preference
+	// between two DBs that don't have any level with a score above 1.
+	highestScore := float64(compactionScoreThreshold)
+	var pickedDB *DB
+	for d := range c.waiting {
+		if len(c.waiting) == 1 {
+			pickedDB = d
+			// No need to calculate scores if only one DB is waiting.
+			break
+		}
+		d.mu.Lock()
+		inProgress := d.getInProgressCompactionInfoLocked(nil)
+		scores := d.mu.versions.picker.getScores(inProgress)
+		if pickedDB == nil || scores[0] >= highestScore {
+			highestScore = scores[0]
+			pickedDB = d
+		}
+		d.mu.Unlock()
+	}
+
+	pickedDB.mu.Lock()
+	if !pickedDB.tryScheduleAutoCompaction() {
+		// If we can't schedule a compaction for this DB right now, mark it as
+		// no longer waiting.
+		delete(c.waiting, pickedDB)
+	}
+	pickedDB.mu.Unlock()
+
+	c.maybeScheduleWaitingCompactionLocked()
+}
+
+// maybeScheduleCompaction schedules a compaction if necessary.
+//
+// Requires d.mu to be held.
+func (d *DB) maybeScheduleCompaction() {
+	d.withCompactionEnv(func(env compactionEnv) {
+		maxDownloads := d.opts.MaxConcurrentDownloads()
+		for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
+			d.tryScheduleDownloadCompaction(env, maxDownloads) {
+		}
+
+		maxCompactions := d.opts.MaxConcurrentCompactions()
+		if d.mu.compact.compactingCount >= maxCompactions {
+			if len(d.mu.compact.manual) > 0 {
+				// Inability to run head blocks later manual compactions.
+				d.mu.compact.manual[0].retries++
+			}
+			return
+		}
 
-	if d.mu.compact.compactingCount < maxCompactions {
 		// Check for delete-only compactions first, because they're expected to be
 		// cheap and reduce future compaction work.
 		if !d.opts.private.disableDeleteOnlyCompactions &&
@@ -1741,14 +1813,19 @@ func (d *DB) maybeScheduleCompactionPicker(
 		}
 		d.mu.compact.manual = d.mu.compact.manual[1:]
 	}
+	})
 
-	for !d.opts.DisableAutomaticCompactions && d.mu.compact.compactingCount < maxCompactions &&
-		d.tryScheduleAutoCompaction(env, pickFunc) {
-	}
-	}
-
-	for len(d.mu.compact.downloads) > 0 && d.mu.compact.downloadingCount < maxDownloads &&
-		d.tryScheduleDownloadCompaction(env, maxDownloads) {
+	if !d.opts.DisableAutomaticCompactions {
+		// NB: we must release d.mu to avoid deadlock when calling
+		// maybeScheduleWaitingCompactionLocked below.
+		d.mu.Unlock()
+		d.compactionPool.mu.Lock()
+		// Mark this DB as waiting for an automatic compaction to
+		// be scheduled.
+		d.compactionPool.waiting[d] = struct{}{}
+		d.compactionPool.maybeScheduleWaitingCompactionLocked()
+		d.compactionPool.mu.Unlock()
+		d.mu.Lock()
 	}
 }
 
@@ -1801,24 +1878,31 @@ func (d *DB) tryScheduleManualCompaction(env compactionEnv, manual *manualCompac
 // Returns false if no automatic compactions are necessary or able to run at
 // this time.
 //
-// Requires d.mu to be held.
-func (d *DB) tryScheduleAutoCompaction(
-	env compactionEnv, pickFunc func(compactionPicker, compactionEnv) *pickedCompaction,
-) bool {
-	env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
-	env.readCompactionEnv = readCompactionEnv{
-		readCompactions:          &d.mu.compact.readCompactions,
-		flushing:                 d.mu.compact.flushing || d.passedFlushThreshold(),
-		rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
+// Requires d.mu and d.compactionPool.mu to be held.
+func (d *DB) tryScheduleAutoCompaction() bool {
+	if d.mu.compact.compactingCount >= d.opts.MaxConcurrentCompactions() {
+		return false
 	}
-	pc := pickFunc(d.mu.versions.picker, env)
+
+	var pc *pickedCompaction
+	d.withCompactionEnv(func(env compactionEnv) {
+		pc = pickAuto(d.mu.versions.picker, env)
+	})
+
 	if pc == nil {
 		return false
 	}
 	c := newCompaction(pc, d.opts, d.timeNow(), d.ObjProvider())
 	d.mu.compact.compactingCount++
+	d.compactionPool.compactingCount++
 	d.addInProgressCompaction(c)
-	go d.compact(c, nil)
+	go func() {
+		d.compact(c, nil)
+		d.compactionPool.mu.Lock()
+		d.compactionPool.compactingCount--
+		d.compactionPool.maybeScheduleWaitingCompactionLocked()
+		d.compactionPool.mu.Unlock()
+	}()
 	return true
 }
 
diff --git a/compaction_picker_test.go b/compaction_picker_test.go
index 481f4a6cd1..432f2edcdc 100644
--- a/compaction_picker_test.go
+++ b/compaction_picker_test.go
@@ -1413,16 +1413,11 @@ func TestCompactionPickerPickFile(t *testing.T) {
 			d.mu.Lock()
 			defer d.mu.Unlock()
 
-			// Use maybeScheduleCompactionPicker to take care of all of the
-			// initialization of the compaction-picking environment, but never
-			// pick a compaction; just call pickFile using the user-provided
-			// level.
 			var lf manifest.LevelFile
 			var ok bool
-			d.maybeScheduleCompactionPicker(func(untypedPicker compactionPicker, env compactionEnv) *pickedCompaction {
-				p := untypedPicker.(*compactionPickerByScore)
+			d.withCompactionEnv(func(env compactionEnv) {
+				p := d.mu.versions.picker.(*compactionPickerByScore)
 				lf, ok = pickCompactionSeedFile(p.vers, p.virtualBackings, opts, level, level+1, env.earliestSnapshotSeqNum)
-				return nil
 			})
 			if !ok {
 				return "(none)"
diff --git a/db.go b/db.go
index 8c7b6f16e5..4293e59474 100644
--- a/db.go
+++ b/db.go
@@ -323,6 +323,17 @@ type DB struct {
 	// compactionShedulers.Wait() should not be called while the DB.mu is held.
 	compactionSchedulers sync.WaitGroup
 
+	// compactionPool enforces a global max compaction concurrency in a
+	// multi-store configuration. Each Pebble store (i.e. an instance of *DB)
+	// has its own per-store compaction concurrency which is controlled by
+	// opts.MaxConcurrentCompactions. However, in a multi-store configuration,
+	// disk I/O is a per-store resource while CPU is shared across stores.
+	// A significant portion of compaction is CPU-intensive, and so
+	// compactionPool is necessary to ensure that excessive compactions don't
+	// interrupt foreground CPU tasks even if the disks are capable of handling
+	// the additional throughput from those compactions.
+	compactionPool *compactionPool
+
 	// The main mutex protecting internal DB state. This mutex encompasses many
 	// fields because those fields need to be accessed and updated atomically. In
 	// particular, the current version, log.*, mem.*, and snapshot list need to
diff --git a/format_major_version.go b/format_major_version.go
index 90ef9f0b6a..653de0c8f9 100644
--- a/format_major_version.go
+++ b/format_major_version.go
@@ -402,9 +402,7 @@ func (d *DB) compactMarkedFilesLocked() error {
 	for curr.Stats.MarkedForCompaction > 0 {
 		// Attempt to schedule a compaction to rewrite a file marked for
 		// compaction.
-		d.maybeScheduleCompactionPicker(func(picker compactionPicker, env compactionEnv) *pickedCompaction {
-			return picker.pickRewriteCompaction(env)
-		})
+		d.maybeScheduleCompaction()
 
 		// The above attempt might succeed and schedule a rewrite compaction. Or
 		// there might not be available compaction concurrency to schedule the
diff --git a/open.go b/open.go
index 46ecaa49d4..d4bea66dbf 100644
--- a/open.go
+++ b/open.go
@@ -597,6 +597,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 	}
 	d.calculateDiskAvailableBytes()
 
+	d.compactionPool = opts.CompactionPool
+
 	d.maybeScheduleFlush()
 	d.maybeScheduleCompaction()
 
diff --git a/options.go b/options.go
index 21a7b92563..931632a331 100644
--- a/options.go
+++ b/options.go
@@ -975,6 +975,12 @@ type Options struct {
 	// The default value is 1.
 	MaxConcurrentCompactions func() int
 
+	// CompactionPool is an instance of compactionPool that enforces a global
+	// maximum compaction concurrency in a multi-store configuration. By
+	// default, up to 2 * runtime.GOMAXPROCS(0) compactions are allowed to run
+	// concurrently.
+	CompactionPool *compactionPool
+
 	// MaxConcurrentDownloads specifies the maximum number of download
 	// compactions. These are compactions that copy an external file to the local
 	// store.
@@ -1268,6 +1274,9 @@ func (o *Options) EnsureDefaults() *Options {
 	if o.MaxConcurrentCompactions == nil {
 		o.MaxConcurrentCompactions = func() int { return 1 }
 	}
+	if o.CompactionPool == nil {
+		o.CompactionPool = defaultCompactionPool
+	}
 	if o.MaxConcurrentDownloads == nil {
 		o.MaxConcurrentDownloads = func() int { return 1 }
 	}
diff --git a/snapshot.go b/snapshot.go
index 1be35931c4..6ebf88c090 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -117,7 +117,7 @@ func (s *Snapshot) closeLocked() error {
 	// If s was the previous earliest snapshot, we might be able to reclaim
 	// disk space by dropping obsolete records that were pinned by s.
 	if e := s.db.mu.snapshots.earliest(); e > s.seqNum {
-		s.db.maybeScheduleCompactionPicker(pickElisionOnly)
+		s.db.maybeScheduleCompaction()
 	}
 	s.db = nil
 	return nil
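
For reviewers, a minimal sketch of how a multi-store caller might wire up the shared pool added by this patch. It only uses the NewCompactionPool constructor and the Options.CompactionPool field introduced above; the store directories, the global limit of 8, the per-store limit of 2, and the log.Fatal error handling are illustrative assumptions, not part of the change.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// One pool shared by every store: at most 8 compactions run across all
	// stores at any time, regardless of the per-store limits below. (The
	// limit of 8 is an arbitrary example value.)
	pool := pebble.NewCompactionPool(8)

	var dbs []*pebble.DB
	// Hypothetical store directories, one per disk.
	for _, dir := range []string{"/mnt/disk1/store", "/mnt/disk2/store"} {
		opts := &pebble.Options{
			// Per-store limit: each store may still run up to 2 concurrent
			// compactions, subject to the pool's global limit.
			MaxConcurrentCompactions: func() int { return 2 },
			CompactionPool:           pool,
		}
		db, err := pebble.Open(dir, opts)
		if err != nil {
			log.Fatal(err)
		}
		dbs = append(dbs, db)
	}

	// ... use the stores ...

	for _, db := range dbs {
		if err := db.Close(); err != nil {
			log.Fatal(err)
		}
	}
}

Because the compactionPool type itself is unexported, callers configure sharing only through NewCompactionPool and the exported Options.CompactionPool field; stores that omit the field fall back to defaultCompactionPool, which allows up to 2 * runtime.GOMAXPROCS(0) concurrent compactions process-wide.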