Skip to content

Commit

Permalink
Add compaction pacing mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryanfsdf committed Jul 26, 2019
1 parent f26236c commit c70573f
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 92 deletions.
2 changes: 1 addition & 1 deletion cmd/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newPebbleDB(dir string) DB {
DisableWAL: disableWAL,
MemTableSize: 64 << 20,
MemTableStopWritesThreshold: 4,
MinFlushRate: 4 << 20,
MinFlushRate: 1 << 20, // 1 MB/s
L0CompactionThreshold: 2,
L0StopWritesThreshold: 32,
LBaseMaxBytes: 64 << 20, // 64 MB
Expand Down
68 changes: 68 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/petermattis/pebble/internal/base"
"github.com/petermattis/pebble/internal/rangedel"
"github.com/petermattis/pebble/internal/rate"
"github.com/petermattis/pebble/sstable"
"github.com/petermattis/pebble/vfs"
)
Expand Down Expand Up @@ -685,6 +686,9 @@ func (d *DB) flush1() error {
return err
}

// Refresh bytes flushed count.
atomic.StoreUint64(&d.bytesFlushed, 0)

flushed := d.mu.mem.queue[:n]
d.mu.mem.queue = d.mu.mem.queue[n:]
d.updateReadStateLocked()
Expand Down Expand Up @@ -807,6 +811,7 @@ func (d *DB) compact1() (err error) {
if err != nil {
return err
}

d.updateReadStateLocked()
d.deleteObsoleteFiles(jobID)
return nil
Expand Down Expand Up @@ -999,6 +1004,10 @@ func (d *DB) runCompaction(c *compaction) (
totalBytes := d.memTableTotalBytes()
refreshDirtyBytesThreshold := uint64(d.opts.MemTableSize * 5 / 100)

var compactionSlowdownThreshold uint64
var totalCompactionDebt uint64
var estimatedMaxWAmp float64

for key, val := iter.First(); key != nil; key, val = iter.Next() {
// Slow down memtable flushing to match fill rate.
if c.flushing != nil {
Expand All @@ -1019,6 +1028,8 @@ func (d *DB) runCompaction(c *compaction) (
flushAmount := c.bytesIterated - prevBytesIterated
prevBytesIterated = c.bytesIterated

atomic.StoreUint64(&d.bytesFlushed, c.bytesIterated)

// We slow down memtable flushing when the dirty bytes indicator falls
// below the low watermark, which is 105% memtable size. This will only
// occur if memtable flushing can keep up with the pace of incoming
Expand All @@ -1045,6 +1056,63 @@ func (d *DB) runCompaction(c *compaction) (
}
d.flushLimiter.AllowN(time.Now(), int(flushAmount))
}
} else {
bytesFlushed := atomic.LoadUint64(&d.bytesFlushed)

if iterCount >= 1000 || c.bytesIterated > refreshDirtyBytesThreshold {
d.mu.Lock()
estimatedMaxWAmp = d.mu.versions.picker.estimatedMaxWAmp()
// compactionSlowdownThreshold is the low watermark for compaction debt. If compaction
// debt is below this threshold, we slow down compactions. If compaction debt is above
// this threshold, we let compactions continue as fast as possible. We want to keep
// compaction debt as low as possible to match the speed of flushes. This threshold
// is set so that a single flush cannot contribute enough compaction debt to overshoot
// the threshold.
compactionSlowdownThreshold = uint64(estimatedMaxWAmp * float64(d.opts.MemTableSize))
totalCompactionDebt = d.mu.versions.picker.estimatedCompactionDebt(bytesFlushed)
d.mu.Unlock()
refreshDirtyBytesThreshold = c.bytesIterated + uint64(d.opts.MemTableSize*5/100)
iterCount = 0
}
iterCount++

var curCompactionDebt uint64
if totalCompactionDebt > c.bytesIterated {
curCompactionDebt = totalCompactionDebt - c.bytesIterated
}

// Set the minimum compaction rate to match the minimum flush rate.
d.compactionLimiter.SetLimit(rate.Limit(float64(d.opts.MinFlushRate) * estimatedMaxWAmp))

compactAmount := c.bytesIterated - prevBytesIterated
// We slow down compactions when the compaction debt falls below the slowdown
// threshold, which is set dynamically based on the number of non-empty levels.
// This will only occur if compactions can keep up with the pace of flushes. If
// bytes are flushed faster than how fast compactions can occur, compactions
// proceed at maximum (unthrottled) speed.
if curCompactionDebt <= compactionSlowdownThreshold {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
err := d.compactionLimiter.WaitN(context.Background(), burst)
if err != nil {
return nil, pendingOutputs, err
}
compactAmount -= uint64(burst)
}
err := d.compactionLimiter.WaitN(context.Background(), int(compactAmount))
if err != nil {
return nil, pendingOutputs, err
}
} else {
burst := d.compactionLimiter.Burst()
for compactAmount > uint64(burst) {
d.compactionLimiter.AllowN(time.Now(), burst)
compactAmount -= uint64(burst)
}
d.compactionLimiter.AllowN(time.Now(), int(compactAmount))
}

prevBytesIterated = c.bytesIterated
}
// TODO(peter,rangedel): Need to incorporate the range tombstones in the
// shouldStopBefore decision.
Expand Down
63 changes: 59 additions & 4 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
// compaction picker is associated with a single version. A new compaction
// picker is created and initialized every time a new version is installed.
type compactionPicker struct {
opts *Options
vers *version

// The level to target for L0 compactions. Levels L1 to baseLevel must be
// empty.
baseLevel int

// smoothedLevelMultiplier is the size ratio between one level and the next.
smoothedLevelMultiplier float64

// levelMaxBytes holds the dynamically adjusted max bytes setting for each
// level.
levelMaxBytes [numLevels]int64
Expand All @@ -32,6 +36,7 @@ type compactionPicker struct {

func newCompactionPicker(v *version, opts *Options) *compactionPicker {
p := &compactionPicker{
opts: opts,
vers: v,
}
p.initLevelMaxBytes(v, opts)
Expand All @@ -46,6 +51,57 @@ func (p *compactionPicker) compactionNeeded() bool {
return p.score >= 1
}

// estimatedCompactionDebt estimates the number of bytes which need to be
// compacted before the LSM tree becomes stable.
func (p *compactionPicker) estimatedCompactionDebt(l0ExtraSize uint64) uint64 {
if p == nil {
return 0
}

compactionDebt := totalSize(p.vers.files[0]) + l0ExtraSize
bytesAddedToNextLevel := compactionDebt

var levelSize uint64
var nextLevelSize uint64
for level := p.baseLevel; level < numLevels - 1; level++ {
if level == p.baseLevel {
levelSize = totalSize(p.vers.files[level])
// predictedL0CompactionSize is the predicted size of the L0 component in the
// current or next L0->LBase compaction. This is needed to predict the number
// of L0->LBase compactions which will need to occur for the LSM tree to
// become stable.
predictedL0CompactionSize := uint64(p.opts.L0CompactionThreshold * p.opts.MemTableSize)
// The ratio bytesAddedToNextLevel(L0 Size)/predictedL0CompactionSize is the
// predicted number of L0->LBase compactions which will need to occur for the
// LSM tree to become stable. We multiply this by levelSize(LBase size) to
// estimate the compaction debt incurred by LBase in the L0->LBase compactions.
compactionDebt += (levelSize * bytesAddedToNextLevel) / predictedL0CompactionSize
} else {
// Every level after the base level has its level size computed at the
// end of the previous loop iteration, so reuse the result.
levelSize = nextLevelSize
nextLevelSize = 0
}

levelSize += bytesAddedToNextLevel
bytesAddedToNextLevel = 0
nextLevelSize = totalSize(p.vers.files[level + 1])
if levelSize > uint64(p.levelMaxBytes[level]) {
bytesAddedToNextLevel = levelSize - uint64(p.levelMaxBytes[level])
levelRatio := float64(nextLevelSize)/float64(levelSize)
compactionDebt += uint64(float64(bytesAddedToNextLevel) * (levelRatio + 1))
}
}

return compactionDebt
}

// estimatedMaxWAmp estimates the maximum possible write amp per byte that is
// added to L0.
func (p *compactionPicker) estimatedMaxWAmp() float64 {
return float64(numLevels - p.baseLevel) * (p.smoothedLevelMultiplier + 1)
}

func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
// Determine the first non-empty level and the maximum size of any level.
firstNonEmptyLevel := -1
Expand Down Expand Up @@ -98,19 +154,18 @@ func (p *compactionPicker) initLevelMaxBytes(v *version, opts *Options) {
}
}

var smoothedLevelMultiplier float64
if p.baseLevel < numLevels-1 {
smoothedLevelMultiplier = math.Pow(
p.smoothedLevelMultiplier = math.Pow(
float64(bottomLevelSize)/float64(baseBytesMax),
1.0/float64(numLevels-p.baseLevel-1))
} else {
smoothedLevelMultiplier = 1.0
p.smoothedLevelMultiplier = 1.0
}

levelSize := float64(baseBytesMax)
for level := p.baseLevel; level < numLevels; level++ {
if level > p.baseLevel && levelSize > 0 {
levelSize *= smoothedLevelMultiplier
levelSize *= p.smoothedLevelMultiplier
}
// Round the result since test cases use small target level sizes, which
// can be impacted by floating-point imprecision + integer truncation.
Expand Down
Loading

0 comments on commit c70573f

Please sign in to comment.