[BREAKING] opt(compactions): Improve compaction performance (#1574)
Implement multiple ideas for speeding up compactions:

1. Dynamic Level Sizes: https://rocksdb.org/blog/2015/07/23/dynamic-level.html
2. L0 to L0 compactions: https://rocksdb.org/blog/2017/06/26/17-level-based-changes.html
3. Sub Compactions: Split one compaction into multiple sub-compactions using key ranges, which can then run concurrently (see the sketch after this list).
4. If a table being generated at Li overlaps with >= 10 tables at Li+1, finish the table. This helps avoid big overlaps and expensive compactions later.
5. Update compaction priority based on the priority of the next level, prioritizing compactions of lower levels over upper levels. This keeps the LSM tree structure consistently healthy.
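
To make idea 3 concrete, here is a minimal sketch of splitting one compaction's key span into sub-ranges that are then compacted concurrently. The `keyRange`, `splitRanges`, and `runSubCompaction` names are illustrative stand-ins, not Badger's internal API:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

type keyRange struct{ left, right []byte }

// splitRanges carves the overall span of a compaction into disjoint
// sub-ranges, one per sub-compaction, using pre-computed split keys.
func splitRanges(span keyRange, splitKeys [][]byte) []keyRange {
	var out []keyRange
	left := span.left
	for _, k := range splitKeys {
		if bytes.Compare(k, left) <= 0 {
			continue
		}
		out = append(out, keyRange{left: left, right: k})
		left = k
	}
	return append(out, keyRange{left: left, right: span.right})
}

// runSubCompaction stands in for building output tables for one key range.
func runSubCompaction(r keyRange) {
	fmt.Printf("compacting [%q, %q)\n", r.left, r.right)
}

func main() {
	span := keyRange{left: []byte("a"), right: []byte("z")}
	splits := [][]byte{[]byte("g"), []byte("n"), []byte("t")}

	var wg sync.WaitGroup
	for _, r := range splitRanges(span, splits) {
		wg.Add(1)
		go func(r keyRange) { // each sub-range compacts concurrently
			defer wg.Done()
			runSubCompaction(r)
		}(r)
	}
	wg.Wait()
}
```

Each sub-range produces its own output tables, so the iteration and table-building work spreads across cores instead of running as one serial pass.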

With these changes, we can load 1B entries (160GB of data) into Badger (without the Stream framework) in 1h25m at 31 MB/s. This is a significant improvement over current master.

Co-authored-by: Ibrahim Jarif <[email protected]>

fix(tests): Writebatch, Stream, Vlog tests (#1577)

This PR fixes the following issues/tests:
 - Deadlock in write batch - Use an atomic value to set `WriteBatch.err`
 - Vlog Truncate test - Fix issues with empty memtables
 - Test options - Set memtable size.
 - Compaction tests - Acquire lock before updating level tables
 - Vlog Write - Truncate the file if the transaction cannot fit within the value log size limit
 - TestPersistLFDiscardStats - Set numLevelZeroTables=1 to force compaction.

This PR also fixes the failing bank test by adding an index cache to it.
manishrjain authored and NamanJain8 committed Nov 5, 2020
1 parent 2b71a94 commit e6f2291
Showing 23 changed files with 888 additions and 439 deletions.
5 changes: 3 additions & 2 deletions badger/cmd/bank.go
@@ -357,14 +357,15 @@ func runTest(cmd *cobra.Command, args []string) error {
// Open DB
opts := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithMaxTableSize(4 << 20). // Force more compactions.
WithBaseTableSize(4 << 20). // Force more compactions.
WithNumLevelZeroTables(2).
WithNumMemtables(2).
// Do not GC any versions, because we need them for the disect..
WithNumVersionsToKeep(int(math.MaxInt32)).
WithValueThreshold(1). // Make all values go to value log
WithCompression(options.ZSTD).
WithBlockCacheSize(10 << 20)
WithBlockCacheSize(10 << 20).
WithIndexCacheSize(10 << 20)

if verbose {
opts = opts.WithLoggingLevel(badger.DEBUG)
28 changes: 16 additions & 12 deletions badger/cmd/write_bench.go
@@ -66,7 +66,7 @@ var (
vlogMaxEntries uint32
loadBloomsOnOpen bool
detectConflicts bool
compression bool
zstdComp bool
showDir bool
ttlDuration string
showKeysCount bool
@@ -113,8 +113,8 @@ func init() {
"Load Bloom filter on DB open.")
writeBenchCmd.Flags().BoolVar(&detectConflicts, "conficts", false,
"If true, it badger will detect the conflicts")
writeBenchCmd.Flags().BoolVar(&compression, "compression", true,
"If true, badger will use ZSTD mode")
writeBenchCmd.Flags().BoolVar(&zstdComp, "zstd", false,
"If true, badger will use ZSTD mode. Otherwise, use default.")
writeBenchCmd.Flags().BoolVar(&showDir, "show-dir", false,
"If true, the report will include the directory contents")
writeBenchCmd.Flags().StringVar(&dropAllPeriod, "dropall", "0s",
Expand Down Expand Up @@ -260,12 +260,6 @@ func writeSorted(db *badger.DB, num uint64) error {
}

func writeBench(cmd *cobra.Command, args []string) error {
var cmode options.CompressionType
if compression {
cmode = options.ZSTD
} else {
cmode = options.None
}
opt := badger.DefaultOptions(sstDir).
WithValueDir(vlogDir).
WithSyncWrites(syncWrites).
@@ -277,8 +271,10 @@ func writeBench(cmd *cobra.Command, args []string) error {
WithValueLogMaxEntries(vlogMaxEntries).
WithEncryptionKey([]byte(encryptionKey)).
WithDetectConflicts(detectConflicts).
WithCompression(cmode).
WithLoggingLevel(badger.INFO)
if zstdComp {
opt = opt.WithCompression(options.ZSTD)
}

if !showLogs {
opt = opt.WithLogger(nil)
@@ -314,6 +310,7 @@ func writeBench(cmd *cobra.Command, args []string) error {
}

c.SignalAndWait()
fmt.Printf(db.LevelsToString())
return err
}

@@ -354,11 +351,13 @@ func reportStats(c *z.Closer, db *badger.DB) {
t := time.NewTicker(time.Second)
defer t.Stop()

var count int
for {
select {
case <-c.HasBeenClosed():
return
case <-t.C:
count++
if showKeysCount {
showKeysStats(db)
}
@@ -392,8 +391,13 @@ func reportStats(c *z.Closer, db *badger.DB) {
bytesRate := sz / uint64(dur.Seconds())
entriesRate := entries / uint64(dur.Seconds())
fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+
"entries written: %d, speed: %d/sec, gcSuccess: %d\n", y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate, gcSuccess)
"entries written: %d, speed: %d/sec, Memory: %s\n",
y.FixedDuration(time.Since(startTime)),
humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate,
humanize.IBytes(uint64(z.NumAllocBytes())))
if count%10 == 0 {
fmt.Printf(db.LevelsToString())
}
}
}
}
35 changes: 17 additions & 18 deletions batch.go
@@ -18,6 +18,7 @@ package badger

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/badger/v2/pb"
"github.com/dgraph-io/badger/v2/y"
@@ -30,7 +31,7 @@ type WriteBatch struct {
txn *Txn
db *DB
throttle *y.Throttle
err error
err atomic.Value

isManaged bool
commitTs uint64
@@ -82,18 +83,16 @@ func (wb *WriteBatch) Cancel() {
wb.txn.Discard()
}

// The caller of this callback must hold the lock.
func (wb *WriteBatch) callback(err error) {
// sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock.
defer wb.throttle.Done(err)
if err == nil {
return
}

if wb.err != nil {
if err := wb.Error(); err != nil {
return
}
wb.err = err
wb.err.Store(err)
}

func (wb *WriteBatch) Write(kvList *pb.KVList) error {
@@ -135,7 +134,7 @@ func (wb *WriteBatch) handleEntry(e *Entry) error {
// This time the error must not be ErrTxnTooBig, otherwise, we make the
// error permanent.
if err := wb.txn.SetEntry(e); err != nil {
wb.err = err
wb.err.Store(err)
return err
}
return nil
@@ -172,28 +171,28 @@ func (wb *WriteBatch) Delete(k []byte) error {
return err
}
if err := wb.txn.Delete(k); err != nil {
wb.err = err
wb.err.Store(err)
return err
}
return nil
}

// Caller to commit must hold a write lock.
func (wb *WriteBatch) commit() error {
if wb.err != nil {
return wb.err
if err := wb.Error(); err != nil {
return err
}
if wb.finished {
return y.ErrCommitAfterFinish
}
if err := wb.throttle.Do(); err != nil {
wb.err = err
return wb.err
wb.err.Store(err)
return err
}
wb.txn.CommitWith(wb.callback)
wb.txn = wb.db.newTransaction(true, wb.isManaged)
wb.txn.commitTs = wb.commitTs
return wb.err
return wb.Error()
}

// Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush
@@ -210,18 +209,18 @@ func (wb *WriteBatch) Flush() error {
wb.Unlock()

if err := wb.throttle.Finish(); err != nil {
if wb.err != nil {
return errors.Errorf("wb.err: %s err: %s", wb.err, err)
if wb.Error() != nil {
return errors.Errorf("wb.err: %s err: %s", wb.Error(), err)
}
return err
}

return wb.err
return wb.Error()
}

// Error returns any errors encountered so far. No commits would be run once an error is detected.
func (wb *WriteBatch) Error() error {
wb.Lock()
defer wb.Unlock()
return wb.err
// If the interface conversion fails, the err will be nil.
err, _ := wb.err.Load().(error)
return err
}
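
With `WriteBatch.err` stored in an `atomic.Value`, `Error()` no longer needs the batch lock, so the asynchronous commit callback can record and report errors without deadlocking against `Flush()`. For context, a hedged usage sketch against the public v2 API (`NewWriteBatch`, `Set`, `Cancel`, `Flush`); the key scheme, count, and `writeMany` helper are made up for illustration:

```go
// Assumes "fmt" and "github.com/dgraph-io/badger/v2" are imported and
// db is an open *badger.DB.
func writeMany(db *badger.DB) error {
	wb := db.NewWriteBatch()
	defer wb.Cancel()

	for i := 0; i < 1000; i++ {
		key := []byte(fmt.Sprintf("key-%04d", i))
		// Set may trigger asynchronous commits; any error they hit is now
		// stored atomically and surfaced later by Error() or Flush().
		if err := wb.Set(key, []byte("value")); err != nil {
			return err
		}
	}
	// Flush waits for pending commits and returns any stored error.
	return wb.Flush()
}
```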
57 changes: 45 additions & 12 deletions compaction.go
@@ -33,6 +33,10 @@ type keyRange struct {
inf bool
}

func (r keyRange) isEmpty() bool {
return len(r.left) == 0 && len(r.right) == 0 && !r.inf
}

var infRange = keyRange{inf: true}

func (r keyRange) String() string {
@@ -45,7 +49,26 @@ func (r keyRange) equals(dst keyRange) bool {
r.inf == dst.inf
}

func (r *keyRange) extend(kr keyRange) {
if r.isEmpty() {
*r = kr
}
if len(r.left) == 0 || y.CompareKeys(kr.left, r.left) < 0 {
r.left = kr.left
}
if len(r.right) == 0 || y.CompareKeys(kr.right, r.right) > 0 {
r.right = kr.right
}
if kr.inf {
r.inf = true
}
}

func (r keyRange) overlapsWith(dst keyRange) bool {
// Empty keyRange always overlaps.
if r.isEmpty() {
return true
}
if r.inf || dst.inf {
return true
}
@@ -127,6 +150,7 @@ func (lcs *levelCompactStatus) remove(dst keyRange) bool {
type compactStatus struct {
sync.RWMutex
levels []*levelCompactStatus
tables map[uint64]struct{}
}

func (cs *compactStatus) overlapsWith(level int, this keyRange) bool {
@@ -151,11 +175,10 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef)
cs.Lock()
defer cs.Unlock()

level := cd.thisLevel.level

y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels))
thisLevel := cs.levels[level]
nextLevel := cs.levels[level+1]
tl := cd.thisLevel.level
y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels))
thisLevel := cs.levels[cd.thisLevel.level]
nextLevel := cs.levels[cd.nextLevel.level]

if thisLevel.overlapsWith(cd.thisRange) {
return false
@@ -171,31 +194,41 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef)
thisLevel.ranges = append(thisLevel.ranges, cd.thisRange)
nextLevel.ranges = append(nextLevel.ranges, cd.nextRange)
thisLevel.delSize += cd.thisSize
for _, t := range append(cd.top, cd.bot...) {
cs.tables[t.ID()] = struct{}{}
}
return true
}

func (cs *compactStatus) delete(cd compactDef) {
cs.Lock()
defer cs.Unlock()

level := cd.thisLevel.level
y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels))
tl := cd.thisLevel.level
y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels))

thisLevel := cs.levels[level]
nextLevel := cs.levels[level+1]
thisLevel := cs.levels[cd.thisLevel.level]
nextLevel := cs.levels[cd.nextLevel.level]

thisLevel.delSize -= cd.thisSize
found := thisLevel.remove(cd.thisRange)
found = nextLevel.remove(cd.nextRange) && found
if !cd.nextRange.isEmpty() {
found = nextLevel.remove(cd.nextRange) && found
}

if !found {
this := cd.thisRange
next := cd.nextRange
fmt.Printf("Looking for: [%q, %q, %v] in this level.\n", this.left, this.right, this.inf)
fmt.Printf("Looking for: %s in this level %d.\n", this, tl)
fmt.Printf("This Level:\n%s\n", thisLevel.debug())
fmt.Println()
fmt.Printf("Looking for: [%q, %q, %v] in next level.\n", next.left, next.right, next.inf)
fmt.Printf("Looking for: %s in next level %d.\n", next, cd.nextLevel.level)
fmt.Printf("Next Level:\n%s\n", nextLevel.debug())
log.Fatal("keyRange not found")
}
for _, t := range append(cd.top, cd.bot...) {
_, ok := cs.tables[t.ID()]
y.AssertTrue(ok)
delete(cs.tables, t.ID())
}
}
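
The new `isEmpty`, `extend`, and `overlapsWith` helpers define how compaction key ranges compose when compactions are picked and split. Below is a simplified, self-contained illustration of the intended semantics; it uses `bytes.Compare` in place of Badger's `y.CompareKeys` (which also orders by the 8-byte version suffix), so treat it as a stand-in rather than the real implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

type keyRange struct {
	left, right []byte
	inf         bool
}

func (r keyRange) isEmpty() bool {
	return len(r.left) == 0 && len(r.right) == 0 && !r.inf
}

// extend grows r so that it also covers kr.
func (r *keyRange) extend(kr keyRange) {
	if r.isEmpty() {
		*r = kr
	}
	if len(r.left) == 0 || bytes.Compare(kr.left, r.left) < 0 {
		r.left = kr.left
	}
	if len(r.right) == 0 || bytes.Compare(kr.right, r.right) > 0 {
		r.right = kr.right
	}
	if kr.inf {
		r.inf = true
	}
}

// overlapsWith reports whether two ranges intersect; an empty receiver or an
// infinite range overlaps everything, mirroring the diff above.
func (r keyRange) overlapsWith(dst keyRange) bool {
	if r.isEmpty() {
		return true
	}
	if r.inf || dst.inf {
		return true
	}
	if bytes.Compare(r.left, dst.right) > 0 || bytes.Compare(r.right, dst.left) < 0 {
		return false
	}
	return true
}

func main() {
	a := keyRange{left: []byte("b"), right: []byte("f")}
	a.extend(keyRange{left: []byte("a"), right: []byte("d")})
	fmt.Printf("extended: [%q, %q]\n", a.left, a.right) // ["a", "f"]

	fmt.Println(a.overlapsWith(keyRange{left: []byte("e"), right: []byte("k")})) // true
	fmt.Println(a.overlapsWith(keyRange{left: []byte("g"), right: []byte("k")})) // false
}
```

Two ranges overlap unless one ends strictly before the other begins, which is what lets `compareAndAdd` reject a compaction whose key range is already being compacted at that level.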